aboutsummaryrefslogtreecommitdiff
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Kconfig12
-rw-r--r--net/tipc/addr.c15
-rw-r--r--net/tipc/addr.h17
-rw-r--r--net/tipc/bcast.c47
-rw-r--r--net/tipc/bcast.h3
-rw-r--r--net/tipc/bearer.c116
-rw-r--r--net/tipc/bearer.h73
-rw-r--r--net/tipc/config.c31
-rw-r--r--net/tipc/core.c9
-rw-r--r--net/tipc/core.h4
-rw-r--r--net/tipc/discover.c140
-rw-r--r--net/tipc/discover.h9
-rw-r--r--net/tipc/link.c130
-rw-r--r--net/tipc/link.h29
-rw-r--r--net/tipc/msg.c41
-rw-r--r--net/tipc/msg.h64
-rw-r--r--net/tipc/name_distr.c18
-rw-r--r--net/tipc/net.c32
-rw-r--r--net/tipc/net.h19
-rw-r--r--net/tipc/node.c125
-rw-r--r--net/tipc/node.h36
-rw-r--r--net/tipc/node_subscr.c21
-rw-r--r--net/tipc/node_subscr.h3
-rw-r--r--net/tipc/port.c306
-rw-r--r--net/tipc/port.h73
-rw-r--r--net/tipc/socket.c76
-rw-r--r--net/tipc/subscr.c13
27 files changed, 644 insertions, 818 deletions
diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig
index 0436927369f..2c5954b8593 100644
--- a/net/tipc/Kconfig
+++ b/net/tipc/Kconfig
@@ -29,18 +29,6 @@ config TIPC_ADVANCED
Saying Y here will open some advanced configuration for TIPC.
Most users do not need to bother; if unsure, just say N.
-config TIPC_NODES
- int "Maximum number of nodes in a cluster"
- depends on TIPC_ADVANCED
- range 8 2047
- default "255"
- help
- Specifies how many nodes can be supported in a TIPC cluster.
- Can range from 8 to 2047 nodes; default is 255.
-
- Setting this to a smaller value saves some memory;
- setting it to higher allows for more nodes.
-
config TIPC_PORTS
int "Maximum number of ports in a node"
depends on TIPC_ADVANCED
diff --git a/net/tipc/addr.c b/net/tipc/addr.c
index 88463d9a6f1..a6fdab33877 100644
--- a/net/tipc/addr.c
+++ b/net/tipc/addr.c
@@ -2,7 +2,7 @@
* net/tipc/addr.c: TIPC address utility routines
*
* Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2004-2005, Wind River Systems
+ * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,7 +41,7 @@
* tipc_addr_domain_valid - validates a network domain address
*
* Accepts <Z.C.N>, <Z.C.0>, <Z.0.0>, and <0.0.0>,
- * where Z, C, and N are non-zero and do not exceed the configured limits.
+ * where Z, C, and N are non-zero.
*
* Returns 1 if domain address is valid, otherwise 0
*/
@@ -51,10 +51,6 @@ int tipc_addr_domain_valid(u32 addr)
u32 n = tipc_node(addr);
u32 c = tipc_cluster(addr);
u32 z = tipc_zone(addr);
- u32 max_nodes = tipc_max_nodes;
-
- if (n > max_nodes)
- return 0;
if (n && (!z || !c))
return 0;
@@ -66,8 +62,7 @@ int tipc_addr_domain_valid(u32 addr)
/**
* tipc_addr_node_valid - validates a proposed network address for this node
*
- * Accepts <Z.C.N>, where Z, C, and N are non-zero and do not exceed
- * the configured limits.
+ * Accepts <Z.C.N>, where Z, C, and N are non-zero.
*
* Returns 1 if address can be used, otherwise 0
*/
@@ -81,9 +76,9 @@ int tipc_in_scope(u32 domain, u32 addr)
{
if (!domain || (domain == addr))
return 1;
- if (domain == (addr & 0xfffff000u)) /* domain <Z.C.0> */
+ if (domain == tipc_cluster_mask(addr)) /* domain <Z.C.0> */
return 1;
- if (domain == (addr & 0xff000000u)) /* domain <Z.0.0> */
+ if (domain == tipc_zone_mask(addr)) /* domain <Z.0.0> */
return 1;
return 0;
}
diff --git a/net/tipc/addr.h b/net/tipc/addr.h
index 2490fadd0ca..8971aba99ae 100644
--- a/net/tipc/addr.h
+++ b/net/tipc/addr.h
@@ -37,6 +37,16 @@
#ifndef _TIPC_ADDR_H
#define _TIPC_ADDR_H
+static inline u32 tipc_zone_mask(u32 addr)
+{
+ return addr & 0xff000000u;
+}
+
+static inline u32 tipc_cluster_mask(u32 addr)
+{
+ return addr & 0xfffff000u;
+}
+
static inline int in_own_cluster(u32 addr)
{
return !((addr ^ tipc_own_addr) >> 12);
@@ -49,14 +59,13 @@ static inline int in_own_cluster(u32 addr)
* after a network hop.
*/
-static inline int addr_domain(int sc)
+static inline u32 addr_domain(u32 sc)
{
if (likely(sc == TIPC_NODE_SCOPE))
return tipc_own_addr;
if (sc == TIPC_CLUSTER_SCOPE)
- return tipc_addr(tipc_zone(tipc_own_addr),
- tipc_cluster(tipc_own_addr), 0);
- return tipc_addr(tipc_zone(tipc_own_addr), 0, 0);
+ return tipc_cluster_mask(tipc_own_addr);
+ return tipc_zone_mask(tipc_own_addr);
}
int tipc_addr_domain_valid(u32);
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 70ab5ef4876..7dc1dc7151e 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -3,7 +3,7 @@
*
* Copyright (c) 2004-2006, Ericsson AB
* Copyright (c) 2004, Intel Corporation.
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -61,8 +61,8 @@
*/
struct bcbearer_pair {
- struct bearer *primary;
- struct bearer *secondary;
+ struct tipc_bearer *primary;
+ struct tipc_bearer *secondary;
};
/**
@@ -81,7 +81,7 @@ struct bcbearer_pair {
*/
struct bcbearer {
- struct bearer bearer;
+ struct tipc_bearer bearer;
struct media media;
struct bcbearer_pair bpairs[MAX_BEARERS];
struct bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
@@ -93,6 +93,7 @@ struct bcbearer {
* struct bclink - link used for broadcast messages
* @link: (non-standard) broadcast link structure
* @node: (non-standard) node structure representing b'cast link's peer node
+ * @retransmit_to: node that most recently requested a retransmit
*
* Handles sequence numbering, fragmentation, bundling, etc.
*/
@@ -100,6 +101,7 @@ struct bcbearer {
struct bclink {
struct link link;
struct tipc_node node;
+ struct tipc_node *retransmit_to;
};
@@ -184,6 +186,17 @@ static int bclink_ack_allowed(u32 n)
/**
+ * tipc_bclink_retransmit_to - get most recent node to request retransmission
+ *
+ * Called with bc_lock locked
+ */
+
+struct tipc_node *tipc_bclink_retransmit_to(void)
+{
+ return bclink->retransmit_to;
+}
+
+/**
* bclink_retransmit_pkt - retransmit broadcast packets
* @after: sequence number of last packet to *not* retransmit
* @to: sequence number of last packet to retransmit
@@ -285,6 +298,7 @@ static void bclink_send_nack(struct tipc_node *n_ptr)
msg = buf_msg(buf);
tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
INT_H_SIZE, n_ptr->addr);
+ msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tipc_net_id);
msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
@@ -405,8 +419,6 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
else
bclink_set_last_sent();
- if (bcl->out_queue_size > bcl->stats.max_queue_sz)
- bcl->stats.max_queue_sz = bcl->out_queue_size;
bcl->stats.queue_sz_counts++;
bcl->stats.accu_queue_sz += bcl->out_queue_size;
@@ -444,10 +456,9 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
tipc_node_unlock(node);
spin_lock_bh(&bc_lock);
bcl->stats.recv_nacks++;
- bcl->owner->next = node; /* remember requestor */
+ bclink->retransmit_to = node;
bclink_retransmit_pkt(msg_bcgap_after(msg),
msg_bcgap_to(msg));
- bcl->owner->next = NULL;
spin_unlock_bh(&bc_lock);
} else {
tipc_bclink_peek_nack(msg_destnode(msg),
@@ -574,8 +585,8 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
bcbearer->remains = tipc_bcast_nmap;
for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
- struct bearer *p = bcbearer->bpairs[bp_index].primary;
- struct bearer *s = bcbearer->bpairs[bp_index].secondary;
+ struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
+ struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
if (!p)
break; /* no more bearers to try */
@@ -584,11 +595,11 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
if (bcbearer->remains_new.count == bcbearer->remains.count)
continue; /* bearer pair doesn't add anything */
- if (p->publ.blocked ||
- p->media->send_msg(buf, &p->publ, &p->media->bcast_addr)) {
+ if (p->blocked ||
+ p->media->send_msg(buf, p, &p->media->bcast_addr)) {
/* unable to send on primary bearer */
- if (!s || s->publ.blocked ||
- s->media->send_msg(buf, &s->publ,
+ if (!s || s->blocked ||
+ s->media->send_msg(buf, s,
&s->media->bcast_addr)) {
/* unable to send on either bearer */
continue;
@@ -633,7 +644,7 @@ void tipc_bcbearer_sort(void)
memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
- struct bearer *b = &tipc_bearers[b_index];
+ struct tipc_bearer *b = &tipc_bearers[b_index];
if (!b->active || !b->nodes.count)
continue;
@@ -682,12 +693,12 @@ void tipc_bcbearer_sort(void)
void tipc_bcbearer_push(void)
{
- struct bearer *b_ptr;
+ struct tipc_bearer *b_ptr;
spin_lock_bh(&bc_lock);
b_ptr = &bcbearer->bearer;
- if (b_ptr->publ.blocked) {
- b_ptr->publ.blocked = 0;
+ if (b_ptr->blocked) {
+ b_ptr->blocked = 0;
tipc_bearer_lock_push(b_ptr);
}
spin_unlock_bh(&bc_lock);
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 51f8c5326ce..500c97f1c85 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -2,7 +2,7 @@
* net/tipc/bcast.h: Include file for TIPC broadcast code
*
* Copyright (c) 2003-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -90,6 +90,7 @@ void tipc_port_list_free(struct port_list *pl_ptr);
int tipc_bclink_init(void);
void tipc_bclink_stop(void);
+struct tipc_node *tipc_bclink_retransmit_to(void);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
int tipc_bclink_send_msg(struct sk_buff *buf);
void tipc_bclink_recv_pkt(struct sk_buff *buf);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 837b7a46788..411719feb80 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -2,7 +2,7 @@
* net/tipc/bearer.c: TIPC bearer code
*
* Copyright (c) 1996-2006, Ericsson AB
- * Copyright (c) 2004-2006, Wind River Systems
+ * Copyright (c) 2004-2006, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -44,7 +44,7 @@
static struct media media_list[MAX_MEDIA];
static u32 media_count;
-struct bearer tipc_bearers[MAX_BEARERS];
+struct tipc_bearer tipc_bearers[MAX_BEARERS];
/**
* media_name_valid - validate media name
@@ -158,7 +158,6 @@ int tipc_register_media(u32 media_type,
m_ptr->disable_bearer = disable;
m_ptr->addr2str = addr2str;
memcpy(&m_ptr->bcast_addr, bcast_addr, sizeof(*bcast_addr));
- m_ptr->bcast = 1;
strcpy(m_ptr->name, name);
m_ptr->priority = bearer_priority;
m_ptr->tolerance = link_tolerance;
@@ -278,13 +277,13 @@ static int bearer_name_validate(const char *name,
* bearer_find - locates bearer object with matching bearer name
*/
-static struct bearer *bearer_find(const char *name)
+static struct tipc_bearer *bearer_find(const char *name)
{
- struct bearer *b_ptr;
+ struct tipc_bearer *b_ptr;
u32 i;
for (i = 0, b_ptr = tipc_bearers; i < MAX_BEARERS; i++, b_ptr++) {
- if (b_ptr->active && (!strcmp(b_ptr->publ.name, name)))
+ if (b_ptr->active && (!strcmp(b_ptr->name, name)))
return b_ptr;
}
return NULL;
@@ -294,16 +293,16 @@ static struct bearer *bearer_find(const char *name)
* tipc_bearer_find_interface - locates bearer object with matching interface name
*/
-struct bearer *tipc_bearer_find_interface(const char *if_name)
+struct tipc_bearer *tipc_bearer_find_interface(const char *if_name)
{
- struct bearer *b_ptr;
+ struct tipc_bearer *b_ptr;
char *b_if_name;
u32 i;
for (i = 0, b_ptr = tipc_bearers; i < MAX_BEARERS; i++, b_ptr++) {
if (!b_ptr->active)
continue;
- b_if_name = strchr(b_ptr->publ.name, ':') + 1;
+ b_if_name = strchr(b_ptr->name, ':') + 1;
if (!strcmp(b_if_name, if_name))
return b_ptr;
}
@@ -318,7 +317,7 @@ struct sk_buff *tipc_bearer_get_names(void)
{
struct sk_buff *buf;
struct media *m_ptr;
- struct bearer *b_ptr;
+ struct tipc_bearer *b_ptr;
int i, j;
buf = tipc_cfg_reply_alloc(MAX_BEARERS * TLV_SPACE(TIPC_MAX_BEARER_NAME));
@@ -331,8 +330,8 @@ struct sk_buff *tipc_bearer_get_names(void)
b_ptr = &tipc_bearers[j];
if (b_ptr->active && (b_ptr->media == m_ptr)) {
tipc_cfg_append_tlv(buf, TIPC_TLV_BEARER_NAME,
- b_ptr->publ.name,
- strlen(b_ptr->publ.name) + 1);
+ b_ptr->name,
+ strlen(b_ptr->name) + 1);
}
}
}
@@ -340,14 +339,14 @@ struct sk_buff *tipc_bearer_get_names(void)
return buf;
}
-void tipc_bearer_add_dest(struct bearer *b_ptr, u32 dest)
+void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest)
{
tipc_nmap_add(&b_ptr->nodes, dest);
tipc_disc_update_link_req(b_ptr->link_req);
tipc_bcbearer_sort();
}
-void tipc_bearer_remove_dest(struct bearer *b_ptr, u32 dest)
+void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
{
tipc_nmap_remove(&b_ptr->nodes, dest);
tipc_disc_update_link_req(b_ptr->link_req);
@@ -362,12 +361,12 @@ void tipc_bearer_remove_dest(struct bearer *b_ptr, u32 dest)
* bearer.lock must be taken before calling
* Returns binary true(1) ore false(0)
*/
-static int bearer_push(struct bearer *b_ptr)
+static int bearer_push(struct tipc_bearer *b_ptr)
{
u32 res = 0;
struct link *ln, *tln;
- if (b_ptr->publ.blocked)
+ if (b_ptr->blocked)
return 0;
while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) {
@@ -382,13 +381,13 @@ static int bearer_push(struct bearer *b_ptr)
return list_empty(&b_ptr->cong_links);
}
-void tipc_bearer_lock_push(struct bearer *b_ptr)
+void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)
{
int res;
- spin_lock_bh(&b_ptr->publ.lock);
+ spin_lock_bh(&b_ptr->lock);
res = bearer_push(b_ptr);
- spin_unlock_bh(&b_ptr->publ.lock);
+ spin_unlock_bh(&b_ptr->lock);
if (res)
tipc_bcbearer_push();
}
@@ -398,16 +397,14 @@ void tipc_bearer_lock_push(struct bearer *b_ptr)
* Interrupt enabling new requests after bearer congestion or blocking:
* See bearer_send().
*/
-void tipc_continue(struct tipc_bearer *tb_ptr)
+void tipc_continue(struct tipc_bearer *b_ptr)
{
- struct bearer *b_ptr = (struct bearer *)tb_ptr;
-
- spin_lock_bh(&b_ptr->publ.lock);
+ spin_lock_bh(&b_ptr->lock);
b_ptr->continue_count++;
if (!list_empty(&b_ptr->cong_links))
tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr);
- b_ptr->publ.blocked = 0;
- spin_unlock_bh(&b_ptr->publ.lock);
+ b_ptr->blocked = 0;
+ spin_unlock_bh(&b_ptr->lock);
}
/*
@@ -418,7 +415,7 @@ void tipc_continue(struct tipc_bearer *tb_ptr)
* bearer.lock is busy
*/
-static void tipc_bearer_schedule_unlocked(struct bearer *b_ptr, struct link *l_ptr)
+static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr, struct link *l_ptr)
{
list_move_tail(&l_ptr->link_list, &b_ptr->cong_links);
}
@@ -431,11 +428,11 @@ static void tipc_bearer_schedule_unlocked(struct bearer *b_ptr, struct link *l_p
* bearer.lock is free
*/
-void tipc_bearer_schedule(struct bearer *b_ptr, struct link *l_ptr)
+void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct link *l_ptr)
{
- spin_lock_bh(&b_ptr->publ.lock);
+ spin_lock_bh(&b_ptr->lock);
tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
- spin_unlock_bh(&b_ptr->publ.lock);
+ spin_unlock_bh(&b_ptr->lock);
}
@@ -444,18 +441,18 @@ void tipc_bearer_schedule(struct bearer *b_ptr, struct link *l_ptr)
* and if there is, try to resolve it before returning.
* 'tipc_net_lock' is read_locked when this function is called
*/
-int tipc_bearer_resolve_congestion(struct bearer *b_ptr, struct link *l_ptr)
+int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, struct link *l_ptr)
{
int res = 1;
if (list_empty(&b_ptr->cong_links))
return 1;
- spin_lock_bh(&b_ptr->publ.lock);
+ spin_lock_bh(&b_ptr->lock);
if (!bearer_push(b_ptr)) {
tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
res = 0;
}
- spin_unlock_bh(&b_ptr->publ.lock);
+ spin_unlock_bh(&b_ptr->lock);
return res;
}
@@ -463,9 +460,9 @@ int tipc_bearer_resolve_congestion(struct bearer *b_ptr, struct link *l_ptr)
* tipc_bearer_congested - determines if bearer is currently congested
*/
-int tipc_bearer_congested(struct bearer *b_ptr, struct link *l_ptr)
+int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct link *l_ptr)
{
- if (unlikely(b_ptr->publ.blocked))
+ if (unlikely(b_ptr->blocked))
return 1;
if (likely(list_empty(&b_ptr->cong_links)))
return 0;
@@ -476,9 +473,9 @@ int tipc_bearer_congested(struct bearer *b_ptr, struct link *l_ptr)
* tipc_enable_bearer - enable bearer with the given name
*/
-int tipc_enable_bearer(const char *name, u32 bcast_scope, u32 priority)
+int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
{
- struct bearer *b_ptr;
+ struct tipc_bearer *b_ptr;
struct media *m_ptr;
struct bearer_name b_name;
char addr_string[16];
@@ -496,9 +493,9 @@ int tipc_enable_bearer(const char *name, u32 bcast_scope, u32 priority)
warn("Bearer <%s> rejected, illegal name\n", name);
return -EINVAL;
}
- if (!tipc_addr_domain_valid(bcast_scope) ||
- !tipc_in_scope(bcast_scope, tipc_own_addr)) {
- warn("Bearer <%s> rejected, illegal broadcast scope\n", name);
+ if (!tipc_addr_domain_valid(disc_domain) ||
+ !tipc_in_scope(disc_domain, tipc_own_addr)) {
+ warn("Bearer <%s> rejected, illegal discovery domain\n", name);
return -EINVAL;
}
if ((priority < TIPC_MIN_LINK_PRI ||
@@ -528,7 +525,7 @@ restart:
bearer_id = i;
continue;
}
- if (!strcmp(name, tipc_bearers[i].publ.name)) {
+ if (!strcmp(name, tipc_bearers[i].name)) {
warn("Bearer <%s> rejected, already enabled\n", name);
goto failed;
}
@@ -551,8 +548,8 @@ restart:
}
b_ptr = &tipc_bearers[bearer_id];
- strcpy(b_ptr->publ.name, name);
- res = m_ptr->enable_bearer(&b_ptr->publ);
+ strcpy(b_ptr->name, name);
+ res = m_ptr->enable_bearer(b_ptr);
if (res) {
warn("Bearer <%s> rejected, enable failure (%d)\n", name, -res);
goto failed;
@@ -562,18 +559,15 @@ restart:
b_ptr->media = m_ptr;
b_ptr->net_plane = bearer_id + 'A';
b_ptr->active = 1;
- b_ptr->detect_scope = bcast_scope;
b_ptr->priority = priority;
INIT_LIST_HEAD(&b_ptr->cong_links);
INIT_LIST_HEAD(&b_ptr->links);
- if (m_ptr->bcast) {
- b_ptr->link_req = tipc_disc_init_link_req(b_ptr, &m_ptr->bcast_addr,
- bcast_scope, 2);
- }
- spin_lock_init(&b_ptr->publ.lock);
+ b_ptr->link_req = tipc_disc_init_link_req(b_ptr, &m_ptr->bcast_addr,
+ disc_domain);
+ spin_lock_init(&b_ptr->lock);
write_unlock_bh(&tipc_net_lock);
info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
- name, tipc_addr_string_fill(addr_string, bcast_scope), priority);
+ name, tipc_addr_string_fill(addr_string, disc_domain), priority);
return 0;
failed:
write_unlock_bh(&tipc_net_lock);
@@ -587,7 +581,7 @@ failed:
int tipc_block_bearer(const char *name)
{
- struct bearer *b_ptr = NULL;
+ struct tipc_bearer *b_ptr = NULL;
struct link *l_ptr;
struct link *temp_l_ptr;
@@ -600,8 +594,8 @@ int tipc_block_bearer(const char *name)
}
info("Blocking bearer <%s>\n", name);
- spin_lock_bh(&b_ptr->publ.lock);
- b_ptr->publ.blocked = 1;
+ spin_lock_bh(&b_ptr->lock);
+ b_ptr->blocked = 1;
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
struct tipc_node *n_ptr = l_ptr->owner;
@@ -609,7 +603,7 @@ int tipc_block_bearer(const char *name)
tipc_link_reset(l_ptr);
spin_unlock_bh(&n_ptr->lock);
}
- spin_unlock_bh(&b_ptr->publ.lock);
+ spin_unlock_bh(&b_ptr->lock);
read_unlock_bh(&tipc_net_lock);
return 0;
}
@@ -620,27 +614,27 @@ int tipc_block_bearer(const char *name)
* Note: This routine assumes caller holds tipc_net_lock.
*/
-static void bearer_disable(struct bearer *b_ptr)
+static void bearer_disable(struct tipc_bearer *b_ptr)
{
struct link *l_ptr;
struct link *temp_l_ptr;
- info("Disabling bearer <%s>\n", b_ptr->publ.name);
+ info("Disabling bearer <%s>\n", b_ptr->name);
tipc_disc_stop_link_req(b_ptr->link_req);
- spin_lock_bh(&b_ptr->publ.lock);
+ spin_lock_bh(&b_ptr->lock);
b_ptr->link_req = NULL;
- b_ptr->publ.blocked = 1;
- b_ptr->media->disable_bearer(&b_ptr->publ);
+ b_ptr->blocked = 1;
+ b_ptr->media->disable_bearer(b_ptr);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
tipc_link_delete(l_ptr);
}
- spin_unlock_bh(&b_ptr->publ.lock);
- memset(b_ptr, 0, sizeof(struct bearer));
+ spin_unlock_bh(&b_ptr->lock);
+ memset(b_ptr, 0, sizeof(struct tipc_bearer));
}
int tipc_disable_bearer(const char *name)
{
- struct bearer *b_ptr;
+ struct tipc_bearer *b_ptr;
int res;
write_lock_bh(&tipc_net_lock);
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 85f451d5aac..31d6172b20f 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -2,7 +2,7 @@
* net/tipc/bearer.h: Include file for TIPC bearer code
*
* Copyright (c) 1996-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -61,26 +61,7 @@ struct tipc_media_addr {
} dev_addr;
};
-/**
- * struct tipc_bearer - TIPC bearer info available to media code
- * @usr_handle: pointer to additional media-specific information about bearer
- * @mtu: max packet size bearer can support
- * @blocked: non-zero if bearer is blocked
- * @lock: spinlock for controlling access to bearer
- * @addr: media-specific address associated with bearer
- * @name: bearer name (format = media:interface)
- *
- * Note: TIPC initializes "name" and "lock" fields; media code is responsible
- * for initialization all other fields when a bearer is enabled.
- */
-struct tipc_bearer {
- void *usr_handle;
- u32 mtu;
- int blocked;
- spinlock_t lock;
- struct tipc_media_addr addr;
- char name[TIPC_MAX_BEARER_NAME];
-};
+struct tipc_bearer;
/**
* struct media - TIPC media information available to internal users
@@ -89,7 +70,6 @@ struct tipc_bearer {
* @disable_bearer: routine which disables a bearer
* @addr2str: routine which converts bearer's address to string form
* @bcast_addr: media address used in broadcasting
- * @bcast: non-zero if media supports broadcasting [currently mandatory]
* @priority: default link (and bearer) priority
* @tolerance: default time (in ms) before declaring link failure
* @window: default window (in packets) before declaring link congestion
@@ -106,7 +86,6 @@ struct media {
char *(*addr2str)(struct tipc_media_addr *a,
char *str_buf, int str_size);
struct tipc_media_addr bcast_addr;
- int bcast;
u32 priority;
u32 tolerance;
u32 window;
@@ -115,11 +94,15 @@ struct media {
};
/**
- * struct bearer - TIPC bearer information available to internal users
- * @publ: bearer information available to privileged users
+ * struct tipc_bearer - TIPC bearer structure
+ * @usr_handle: pointer to additional media-specific information about bearer
+ * @mtu: max packet size bearer can support
+ * @blocked: non-zero if bearer is blocked
+ * @lock: spinlock for controlling access to bearer
+ * @addr: media-specific address associated with bearer
+ * @name: bearer name (format = media:interface)
* @media: ptr to media structure associated with bearer
* @priority: default link priority for bearer
- * @detect_scope: network address mask used during automatic link creation
* @identity: array index of this bearer within TIPC bearer array
* @link_req: ptr to (optional) structure making periodic link setup requests
* @links: list of non-congested links associated with bearer
@@ -128,13 +111,20 @@ struct media {
* @active: non-zero if bearer structure is represents a bearer
* @net_plane: network plane ('A' through 'H') currently associated with bearer
* @nodes: indicates which nodes in cluster can be reached through bearer
+ *
+ * Note: media-specific code is responsible for initialization of the fields
+ * indicated below when a bearer is enabled; TIPC's generic bearer code takes
+ * care of initializing all other fields.
*/
-
-struct bearer {
- struct tipc_bearer publ;
+struct tipc_bearer {
+ void *usr_handle; /* initalized by media */
+ u32 mtu; /* initalized by media */
+ int blocked; /* initalized by media */
+ struct tipc_media_addr addr; /* initalized by media */
+ char name[TIPC_MAX_BEARER_NAME];
+ spinlock_t lock;
struct media *media;
u32 priority;
- u32 detect_scope;
u32 identity;
struct link_req *link_req;
struct list_head links;
@@ -152,7 +142,7 @@ struct bearer_name {
struct link;
-extern struct bearer tipc_bearers[];
+extern struct tipc_bearer tipc_bearers[];
/*
* TIPC routines available to supported media types
@@ -173,7 +163,7 @@ void tipc_recv_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr);
int tipc_block_bearer(const char *name);
void tipc_continue(struct tipc_bearer *tb_ptr);
-int tipc_enable_bearer(const char *bearer_name, u32 bcast_scope, u32 priority);
+int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority);
int tipc_disable_bearer(const char *name);
/*
@@ -186,14 +176,14 @@ void tipc_media_addr_printf(struct print_buf *pb, struct tipc_media_addr *a);
struct sk_buff *tipc_media_get_names(void);
struct sk_buff *tipc_bearer_get_names(void);
-void tipc_bearer_add_dest(struct bearer *b_ptr, u32 dest);
-void tipc_bearer_remove_dest(struct bearer *b_ptr, u32 dest);
-void tipc_bearer_schedule(struct bearer *b_ptr, struct link *l_ptr);
-struct bearer *tipc_bearer_find_interface(const char *if_name);
-int tipc_bearer_resolve_congestion(struct bearer *b_ptr, struct link *l_ptr);
-int tipc_bearer_congested(struct bearer *b_ptr, struct link *l_ptr);
+void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
+void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
+void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct link *l_ptr);
+struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
+int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, struct link *l_ptr);
+int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct link *l_ptr);
void tipc_bearer_stop(void);
-void tipc_bearer_lock_push(struct bearer *b_ptr);
+void tipc_bearer_lock_push(struct tipc_bearer *b_ptr);
/**
@@ -214,10 +204,11 @@ void tipc_bearer_lock_push(struct bearer *b_ptr);
* and let TIPC's link code deal with the undelivered message.
*/
-static inline int tipc_bearer_send(struct bearer *b_ptr, struct sk_buff *buf,
+static inline int tipc_bearer_send(struct tipc_bearer *b_ptr,
+ struct sk_buff *buf,
struct tipc_media_addr *dest)
{
- return !b_ptr->media->send_msg(buf, &b_ptr->publ, dest);
+ return !b_ptr->media->send_msg(buf, b_ptr, dest);
}
#endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/config.c b/net/tipc/config.c
index e16750dcf3c..b25a396b7e1 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -2,7 +2,7 @@
* net/tipc/config.c: TIPC configuration management code
*
* Copyright (c) 2002-2006, Ericsson AB
- * Copyright (c) 2004-2007, Wind River Systems
+ * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -148,7 +148,7 @@ static struct sk_buff *cfg_enable_bearer(void)
args = (struct tipc_bearer_config *)TLV_DATA(req_tlv_area);
if (tipc_enable_bearer(args->name,
- ntohl(args->detect_scope),
+ ntohl(args->disc_domain),
ntohl(args->priority)))
return tipc_cfg_reply_error_string("unable to enable bearer");
@@ -260,25 +260,6 @@ static struct sk_buff *cfg_set_max_ports(void)
return tipc_cfg_reply_none();
}
-static struct sk_buff *cfg_set_max_nodes(void)
-{
- u32 value;
-
- if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_UNSIGNED))
- return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
- value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
- if (value == tipc_max_nodes)
- return tipc_cfg_reply_none();
- if (value != delimit(value, 8, 2047))
- return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
- " (max nodes must be 8-2047)");
- if (tipc_mode == TIPC_NET_MODE)
- return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
- " (cannot change max nodes once TIPC has joined a network)");
- tipc_max_nodes = value;
- return tipc_cfg_reply_none();
-}
-
static struct sk_buff *cfg_set_netid(void)
{
u32 value;
@@ -397,9 +378,6 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
case TIPC_CMD_SET_MAX_SUBSCR:
rep_tlv_buf = cfg_set_max_subscriptions();
break;
- case TIPC_CMD_SET_MAX_NODES:
- rep_tlv_buf = cfg_set_max_nodes();
- break;
case TIPC_CMD_SET_NETID:
rep_tlv_buf = cfg_set_netid();
break;
@@ -415,9 +393,6 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
case TIPC_CMD_GET_MAX_SUBSCR:
rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_subscriptions);
break;
- case TIPC_CMD_GET_MAX_NODES:
- rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_nodes);
- break;
case TIPC_CMD_GET_NETID:
rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_net_id);
break;
@@ -431,6 +406,8 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area
case TIPC_CMD_GET_MAX_SLAVES:
case TIPC_CMD_SET_MAX_CLUSTERS:
case TIPC_CMD_GET_MAX_CLUSTERS:
+ case TIPC_CMD_SET_MAX_NODES:
+ case TIPC_CMD_GET_MAX_NODES:
rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
" (obsolete command)");
break;
diff --git a/net/tipc/core.c b/net/tipc/core.c
index e071579e085..c9a73e7763f 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -2,7 +2,7 @@
* net/tipc/core.c: TIPC module code
*
* Copyright (c) 2003-2006, Ericsson AB
- * Copyright (c) 2005-2006, Wind River Systems
+ * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,10 +41,6 @@
#include "config.h"
-#ifndef CONFIG_TIPC_NODES
-#define CONFIG_TIPC_NODES 255
-#endif
-
#ifndef CONFIG_TIPC_PORTS
#define CONFIG_TIPC_PORTS 8191
#endif
@@ -57,7 +53,6 @@
int tipc_mode = TIPC_NOT_RUNNING;
int tipc_random;
-atomic_t tipc_user_count = ATOMIC_INIT(0);
const char tipc_alphabet[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_.";
@@ -65,7 +60,6 @@ const char tipc_alphabet[] =
/* configurable TIPC parameters */
u32 tipc_own_addr;
-int tipc_max_nodes;
int tipc_max_ports;
int tipc_max_subscriptions;
int tipc_max_publications;
@@ -193,7 +187,6 @@ static int __init tipc_init(void)
tipc_max_publications = 10000;
tipc_max_subscriptions = 2000;
tipc_max_ports = CONFIG_TIPC_PORTS;
- tipc_max_nodes = CONFIG_TIPC_NODES;
tipc_net_id = 4711;
res = tipc_core_start();
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 997158546e2..436dda1159d 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -2,7 +2,7 @@
* net/tipc/core.h: Include file for TIPC global declarations
*
* Copyright (c) 2005-2006, Ericsson AB
- * Copyright (c) 2005-2007, Wind River Systems
+ * Copyright (c) 2005-2007, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -147,7 +147,6 @@ void tipc_msg_dbg(struct print_buf *, struct tipc_msg *, const char *);
*/
extern u32 tipc_own_addr;
-extern int tipc_max_nodes;
extern int tipc_max_ports;
extern int tipc_max_subscriptions;
extern int tipc_max_publications;
@@ -161,7 +160,6 @@ extern int tipc_remote_management;
extern int tipc_mode;
extern int tipc_random;
extern const char tipc_alphabet[];
-extern atomic_t tipc_user_count;
/*
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index fa026bd91a6..491eff56b9d 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -2,7 +2,7 @@
* net/tipc/discover.c
*
* Copyright (c) 2003-2006, Ericsson AB
- * Copyright (c) 2005-2006, Wind River Systems
+ * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -57,7 +57,7 @@
* @timer_intv: current interval between requests (in ms)
*/
struct link_req {
- struct bearer *bearer;
+ struct tipc_bearer *bearer;
struct tipc_media_addr dest;
struct sk_buff *buf;
struct timer_list timer;
@@ -67,27 +67,24 @@ struct link_req {
/**
* tipc_disc_init_msg - initialize a link setup message
* @type: message type (request or response)
- * @req_links: number of links associated with message
* @dest_domain: network domain of node(s) which should respond to message
* @b_ptr: ptr to bearer issuing message
*/
static struct sk_buff *tipc_disc_init_msg(u32 type,
- u32 req_links,
u32 dest_domain,
- struct bearer *b_ptr)
+ struct tipc_bearer *b_ptr)
{
- struct sk_buff *buf = tipc_buf_acquire(DSC_H_SIZE);
+ struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE);
struct tipc_msg *msg;
if (buf) {
msg = buf_msg(buf);
- tipc_msg_init(msg, LINK_CONFIG, type, DSC_H_SIZE, dest_domain);
+ tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain);
msg_set_non_seq(msg, 1);
- msg_set_req_links(msg, req_links);
msg_set_dest_domain(msg, dest_domain);
msg_set_bc_netid(msg, tipc_net_id);
- msg_set_media_addr(msg, &b_ptr->publ.addr);
+ msg_set_media_addr(msg, &b_ptr->addr);
}
return buf;
}
@@ -99,7 +96,7 @@ static struct sk_buff *tipc_disc_init_msg(u32 type,
* @media_addr: media address advertised by duplicated node
*/
-static void disc_dupl_alert(struct bearer *b_ptr, u32 node_addr,
+static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,
struct tipc_media_addr *media_addr)
{
char node_addr_str[16];
@@ -111,7 +108,7 @@ static void disc_dupl_alert(struct bearer *b_ptr, u32 node_addr,
tipc_media_addr_printf(&pb, media_addr);
tipc_printbuf_validate(&pb);
warn("Duplicate %s using %s seen on <%s>\n",
- node_addr_str, media_addr_str, b_ptr->publ.name);
+ node_addr_str, media_addr_str, b_ptr->name);
}
/**
@@ -120,19 +117,23 @@ static void disc_dupl_alert(struct bearer *b_ptr, u32 node_addr,
* @b_ptr: bearer that message arrived on
*/
-void tipc_disc_recv_msg(struct sk_buff *buf, struct bearer *b_ptr)
+void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
{
+ struct tipc_node *n_ptr;
struct link *link;
- struct tipc_media_addr media_addr;
+ struct tipc_media_addr media_addr, *addr;
+ struct sk_buff *rbuf;
struct tipc_msg *msg = buf_msg(buf);
u32 dest = msg_dest_domain(msg);
u32 orig = msg_prevnode(msg);
u32 net_id = msg_bc_netid(msg);
u32 type = msg_type(msg);
+ int link_fully_up;
msg_get_media_addr(msg, &media_addr);
buf_discard(buf);
+ /* Validate discovery message from requesting node */
if (net_id != tipc_net_id)
return;
if (!tipc_addr_domain_valid(dest))
@@ -140,63 +141,76 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct bearer *b_ptr)
if (!tipc_addr_node_valid(orig))
return;
if (orig == tipc_own_addr) {
- if (memcmp(&media_addr, &b_ptr->publ.addr, sizeof(media_addr)))
+ if (memcmp(&media_addr, &b_ptr->addr, sizeof(media_addr)))
disc_dupl_alert(b_ptr, tipc_own_addr, &media_addr);
return;
}
if (!tipc_in_scope(dest, tipc_own_addr))
return;
- if (in_own_cluster(orig)) {
- /* Always accept link here */
- struct sk_buff *rbuf;
- struct tipc_media_addr *addr;
- struct tipc_node *n_ptr = tipc_node_find(orig);
- int link_fully_up;
-
- if (n_ptr == NULL) {
- n_ptr = tipc_node_create(orig);
- if (!n_ptr)
- return;
- }
- spin_lock_bh(&n_ptr->lock);
-
- /* Don't talk to neighbor during cleanup after last session */
+ if (!in_own_cluster(orig))
+ return;
- if (n_ptr->cleanup_required) {
- spin_unlock_bh(&n_ptr->lock);
+ /* Locate structure corresponding to requesting node */
+ n_ptr = tipc_node_find(orig);
+ if (!n_ptr) {
+ n_ptr = tipc_node_create(orig);
+ if (!n_ptr)
return;
- }
+ }
+ tipc_node_lock(n_ptr);
+
+ /* Don't talk to neighbor during cleanup after last session */
+ if (n_ptr->cleanup_required) {
+ tipc_node_unlock(n_ptr);
+ return;
+ }
+
+ link = n_ptr->links[b_ptr->identity];
- link = n_ptr->links[b_ptr->identity];
+ /* Create a link endpoint for this bearer, if necessary */
+ if (!link) {
+ link = tipc_link_create(n_ptr, b_ptr, &media_addr);
if (!link) {
- link = tipc_link_create(b_ptr, orig, &media_addr);
- if (!link) {
- spin_unlock_bh(&n_ptr->lock);
- return;
- }
- }
- addr = &link->media_addr;
- if (memcmp(addr, &media_addr, sizeof(*addr))) {
- if (tipc_link_is_up(link) || (!link->started)) {
- disc_dupl_alert(b_ptr, orig, &media_addr);
- spin_unlock_bh(&n_ptr->lock);
- return;
- }
- warn("Resetting link <%s>, peer interface address changed\n",
- link->name);
- memcpy(addr, &media_addr, sizeof(*addr));
- tipc_link_reset(link);
+ tipc_node_unlock(n_ptr);
+ return;
}
- link_fully_up = link_working_working(link);
- spin_unlock_bh(&n_ptr->lock);
- if ((type == DSC_RESP_MSG) || link_fully_up)
+ }
+
+ /*
+ * Ensure requesting node's media address is correct
+ *
+ * If media address doesn't match and the link is working, reject the
+ * request (must be from a duplicate node).
+ *
+ * If media address doesn't match and the link is not working, accept
+ * the new media address and reset the link to ensure it starts up
+ * cleanly.
+ */
+ addr = &link->media_addr;
+ if (memcmp(addr, &media_addr, sizeof(*addr))) {
+ if (tipc_link_is_up(link) || (!link->started)) {
+ disc_dupl_alert(b_ptr, orig, &media_addr);
+ tipc_node_unlock(n_ptr);
return;
- rbuf = tipc_disc_init_msg(DSC_RESP_MSG, 1, orig, b_ptr);
- if (rbuf != NULL) {
- b_ptr->media->send_msg(rbuf, &b_ptr->publ, &media_addr);
+ }
+ warn("Resetting link <%s>, peer interface address changed\n",
+ link->name);
+ memcpy(addr, &media_addr, sizeof(*addr));
+ tipc_link_reset(link);
+ }
+
+ /* Accept discovery message & send response, if necessary */
+ link_fully_up = link_working_working(link);
+
+ if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
+ rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
+ if (rbuf) {
+ b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
buf_discard(rbuf);
}
}
+
+ tipc_node_unlock(n_ptr);
}
/**
@@ -249,9 +263,9 @@ void tipc_disc_update_link_req(struct link_req *req)
static void disc_timeout(struct link_req *req)
{
- spin_lock_bh(&req->bearer->publ.lock);
+ spin_lock_bh(&req->bearer->lock);
- req->bearer->media->send_msg(req->buf, &req->bearer->publ, &req->dest);
+ req->bearer->media->send_msg(req->buf, req->bearer, &req->dest);
if ((req->timer_intv == TIPC_LINK_REQ_SLOW) ||
(req->timer_intv == TIPC_LINK_REQ_FAST)) {
@@ -266,7 +280,7 @@ static void disc_timeout(struct link_req *req)
}
k_start_timer(&req->timer, req->timer_intv);
- spin_unlock_bh(&req->bearer->publ.lock);
+ spin_unlock_bh(&req->bearer->lock);
}
/**
@@ -274,15 +288,13 @@ static void disc_timeout(struct link_req *req)
* @b_ptr: ptr to bearer issuing requests
* @dest: destination address for request messages
* @dest_domain: network domain of node(s) which should respond to message
- * @req_links: max number of desired links
*
* Returns pointer to link request structure, or NULL if unable to create.
*/
-struct link_req *tipc_disc_init_link_req(struct bearer *b_ptr,
+struct link_req *tipc_disc_init_link_req(struct tipc_bearer *b_ptr,
const struct tipc_media_addr *dest,
- u32 dest_domain,
- u32 req_links)
+ u32 dest_domain)
{
struct link_req *req;
@@ -290,7 +302,7 @@ struct link_req *tipc_disc_init_link_req(struct bearer *b_ptr,
if (!req)
return NULL;
- req->buf = tipc_disc_init_msg(DSC_REQ_MSG, req_links, dest_domain, b_ptr);
+ req->buf = tipc_disc_init_msg(DSC_REQ_MSG, dest_domain, b_ptr);
if (!req->buf) {
kfree(req);
return NULL;
diff --git a/net/tipc/discover.h b/net/tipc/discover.h
index d2c3cffb79f..e48a167e47b 100644
--- a/net/tipc/discover.h
+++ b/net/tipc/discover.h
@@ -2,7 +2,7 @@
* net/tipc/discover.h
*
* Copyright (c) 2003-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -39,13 +39,12 @@
struct link_req;
-struct link_req *tipc_disc_init_link_req(struct bearer *b_ptr,
+struct link_req *tipc_disc_init_link_req(struct tipc_bearer *b_ptr,
const struct tipc_media_addr *dest,
- u32 dest_domain,
- u32 req_links);
+ u32 dest_domain);
void tipc_disc_update_link_req(struct link_req *req);
void tipc_disc_stop_link_req(struct link_req *req);
-void tipc_disc_recv_msg(struct sk_buff *buf, struct bearer *b_ptr);
+void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr);
#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 18702f58d11..43639ff1cbe 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -2,7 +2,7 @@
* net/tipc/link.c: TIPC link code
*
* Copyright (c) 1996-2007, Ericsson AB
- * Copyright (c) 2004-2007, Wind River Systems
+ * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -90,7 +90,7 @@ static void link_handle_out_of_seq_msg(struct link *l_ptr,
static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
-static int link_send_sections_long(struct port *sender,
+static int link_send_sections_long(struct tipc_port *sender,
struct iovec const *msg_sect,
u32 num_sect, u32 destnode);
static void link_check_defragm_bufs(struct link *l_ptr);
@@ -113,7 +113,7 @@ static void link_init_max_pkt(struct link *l_ptr)
{
u32 max_pkt;
- max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
+ max_pkt = (l_ptr->b_ptr->mtu & ~3);
if (max_pkt > MAX_MSG_SIZE)
max_pkt = MAX_MSG_SIZE;
@@ -246,9 +246,6 @@ static void link_timeout(struct link *l_ptr)
l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
l_ptr->stats.queue_sz_counts++;
- if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
- l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-
if (l_ptr->first_out) {
struct tipc_msg *msg = buf_msg(l_ptr->first_out);
u32 length = msg_size(msg);
@@ -296,19 +293,35 @@ static void link_set_timer(struct link *l_ptr, u32 time)
/**
* tipc_link_create - create a new link
+ * @n_ptr: pointer to associated node
* @b_ptr: pointer to associated bearer
- * @peer: network address of node at other end of link
* @media_addr: media address to use when sending messages over link
*
* Returns pointer to link.
*/
-struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
+struct link *tipc_link_create(struct tipc_node *n_ptr,
+ struct tipc_bearer *b_ptr,
const struct tipc_media_addr *media_addr)
{
struct link *l_ptr;
struct tipc_msg *msg;
char *if_name;
+ char addr_string[16];
+ u32 peer = n_ptr->addr;
+
+ if (n_ptr->link_cnt >= 2) {
+ tipc_addr_string_fill(addr_string, n_ptr->addr);
+ err("Attempt to establish third link to %s\n", addr_string);
+ return NULL;
+ }
+
+ if (n_ptr->links[b_ptr->identity]) {
+ tipc_addr_string_fill(addr_string, n_ptr->addr);
+ err("Attempt to establish second link on <%s> to %s\n",
+ b_ptr->name, addr_string);
+ return NULL;
+ }
l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
if (!l_ptr) {
@@ -317,7 +330,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
}
l_ptr->addr = peer;
- if_name = strchr(b_ptr->publ.name, ':') + 1;
+ if_name = strchr(b_ptr->name, ':') + 1;
sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
tipc_node(tipc_own_addr),
@@ -325,6 +338,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
/* note: peer i/f is appended to link name by reset/activate */
memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
+ l_ptr->owner = n_ptr;
l_ptr->checkpoint = 1;
l_ptr->b_ptr = b_ptr;
link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
@@ -348,11 +362,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
link_reset_statistics(l_ptr);
- l_ptr->owner = tipc_node_attach_link(l_ptr);
- if (!l_ptr->owner) {
- kfree(l_ptr);
- return NULL;
- }
+ tipc_node_attach_link(n_ptr, l_ptr);
k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
list_add_tail(&l_ptr->link_list, &b_ptr->links);
@@ -391,7 +401,9 @@ void tipc_link_delete(struct link *l_ptr)
static void link_start(struct link *l_ptr)
{
+ tipc_node_lock(l_ptr->owner);
link_state_event(l_ptr, STARTING_EVT);
+ tipc_node_unlock(l_ptr->owner);
}
/**
@@ -406,7 +418,7 @@ static void link_start(struct link *l_ptr)
static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
spin_lock_bh(&tipc_port_list_lock);
p_ptr = tipc_port_lock(origport);
@@ -415,7 +427,7 @@ static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
goto exit;
if (!list_empty(&p_ptr->wait_list))
goto exit;
- p_ptr->publ.congested = 1;
+ p_ptr->congested = 1;
p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
l_ptr->stats.link_congs++;
@@ -428,8 +440,8 @@ exit:
void tipc_link_wakeup_ports(struct link *l_ptr, int all)
{
- struct port *p_ptr;
- struct port *temp_p_ptr;
+ struct tipc_port *p_ptr;
+ struct tipc_port *temp_p_ptr;
int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
if (all)
@@ -445,11 +457,11 @@ void tipc_link_wakeup_ports(struct link *l_ptr, int all)
if (win <= 0)
break;
list_del_init(&p_ptr->wait_list);
- spin_lock_bh(p_ptr->publ.lock);
- p_ptr->publ.congested = 0;
- p_ptr->wakeup(&p_ptr->publ);
+ spin_lock_bh(p_ptr->lock);
+ p_ptr->congested = 0;
+ p_ptr->wakeup(p_ptr);
win -= p_ptr->waiting_pkts;
- spin_unlock_bh(p_ptr->publ.lock);
+ spin_unlock_bh(p_ptr->lock);
}
exit:
@@ -549,7 +561,7 @@ void tipc_link_reset(struct link *l_ptr)
tipc_node_link_down(l_ptr->owner, l_ptr);
tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
- if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
+ if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
l_ptr->owner->permit_changeover) {
l_ptr->reset_checkpoint = checkpoint;
l_ptr->exp_msg_count = START_CHANGEOVER;
@@ -824,7 +836,10 @@ static void link_add_to_outqueue(struct link *l_ptr,
l_ptr->last_out = buf;
} else
l_ptr->first_out = l_ptr->last_out = buf;
+
l_ptr->out_queue_size++;
+ if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
+ l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
}
/*
@@ -867,9 +882,6 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
/* Packet can be queued or sent: */
- if (queue_size > l_ptr->stats.max_queue_sz)
- l_ptr->stats.max_queue_sz = queue_size;
-
if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
!link_congested(l_ptr))) {
link_add_to_outqueue(l_ptr, buf, msg);
@@ -1027,12 +1039,12 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
* except for total message length.
* Returns user data length or errno.
*/
-int tipc_link_send_sections_fast(struct port *sender,
+int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
const u32 num_sect,
u32 destaddr)
{
- struct tipc_msg *hdr = &sender->publ.phdr;
+ struct tipc_msg *hdr = &sender->phdr;
struct link *l_ptr;
struct sk_buff *buf;
struct tipc_node *node;
@@ -1045,7 +1057,7 @@ again:
* (Must not hold any locks while building message.)
*/
- res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
+ res = tipc_msg_build(hdr, msg_sect, num_sect, sender->max_pkt,
!sender->user_port, &buf);
read_lock_bh(&tipc_net_lock);
@@ -1056,7 +1068,7 @@ again:
if (likely(l_ptr)) {
if (likely(buf)) {
res = link_send_buf_fast(l_ptr, buf,
- &sender->publ.max_pkt);
+ &sender->max_pkt);
if (unlikely(res < 0))
buf_discard(buf);
exit:
@@ -1075,7 +1087,7 @@ exit:
if (link_congested(l_ptr) ||
!list_empty(&l_ptr->b_ptr->cong_links)) {
res = link_schedule_port(l_ptr,
- sender->publ.ref, res);
+ sender->ref, res);
goto exit;
}
@@ -1084,12 +1096,12 @@ exit:
* then re-try fast path or fragment the message
*/
- sender->publ.max_pkt = l_ptr->max_pkt;
+ sender->max_pkt = l_ptr->max_pkt;
tipc_node_unlock(node);
read_unlock_bh(&tipc_net_lock);
- if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
+ if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
goto again;
return link_send_sections_long(sender, msg_sect,
@@ -1123,14 +1135,14 @@ exit:
*
* Returns user data length or errno.
*/
-static int link_send_sections_long(struct port *sender,
+static int link_send_sections_long(struct tipc_port *sender,
struct iovec const *msg_sect,
u32 num_sect,
u32 destaddr)
{
struct link *l_ptr;
struct tipc_node *node;
- struct tipc_msg *hdr = &sender->publ.phdr;
+ struct tipc_msg *hdr = &sender->phdr;
u32 dsz = msg_data_sz(hdr);
u32 max_pkt, fragm_sz, rest;
struct tipc_msg fragm_hdr;
@@ -1142,7 +1154,7 @@ static int link_send_sections_long(struct port *sender,
again:
fragm_no = 1;
- max_pkt = sender->publ.max_pkt - INT_H_SIZE;
+ max_pkt = sender->max_pkt - INT_H_SIZE;
/* leave room for tunnel header in case of link changeover */
fragm_sz = max_pkt - INT_H_SIZE;
/* leave room for fragmentation header in each fragment */
@@ -1157,7 +1169,7 @@ again:
tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
INT_H_SIZE, msg_destnode(hdr));
- msg_set_link_selector(&fragm_hdr, sender->publ.ref);
+ msg_set_link_selector(&fragm_hdr, sender->ref);
msg_set_size(&fragm_hdr, max_pkt);
msg_set_fragm_no(&fragm_hdr, 1);
@@ -1238,13 +1250,13 @@ error:
node = tipc_node_find(destaddr);
if (likely(node)) {
tipc_node_lock(node);
- l_ptr = node->active_links[sender->publ.ref & 1];
+ l_ptr = node->active_links[sender->ref & 1];
if (!l_ptr) {
tipc_node_unlock(node);
goto reject;
}
if (l_ptr->max_pkt < max_pkt) {
- sender->publ.max_pkt = l_ptr->max_pkt;
+ sender->max_pkt = l_ptr->max_pkt;
tipc_node_unlock(node);
for (; buf_chain; buf_chain = buf) {
buf = buf_chain->next;
@@ -1441,7 +1453,7 @@ static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
info("Outstanding acks: %lu\n",
(unsigned long) TIPC_SKB_CB(buf)->handle);
- n_ptr = l_ptr->owner->next;
+ n_ptr = tipc_bclink_retransmit_to();
tipc_node_lock(n_ptr);
tipc_addr_string_fill(addr_string, n_ptr->addr);
@@ -1595,11 +1607,10 @@ static int link_recv_buf_validate(struct sk_buff *buf)
* structure (i.e. cannot be NULL), but bearer can be inactive.
*/
-void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
+void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
{
read_lock_bh(&tipc_net_lock);
while (head) {
- struct bearer *b_ptr = (struct bearer *)tb_ptr;
struct tipc_node *n_ptr;
struct link *l_ptr;
struct sk_buff *crs;
@@ -1735,10 +1746,6 @@ deliver:
tipc_node_unlock(n_ptr);
tipc_link_recv_bundle(buf);
continue;
- case ROUTE_DISTRIBUTOR:
- tipc_node_unlock(n_ptr);
- buf_discard(buf);
- continue;
case NAME_DISTRIBUTOR:
tipc_node_unlock(n_ptr);
tipc_named_recv(buf);
@@ -1765,6 +1772,10 @@ deliver:
goto protocol_check;
}
break;
+ default:
+ buf_discard(buf);
+ buf = NULL;
+ break;
}
}
tipc_node_unlock(n_ptr);
@@ -1900,6 +1911,7 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
struct sk_buff *buf = NULL;
struct tipc_msg *msg = l_ptr->pmsg;
u32 msg_size = sizeof(l_ptr->proto_msg);
+ int r_flag;
if (link_blocked(l_ptr))
return;
@@ -1950,15 +1962,14 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
msg_set_seq_gap(msg, 0);
msg_set_next_sent(msg, 1);
+ msg_set_probe(msg, 0);
msg_set_link_tolerance(msg, l_ptr->tolerance);
msg_set_linkprio(msg, l_ptr->priority);
msg_set_max_pkt(msg, l_ptr->max_pkt_target);
}
- if (tipc_node_has_redundant_links(l_ptr->owner))
- msg_set_redundant_link(msg);
- else
- msg_clear_redundant_link(msg);
+ r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
+ msg_set_redundant_link(msg, r_flag);
msg_set_linkprio(msg, l_ptr->priority);
/* Ensure sequence number will not fit : */
@@ -1978,7 +1989,6 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
return;
}
- msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
/* Message can be sent */
@@ -2066,7 +2076,7 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
l_ptr->peer_bearer_id = msg_bearer_id(msg);
/* Synchronize broadcast sequence numbers */
- if (!tipc_node_has_redundant_links(l_ptr->owner))
+ if (!tipc_node_redundant_links(l_ptr->owner))
l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
break;
case STATE_MSG:
@@ -2413,9 +2423,6 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
else
destaddr = msg_destnode(inmsg);
- if (msg_routed(inmsg))
- msg_set_prevnode(inmsg, tipc_own_addr);
-
/* Prepare reusable fragment header: */
tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
@@ -2618,6 +2625,9 @@ static void link_check_defragm_bufs(struct link *l_ptr)
static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
{
+ if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
+ return;
+
l_ptr->tolerance = tolerance;
l_ptr->continuity_interval =
((tolerance / 4) > 500) ? 500 : tolerance / 4;
@@ -2658,7 +2668,7 @@ void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
static struct link *link_find_link(const char *name, struct tipc_node **node)
{
struct link_name link_name_parts;
- struct bearer *b_ptr;
+ struct tipc_bearer *b_ptr;
struct link *l_ptr;
if (!link_name_validate(name, &link_name_parts))
@@ -2961,7 +2971,7 @@ static void link_print(struct link *l_ptr, const char *str)
tipc_printf(buf, str);
tipc_printf(buf, "Link %x<%s>:",
- l_ptr->addr, l_ptr->b_ptr->publ.name);
+ l_ptr->addr, l_ptr->b_ptr->name);
#ifdef CONFIG_TIPC_DEBUG
if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
@@ -2981,9 +2991,9 @@ static void link_print(struct link *l_ptr, const char *str)
!= (l_ptr->out_queue_size - 1)) ||
(l_ptr->last_out->next != NULL)) {
tipc_printf(buf, "\nSend queue inconsistency\n");
- tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
- tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
- tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
+ tipc_printf(buf, "first_out= %p ", l_ptr->first_out);
+ tipc_printf(buf, "next_out= %p ", l_ptr->next_out);
+ tipc_printf(buf, "last_out= %p ", l_ptr->last_out);
}
} else
tipc_printf(buf, "[]");
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 70967e63702..e6a30dbe1aa 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -2,7 +2,7 @@
* net/tipc/link.h: Include file for TIPC link code
*
* Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2004-2005, Wind River Systems
+ * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -122,7 +122,7 @@ struct link {
u32 checkpoint;
u32 peer_session;
u32 peer_bearer_id;
- struct bearer *b_ptr;
+ struct tipc_bearer *b_ptr;
u32 tolerance;
u32 continuity_interval;
u32 abort_limit;
@@ -196,24 +196,19 @@ struct link {
u32 bearer_congs;
u32 deferred_recv;
u32 duplicates;
-
- /* for statistical profiling of send queue size */
-
- u32 max_queue_sz;
- u32 accu_queue_sz;
- u32 queue_sz_counts;
-
- /* for statistical profiling of message lengths */
-
- u32 msg_length_counts;
- u32 msg_lengths_total;
- u32 msg_length_profile[7];
+ u32 max_queue_sz; /* send queue size high water mark */
+ u32 accu_queue_sz; /* used for send queue size profiling */
+ u32 queue_sz_counts; /* used for send queue size profiling */
+ u32 msg_length_counts; /* used for message length profiling */
+ u32 msg_lengths_total; /* used for message length profiling */
+ u32 msg_length_profile[7]; /* used for msg. length profiling */
} stats;
};
-struct port;
+struct tipc_port;
-struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
+struct link *tipc_link_create(struct tipc_node *n_ptr,
+ struct tipc_bearer *b_ptr,
const struct tipc_media_addr *media_addr);
void tipc_link_delete(struct link *l_ptr);
void tipc_link_changeover(struct link *l_ptr);
@@ -230,7 +225,7 @@ void tipc_link_reset(struct link *l_ptr);
int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector);
int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf);
u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
-int tipc_link_send_sections_fast(struct port *sender,
+int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect,
const u32 num_sect,
u32 destnode);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index bb6180c4fcb..6d92d17e7fb 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -2,7 +2,7 @@
* net/tipc/msg.c: TIPC message header routines
*
* Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -192,8 +192,6 @@ void tipc_msg_dbg(struct print_buf *buf, struct tipc_msg *msg, const char *str)
default:
tipc_printf(buf, "UNKNOWN TYPE %u", msg_type(msg));
}
- if (msg_routed(msg) && !msg_non_seq(msg))
- tipc_printf(buf, "ROUT:");
if (msg_reroute_cnt(msg))
tipc_printf(buf, "REROUTED(%u):",
msg_reroute_cnt(msg));
@@ -210,8 +208,6 @@ void tipc_msg_dbg(struct print_buf *buf, struct tipc_msg *msg, const char *str)
default:
tipc_printf(buf, "UNKNOWN:%x", msg_type(msg));
}
- if (msg_routed(msg))
- tipc_printf(buf, "ROUT:");
if (msg_reroute_cnt(msg))
tipc_printf(buf, "REROUTED(%u):",
msg_reroute_cnt(msg));
@@ -232,13 +228,10 @@ void tipc_msg_dbg(struct print_buf *buf, struct tipc_msg *msg, const char *str)
default:
tipc_printf(buf, "UNKNOWN TYPE:%x", msg_type(msg));
}
- if (msg_routed(msg))
- tipc_printf(buf, "ROUT:");
if (msg_reroute_cnt(msg))
tipc_printf(buf, "REROUTED(%u):", msg_reroute_cnt(msg));
break;
case LINK_PROTOCOL:
- tipc_printf(buf, "PROT:TIM(%u):", msg_timestamp(msg));
switch (msg_type(msg)) {
case STATE_MSG:
tipc_printf(buf, "STATE:");
@@ -275,33 +268,6 @@ void tipc_msg_dbg(struct print_buf *buf, struct tipc_msg *msg, const char *str)
tipc_printf(buf, "UNKNOWN TYPE:%x", msg_type(msg));
}
break;
- case ROUTE_DISTRIBUTOR:
- tipc_printf(buf, "ROUTING_MNG:");
- switch (msg_type(msg)) {
- case EXT_ROUTING_TABLE:
- tipc_printf(buf, "EXT_TBL:");
- tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
- break;
- case LOCAL_ROUTING_TABLE:
- tipc_printf(buf, "LOCAL_TBL:");
- tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
- break;
- case SLAVE_ROUTING_TABLE:
- tipc_printf(buf, "DP_TBL:");
- tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
- break;
- case ROUTE_ADDITION:
- tipc_printf(buf, "ADD:");
- tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
- break;
- case ROUTE_REMOVAL:
- tipc_printf(buf, "REMOVE:");
- tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
- break;
- default:
- tipc_printf(buf, "UNKNOWN TYPE:%x", msg_type(msg));
- }
- break;
case LINK_CONFIG:
tipc_printf(buf, "CFG:");
switch (msg_type(msg)) {
@@ -381,20 +347,15 @@ void tipc_msg_dbg(struct print_buf *buf, struct tipc_msg *msg, const char *str)
tipc_printf(buf, ":OPRT(%u):", msg_origport(msg));
tipc_printf(buf, ":DPRT(%u):", msg_destport(msg));
}
- if (msg_routed(msg) && !msg_non_seq(msg))
- tipc_printf(buf, ":TSEQN(%u)", msg_transp_seqno(msg));
}
if (msg_user(msg) == NAME_DISTRIBUTOR) {
tipc_printf(buf, ":ONOD(%x):", msg_orignode(msg));
tipc_printf(buf, ":DNOD(%x):", msg_destnode(msg));
- if (msg_routed(msg))
- tipc_printf(buf, ":CSEQN(%u)", msg_transp_seqno(msg));
}
if (msg_user(msg) == LINK_CONFIG) {
u32 *raw = (u32 *)msg;
struct tipc_media_addr *orig = (struct tipc_media_addr *)&raw[5];
- tipc_printf(buf, ":REQL(%u):", msg_req_links(msg));
tipc_printf(buf, ":DDOM(%x):", msg_dest_domain(msg));
tipc_printf(buf, ":NETID(%u):", msg_bc_netid(msg));
tipc_media_addr_printf(buf, orig);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 92c4c4fd7b3..de02339fc17 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -2,7 +2,7 @@
* net/tipc/msg.h: Include file for TIPC message header routines
*
* Copyright (c) 2000-2007, Ericsson AB
- * Copyright (c) 2005-2008, Wind River Systems
+ * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -421,13 +421,6 @@ static inline int msg_is_dest(struct tipc_msg *m, u32 d)
return msg_short(m) || (msg_destnode(m) == d);
}
-static inline u32 msg_routed(struct tipc_msg *m)
-{
- if (likely(msg_short(m)))
- return 0;
- return (msg_destnode(m) ^ msg_orignode(m)) >> 11;
-}
-
static inline u32 msg_nametype(struct tipc_msg *m)
{
return msg_word(m, 8);
@@ -438,26 +431,6 @@ static inline void msg_set_nametype(struct tipc_msg *m, u32 n)
msg_set_word(m, 8, n);
}
-static inline u32 msg_transp_seqno(struct tipc_msg *m)
-{
- return msg_word(m, 8);
-}
-
-static inline void msg_set_timestamp(struct tipc_msg *m, u32 n)
-{
- msg_set_word(m, 8, n);
-}
-
-static inline u32 msg_timestamp(struct tipc_msg *m)
-{
- return msg_word(m, 8);
-}
-
-static inline void msg_set_transp_seqno(struct tipc_msg *m, u32 n)
-{
- msg_set_word(m, 8, n);
-}
-
static inline u32 msg_nameinst(struct tipc_msg *m)
{
return msg_word(m, 9);
@@ -545,7 +518,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
#define NAME_DISTRIBUTOR 11
#define MSG_FRAGMENTER 12
#define LINK_CONFIG 13
-#define DSC_H_SIZE 40
/*
* Connection management protocol messages
@@ -577,16 +549,6 @@ static inline void msg_set_seq_gap(struct tipc_msg *m, u32 n)
msg_set_bits(m, 1, 16, 0x1fff, n);
}
-static inline u32 msg_req_links(struct tipc_msg *m)
-{
- return msg_bits(m, 1, 16, 0xfff);
-}
-
-static inline void msg_set_req_links(struct tipc_msg *m, u32 n)
-{
- msg_set_bits(m, 1, 16, 0xfff, n);
-}
-
/*
* Word 2
@@ -749,14 +711,9 @@ static inline u32 msg_redundant_link(struct tipc_msg *m)
return msg_bits(m, 5, 12, 0x1);
}
-static inline void msg_set_redundant_link(struct tipc_msg *m)
+static inline void msg_set_redundant_link(struct tipc_msg *m, u32 r)
{
- msg_set_bits(m, 5, 12, 0x1, 1);
-}
-
-static inline void msg_clear_redundant_link(struct tipc_msg *m)
-{
- msg_set_bits(m, 5, 12, 0x1, 0);
+ msg_set_bits(m, 5, 12, 0x1, r);
}
@@ -805,21 +762,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
}
/*
- * Routing table message data
- */
-
-
-static inline u32 msg_remote_node(struct tipc_msg *m)
-{
- return msg_word(m, msg_hdr_sz(m)/4);
-}
-
-static inline void msg_set_remote_node(struct tipc_msg *m, u32 a)
-{
- msg_set_word(m, msg_hdr_sz(m)/4, a);
-}
-
-/*
* Segmentation message types
*/
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 483c226c958..c9fa6dfcf28 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -2,7 +2,7 @@
* net/tipc/name_distr.c: TIPC name distribution code
*
* Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -109,11 +109,9 @@ static void named_cluster_distribute(struct sk_buff *buf)
{
struct sk_buff *buf_copy;
struct tipc_node *n_ptr;
- u32 n_num;
- for (n_num = 1; n_num <= tipc_net.highest_node; n_num++) {
- n_ptr = tipc_net.nodes[n_num];
- if (n_ptr && tipc_node_has_active_links(n_ptr)) {
+ list_for_each_entry(n_ptr, &tipc_node_list, list) {
+ if (tipc_node_active_links(n_ptr)) {
buf_copy = skb_copy(buf, GFP_ATOMIC);
if (!buf_copy)
break;
@@ -214,17 +212,16 @@ exit:
}
/**
- * node_is_down - remove publication associated with a failed node
+ * named_purge_publ - remove publication associated with a failed node
*
* Invoked for each publication issued by a newly failed node.
* Removes publication structure from name table & deletes it.
* In rare cases the link may have come back up again when this
* function is called, and we have two items representing the same
* publication. Nudge this item's key to distinguish it from the other.
- * (Note: Publication's node subscription is already unsubscribed.)
*/
-static void node_is_down(struct publication *publ)
+static void named_purge_publ(struct publication *publ)
{
struct publication *p;
@@ -232,6 +229,8 @@ static void node_is_down(struct publication *publ)
publ->key += 1222345;
p = tipc_nametbl_remove_publ(publ->type, publ->lower,
publ->node, publ->ref, publ->key);
+ if (p)
+ tipc_nodesub_unsubscribe(&p->subscr);
write_unlock_bh(&tipc_nametbl_lock);
if (p != publ) {
@@ -268,7 +267,8 @@ void tipc_named_recv(struct sk_buff *buf)
tipc_nodesub_subscribe(&publ->subscr,
msg_orignode(msg),
publ,
- (net_ev_handler)node_is_down);
+ (net_ev_handler)
+ named_purge_publ);
}
} else if (msg_type(msg) == WITHDRAWAL) {
publ = tipc_nametbl_remove_publ(ntohl(item->type),
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 9bacfd00b91..68b3dd63729 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -2,7 +2,7 @@
* net/tipc/net.c: TIPC network routing code
*
* Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -39,6 +39,7 @@
#include "name_distr.h"
#include "subscr.h"
#include "port.h"
+#include "node.h"
#include "config.h"
/*
@@ -108,26 +109,6 @@
*/
DEFINE_RWLOCK(tipc_net_lock);
-struct network tipc_net;
-
-static int net_start(void)
-{
- tipc_net.nodes = kcalloc(tipc_max_nodes + 1,
- sizeof(*tipc_net.nodes), GFP_ATOMIC);
- tipc_net.highest_node = 0;
-
- return tipc_net.nodes ? 0 : -ENOMEM;
-}
-
-static void net_stop(void)
-{
- u32 n_num;
-
- for (n_num = 1; n_num <= tipc_net.highest_node; n_num++)
- tipc_node_delete(tipc_net.nodes[n_num]);
- kfree(tipc_net.nodes);
- tipc_net.nodes = NULL;
-}
static void net_route_named_msg(struct sk_buff *buf)
{
@@ -217,9 +198,6 @@ int tipc_net_start(u32 addr)
tipc_named_reinit();
tipc_port_reinit();
- res = net_start();
- if (res)
- return res;
res = tipc_bclink_init();
if (res)
return res;
@@ -235,14 +213,16 @@ int tipc_net_start(u32 addr)
void tipc_net_stop(void)
{
+ struct tipc_node *node, *t_node;
+
if (tipc_mode != TIPC_NET_MODE)
return;
write_lock_bh(&tipc_net_lock);
tipc_bearer_stop();
tipc_mode = TIPC_NODE_MODE;
tipc_bclink_stop();
- net_stop();
+ list_for_each_entry_safe(node, t_node, &tipc_node_list, list)
+ tipc_node_delete(node);
write_unlock_bh(&tipc_net_lock);
info("Left network mode\n");
}
-
diff --git a/net/tipc/net.h b/net/tipc/net.h
index 4ae59ad0489..9eb4b9e220e 100644
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -2,7 +2,7 @@
* net/tipc/net.h: Include file for TIPC network routing code
*
* Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,23 +37,6 @@
#ifndef _TIPC_NET_H
#define _TIPC_NET_H
-struct tipc_node;
-
-/**
- * struct network - TIPC network structure
- * @nodes: array of pointers to all nodes within cluster
- * @highest_node: id of highest numbered node within cluster
- * @links: number of (unicast) links to cluster
- */
-
-struct network {
- struct tipc_node **nodes;
- u32 highest_node;
- u32 links;
-};
-
-
-extern struct network tipc_net;
extern rwlock_t tipc_net_lock;
void tipc_net_route_msg(struct sk_buff *buf);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 3af53e327f4..2d106ef4fa4 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -2,7 +2,7 @@
* net/tipc/node.c: TIPC node management routines
*
* Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005-2006, Wind River Systems
+ * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -44,9 +44,33 @@ static void node_established_contact(struct tipc_node *n_ptr);
static DEFINE_SPINLOCK(node_create_lock);
+static struct hlist_head node_htable[NODE_HTABLE_SIZE];
+LIST_HEAD(tipc_node_list);
+static u32 tipc_num_nodes;
+
+static atomic_t tipc_num_links = ATOMIC_INIT(0);
u32 tipc_own_tag;
/**
+ * tipc_node_find - locate specified node object, if it exists
+ */
+
+struct tipc_node *tipc_node_find(u32 addr)
+{
+ struct tipc_node *node;
+ struct hlist_node *pos;
+
+ if (unlikely(!in_own_cluster(addr)))
+ return NULL;
+
+ hlist_for_each_entry(node, pos, &node_htable[tipc_hashfn(addr)], hash) {
+ if (node->addr == addr)
+ return node;
+ }
+ return NULL;
+}
+
+/**
* tipc_node_create - create neighboring node
*
* Currently, this routine is called by neighbor discovery code, which holds
@@ -58,8 +82,7 @@ u32 tipc_own_tag;
struct tipc_node *tipc_node_create(u32 addr)
{
- struct tipc_node *n_ptr;
- u32 n_num;
+ struct tipc_node *n_ptr, *temp_node;
spin_lock_bh(&node_create_lock);
@@ -78,12 +101,19 @@ struct tipc_node *tipc_node_create(u32 addr)
n_ptr->addr = addr;
spin_lock_init(&n_ptr->lock);
+ INIT_HLIST_NODE(&n_ptr->hash);
+ INIT_LIST_HEAD(&n_ptr->list);
INIT_LIST_HEAD(&n_ptr->nsub);
- n_num = tipc_node(addr);
- tipc_net.nodes[n_num] = n_ptr;
- if (n_num > tipc_net.highest_node)
- tipc_net.highest_node = n_num;
+ hlist_add_head(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
+
+ list_for_each_entry(temp_node, &tipc_node_list, list) {
+ if (n_ptr->addr < temp_node->addr)
+ break;
+ }
+ list_add_tail(&n_ptr->list, &temp_node->list);
+
+ tipc_num_nodes++;
spin_unlock_bh(&node_create_lock);
return n_ptr;
@@ -91,18 +121,11 @@ struct tipc_node *tipc_node_create(u32 addr)
void tipc_node_delete(struct tipc_node *n_ptr)
{
- u32 n_num;
-
- if (!n_ptr)
- return;
-
- n_num = tipc_node(n_ptr->addr);
- tipc_net.nodes[n_num] = NULL;
+ list_del(&n_ptr->list);
+ hlist_del(&n_ptr->hash);
kfree(n_ptr);
- while (!tipc_net.nodes[tipc_net.highest_node])
- if (--tipc_net.highest_node == 0)
- break;
+ tipc_num_nodes--;
}
@@ -200,54 +223,32 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct link *l_ptr)
node_lost_contact(n_ptr);
}
-int tipc_node_has_active_links(struct tipc_node *n_ptr)
+int tipc_node_active_links(struct tipc_node *n_ptr)
{
return n_ptr->active_links[0] != NULL;
}
-int tipc_node_has_redundant_links(struct tipc_node *n_ptr)
+int tipc_node_redundant_links(struct tipc_node *n_ptr)
{
return n_ptr->working_links > 1;
}
int tipc_node_is_up(struct tipc_node *n_ptr)
{
- return tipc_node_has_active_links(n_ptr);
+ return tipc_node_active_links(n_ptr);
}
-struct tipc_node *tipc_node_attach_link(struct link *l_ptr)
+void tipc_node_attach_link(struct tipc_node *n_ptr, struct link *l_ptr)
{
- struct tipc_node *n_ptr = tipc_node_find(l_ptr->addr);
-
- if (!n_ptr)
- n_ptr = tipc_node_create(l_ptr->addr);
- if (n_ptr) {
- u32 bearer_id = l_ptr->b_ptr->identity;
- char addr_string[16];
-
- if (n_ptr->link_cnt >= 2) {
- err("Attempt to create third link to %s\n",
- tipc_addr_string_fill(addr_string, n_ptr->addr));
- return NULL;
- }
-
- if (!n_ptr->links[bearer_id]) {
- n_ptr->links[bearer_id] = l_ptr;
- tipc_net.links++;
- n_ptr->link_cnt++;
- return n_ptr;
- }
- err("Attempt to establish second link on <%s> to %s\n",
- l_ptr->b_ptr->publ.name,
- tipc_addr_string_fill(addr_string, l_ptr->addr));
- }
- return NULL;
+ n_ptr->links[l_ptr->b_ptr->identity] = l_ptr;
+ atomic_inc(&tipc_num_links);
+ n_ptr->link_cnt++;
}
void tipc_node_detach_link(struct tipc_node *n_ptr, struct link *l_ptr)
{
n_ptr->links[l_ptr->b_ptr->identity] = NULL;
- tipc_net.links--;
+ atomic_dec(&tipc_num_links);
n_ptr->link_cnt--;
}
@@ -327,7 +328,6 @@ static void node_cleanup_finished(unsigned long node_addr)
static void node_lost_contact(struct tipc_node *n_ptr)
{
- struct tipc_node_subscr *ns, *tns;
char addr_string[16];
u32 i;
@@ -365,12 +365,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
}
/* Notify subscribers */
- list_for_each_entry_safe(ns, tns, &n_ptr->nsub, nodesub_list) {
- ns->node = NULL;
- list_del_init(&ns->nodesub_list);
- tipc_k_signal((Handler)ns->handle_node_down,
- (unsigned long)ns->usr_handle);
- }
+ tipc_nodesub_notify(n_ptr);
/* Prevent re-contact with node until all cleanup is done */
@@ -385,7 +380,6 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)
struct tipc_node *n_ptr;
struct tipc_node_info node_info;
u32 payload_size;
- u32 n_num;
if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_NET_ADDR))
return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
@@ -396,15 +390,14 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)
" (network address)");
read_lock_bh(&tipc_net_lock);
- if (!tipc_net.nodes) {
+ if (!tipc_num_nodes) {
read_unlock_bh(&tipc_net_lock);
return tipc_cfg_reply_none();
}
/* For now, get space for all other nodes */
- payload_size = TLV_SPACE(sizeof(node_info)) *
- (tipc_net.highest_node - 1);
+ payload_size = TLV_SPACE(sizeof(node_info)) * tipc_num_nodes;
if (payload_size > 32768u) {
read_unlock_bh(&tipc_net_lock);
return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
@@ -418,9 +411,8 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)
/* Add TLVs for all nodes in scope */
- for (n_num = 1; n_num <= tipc_net.highest_node; n_num++) {
- n_ptr = tipc_net.nodes[n_num];
- if (!n_ptr || !tipc_in_scope(domain, n_ptr->addr))
+ list_for_each_entry(n_ptr, &tipc_node_list, list) {
+ if (!tipc_in_scope(domain, n_ptr->addr))
continue;
node_info.addr = htonl(n_ptr->addr);
node_info.up = htonl(tipc_node_is_up(n_ptr));
@@ -439,7 +431,6 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
struct tipc_node *n_ptr;
struct tipc_link_info link_info;
u32 payload_size;
- u32 n_num;
if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_NET_ADDR))
return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
@@ -456,7 +447,8 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
/* Get space for all unicast links + multicast link */
- payload_size = TLV_SPACE(sizeof(link_info)) * (tipc_net.links + 1);
+ payload_size = TLV_SPACE(sizeof(link_info)) *
+ (atomic_read(&tipc_num_links) + 1);
if (payload_size > 32768u) {
read_unlock_bh(&tipc_net_lock);
return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
@@ -470,18 +462,17 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
/* Add TLV for broadcast link */
- link_info.dest = htonl(tipc_own_addr & 0xfffff00);
+ link_info.dest = htonl(tipc_cluster_mask(tipc_own_addr));
link_info.up = htonl(1);
strlcpy(link_info.str, tipc_bclink_name, TIPC_MAX_LINK_NAME);
tipc_cfg_append_tlv(buf, TIPC_TLV_LINK_INFO, &link_info, sizeof(link_info));
/* Add TLVs for any other links in scope */
- for (n_num = 1; n_num <= tipc_net.highest_node; n_num++) {
+ list_for_each_entry(n_ptr, &tipc_node_list, list) {
u32 i;
- n_ptr = tipc_net.nodes[n_num];
- if (!n_ptr || !tipc_in_scope(domain, n_ptr->addr))
+ if (!tipc_in_scope(domain, n_ptr->addr))
continue;
tipc_node_lock(n_ptr);
for (i = 0; i < MAX_BEARERS; i++) {
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 206a8efa410..5c61afc7a0b 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -2,7 +2,7 @@
* net/tipc/node.h: Include file for TIPC node management routines
*
* Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -46,7 +46,8 @@
* struct tipc_node - TIPC node structure
* @addr: network address of node
* @lock: spinlock governing access to structure
- * @next: pointer to next node in sorted list of cluster's nodes
+ * @hash: links to adjacent nodes in unsorted hash chain
+ * @list: links to adjacent nodes in sorted list of cluster's nodes
* @nsub: list of "node down" subscriptions monitoring node
* @active_links: pointers to active links to node
* @links: pointers to all links to node
@@ -69,7 +70,8 @@
struct tipc_node {
u32 addr;
spinlock_t lock;
- struct tipc_node *next;
+ struct hlist_node hash;
+ struct list_head list;
struct list_head nsub;
struct link *active_links[2];
struct link *links[MAX_BEARERS];
@@ -90,27 +92,35 @@ struct tipc_node {
} bclink;
};
+#define NODE_HTABLE_SIZE 512
+extern struct list_head tipc_node_list;
+
+/*
+ * A trivial power-of-two bitmask technique is used for speed, since this
+ * operation is done for every incoming TIPC packet. The number of hash table
+ * entries has been chosen so that no hash chain exceeds 8 nodes and will
+ * usually be much smaller (typically only a single node).
+ */
+static inline unsigned int tipc_hashfn(u32 addr)
+{
+ return addr & (NODE_HTABLE_SIZE - 1);
+}
+
extern u32 tipc_own_tag;
+struct tipc_node *tipc_node_find(u32 addr);
struct tipc_node *tipc_node_create(u32 addr);
void tipc_node_delete(struct tipc_node *n_ptr);
-struct tipc_node *tipc_node_attach_link(struct link *l_ptr);
+void tipc_node_attach_link(struct tipc_node *n_ptr, struct link *l_ptr);
void tipc_node_detach_link(struct tipc_node *n_ptr, struct link *l_ptr);
void tipc_node_link_down(struct tipc_node *n_ptr, struct link *l_ptr);
void tipc_node_link_up(struct tipc_node *n_ptr, struct link *l_ptr);
-int tipc_node_has_active_links(struct tipc_node *n_ptr);
-int tipc_node_has_redundant_links(struct tipc_node *n_ptr);
+int tipc_node_active_links(struct tipc_node *n_ptr);
+int tipc_node_redundant_links(struct tipc_node *n_ptr);
int tipc_node_is_up(struct tipc_node *n_ptr);
struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space);
struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space);
-static inline struct tipc_node *tipc_node_find(u32 addr)
-{
- if (likely(in_own_cluster(addr)))
- return tipc_net.nodes[tipc_node(addr)];
- return NULL;
-}
-
static inline void tipc_node_lock(struct tipc_node *n_ptr)
{
spin_lock_bh(&n_ptr->lock);
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
index 018a55332d9..c3c2815ae63 100644
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -2,7 +2,7 @@
* net/tipc/node_subscr.c: TIPC "node down" subscription handling
*
* Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -76,3 +76,22 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
list_del_init(&node_sub->nodesub_list);
tipc_node_unlock(node_sub->node);
}
+
+/**
+ * tipc_nodesub_notify - notify subscribers that a node is unreachable
+ *
+ * Note: node is locked by caller
+ */
+
+void tipc_nodesub_notify(struct tipc_node *node)
+{
+ struct tipc_node_subscr *ns;
+
+ list_for_each_entry(ns, &node->nsub, nodesub_list) {
+ if (ns->handle_node_down) {
+ tipc_k_signal((Handler)ns->handle_node_down,
+ (unsigned long)ns->usr_handle);
+ ns->handle_node_down = NULL;
+ }
+ }
+}
diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h
index 006ed739f51..4bc2ca0867a 100644
--- a/net/tipc/node_subscr.h
+++ b/net/tipc/node_subscr.h
@@ -2,7 +2,7 @@
* net/tipc/node_subscr.h: Include file for TIPC "node down" subscription handling
*
* Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -59,5 +59,6 @@ struct tipc_node_subscr {
void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
void *usr_handle, net_ev_handler handle_down);
void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub);
+void tipc_nodesub_notify(struct tipc_node *node);
#endif
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 067bab2a0b9..6ff78f9c7d6 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -2,7 +2,7 @@
* net/tipc/port.c: TIPC port code
*
* Copyright (c) 1992-2007, Ericsson AB
- * Copyright (c) 2004-2008, Wind River Systems
+ * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -54,33 +54,19 @@ static DEFINE_SPINLOCK(queue_lock);
static LIST_HEAD(ports);
static void port_handle_node_down(unsigned long ref);
-static struct sk_buff *port_build_self_abort_msg(struct port *, u32 err);
-static struct sk_buff *port_build_peer_abort_msg(struct port *, u32 err);
+static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
+static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
static void port_timeout(unsigned long ref);
-static u32 port_peernode(struct port *p_ptr)
+static u32 port_peernode(struct tipc_port *p_ptr)
{
- return msg_destnode(&p_ptr->publ.phdr);
+ return msg_destnode(&p_ptr->phdr);
}
-static u32 port_peerport(struct port *p_ptr)
+static u32 port_peerport(struct tipc_port *p_ptr)
{
- return msg_destport(&p_ptr->publ.phdr);
-}
-
-static u32 port_out_seqno(struct port *p_ptr)
-{
- return msg_transp_seqno(&p_ptr->publ.phdr);
-}
-
-static void port_incr_out_seqno(struct port *p_ptr)
-{
- struct tipc_msg *m = &p_ptr->publ.phdr;
-
- if (likely(!msg_routed(m)))
- return;
- msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
+ return msg_destport(&p_ptr->phdr);
}
/**
@@ -94,7 +80,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
struct sk_buff *buf;
struct sk_buff *ibuf = NULL;
struct port_list dports = {0, NULL, };
- struct port *oport = tipc_port_deref(ref);
+ struct tipc_port *oport = tipc_port_deref(ref);
int ext_targets;
int res;
@@ -103,7 +89,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
/* Create multicast message */
- hdr = &oport->publ.phdr;
+ hdr = &oport->phdr;
msg_set_type(hdr, TIPC_MCAST_MSG);
msg_set_nametype(hdr, seq->type);
msg_set_namelower(hdr, seq->lower);
@@ -211,7 +197,7 @@ struct tipc_port *tipc_createport_raw(void *usr_handle,
void (*wakeup)(struct tipc_port *),
const u32 importance)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct tipc_msg *msg;
u32 ref;
@@ -220,21 +206,19 @@ struct tipc_port *tipc_createport_raw(void *usr_handle,
warn("Port creation failed, no memory\n");
return NULL;
}
- ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);
+ ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
if (!ref) {
warn("Port creation failed, reference table exhausted\n");
kfree(p_ptr);
return NULL;
}
- p_ptr->publ.usr_handle = usr_handle;
- p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;
- p_ptr->publ.ref = ref;
- msg = &p_ptr->publ.phdr;
+ p_ptr->usr_handle = usr_handle;
+ p_ptr->max_pkt = MAX_PKT_DEFAULT;
+ p_ptr->ref = ref;
+ msg = &p_ptr->phdr;
tipc_msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0);
msg_set_origport(msg, ref);
- p_ptr->last_in_seqno = 41;
- p_ptr->sent = 1;
INIT_LIST_HEAD(&p_ptr->wait_list);
INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
p_ptr->dispatcher = dispatcher;
@@ -246,12 +230,12 @@ struct tipc_port *tipc_createport_raw(void *usr_handle,
INIT_LIST_HEAD(&p_ptr->port_list);
list_add_tail(&p_ptr->port_list, &ports);
spin_unlock_bh(&tipc_port_list_lock);
- return &(p_ptr->publ);
+ return p_ptr;
}
int tipc_deleteport(u32 ref)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
tipc_withdraw(ref, 0, NULL);
@@ -263,7 +247,7 @@ int tipc_deleteport(u32 ref)
tipc_port_unlock(p_ptr);
k_cancel_timer(&p_ptr->timer);
- if (p_ptr->publ.connected) {
+ if (p_ptr->connected) {
buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
tipc_nodesub_unsubscribe(&p_ptr->subscription);
}
@@ -279,14 +263,14 @@ int tipc_deleteport(u32 ref)
return 0;
}
-static int port_unreliable(struct port *p_ptr)
+static int port_unreliable(struct tipc_port *p_ptr)
{
- return msg_src_droppable(&p_ptr->publ.phdr);
+ return msg_src_droppable(&p_ptr->phdr);
}
int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
@@ -298,24 +282,24 @@ int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
- msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
+ msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
tipc_port_unlock(p_ptr);
return 0;
}
-static int port_unreturnable(struct port *p_ptr)
+static int port_unreturnable(struct tipc_port *p_ptr)
{
- return msg_dest_droppable(&p_ptr->publ.phdr);
+ return msg_dest_droppable(&p_ptr->phdr);
}
int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
@@ -327,12 +311,12 @@ int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
- msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
+ msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
tipc_port_unlock(p_ptr);
return 0;
}
@@ -345,7 +329,7 @@ int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
u32 origport, u32 orignode,
u32 usr, u32 type, u32 err,
- u32 seqno, u32 ack)
+ u32 ack)
{
struct sk_buff *buf;
struct tipc_msg *msg;
@@ -358,7 +342,6 @@ static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
msg_set_destport(msg, destport);
msg_set_origport(msg, origport);
msg_set_orignode(msg, orignode);
- msg_set_transp_seqno(msg, seqno);
msg_set_msgcnt(msg, ack);
}
return buf;
@@ -413,10 +396,10 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
/* send self-abort message when rejecting on a connected port */
if (msg_connected(msg)) {
struct sk_buff *abuf = NULL;
- struct port *p_ptr = tipc_port_lock(msg_destport(msg));
+ struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
if (p_ptr) {
- if (p_ptr->publ.connected)
+ if (p_ptr->connected)
abuf = port_build_self_abort_msg(p_ptr, err);
tipc_port_unlock(p_ptr);
}
@@ -429,7 +412,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
return data_sz;
}
-int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
+int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
struct iovec const *msg_sect, u32 num_sect,
int err)
{
@@ -446,13 +429,13 @@ int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
static void port_timeout(unsigned long ref)
{
- struct port *p_ptr = tipc_port_lock(ref);
+ struct tipc_port *p_ptr = tipc_port_lock(ref);
struct sk_buff *buf = NULL;
if (!p_ptr)
return;
- if (!p_ptr->publ.connected) {
+ if (!p_ptr->connected) {
tipc_port_unlock(p_ptr);
return;
}
@@ -463,14 +446,12 @@ static void port_timeout(unsigned long ref)
} else {
buf = port_build_proto_msg(port_peerport(p_ptr),
port_peernode(p_ptr),
- p_ptr->publ.ref,
+ p_ptr->ref,
tipc_own_addr,
CONN_MANAGER,
CONN_PROBE,
TIPC_OK,
- port_out_seqno(p_ptr),
0);
- port_incr_out_seqno(p_ptr);
p_ptr->probing_state = PROBING;
k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
}
@@ -481,7 +462,7 @@ static void port_timeout(unsigned long ref)
static void port_handle_node_down(unsigned long ref)
{
- struct port *p_ptr = tipc_port_lock(ref);
+ struct tipc_port *p_ptr = tipc_port_lock(ref);
struct sk_buff *buf = NULL;
if (!p_ptr)
@@ -492,73 +473,71 @@ static void port_handle_node_down(unsigned long ref)
}
-static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
+static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
{
- u32 imp = msg_importance(&p_ptr->publ.phdr);
+ u32 imp = msg_importance(&p_ptr->phdr);
- if (!p_ptr->publ.connected)
+ if (!p_ptr->connected)
return NULL;
if (imp < TIPC_CRITICAL_IMPORTANCE)
imp++;
- return port_build_proto_msg(p_ptr->publ.ref,
+ return port_build_proto_msg(p_ptr->ref,
tipc_own_addr,
port_peerport(p_ptr),
port_peernode(p_ptr),
imp,
TIPC_CONN_MSG,
err,
- p_ptr->last_in_seqno + 1,
0);
}
-static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
+static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
{
- u32 imp = msg_importance(&p_ptr->publ.phdr);
+ u32 imp = msg_importance(&p_ptr->phdr);
- if (!p_ptr->publ.connected)
+ if (!p_ptr->connected)
return NULL;
if (imp < TIPC_CRITICAL_IMPORTANCE)
imp++;
return port_build_proto_msg(port_peerport(p_ptr),
port_peernode(p_ptr),
- p_ptr->publ.ref,
+ p_ptr->ref,
tipc_own_addr,
imp,
TIPC_CONN_MSG,
err,
- port_out_seqno(p_ptr),
0);
}
void tipc_port_recv_proto_msg(struct sk_buff *buf)
{
struct tipc_msg *msg = buf_msg(buf);
- struct port *p_ptr = tipc_port_lock(msg_destport(msg));
+ struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
u32 err = TIPC_OK;
struct sk_buff *r_buf = NULL;
struct sk_buff *abort_buf = NULL;
if (!p_ptr) {
err = TIPC_ERR_NO_PORT;
- } else if (p_ptr->publ.connected) {
+ } else if (p_ptr->connected) {
if ((port_peernode(p_ptr) != msg_orignode(msg)) ||
(port_peerport(p_ptr) != msg_origport(msg))) {
err = TIPC_ERR_NO_PORT;
} else if (msg_type(msg) == CONN_ACK) {
int wakeup = tipc_port_congested(p_ptr) &&
- p_ptr->publ.congested &&
+ p_ptr->congested &&
p_ptr->wakeup;
p_ptr->acked += msg_msgcnt(msg);
if (tipc_port_congested(p_ptr))
goto exit;
- p_ptr->publ.congested = 0;
+ p_ptr->congested = 0;
if (!wakeup)
goto exit;
- p_ptr->wakeup(&p_ptr->publ);
+ p_ptr->wakeup(p_ptr);
goto exit;
}
- } else if (p_ptr->publ.published) {
+ } else if (p_ptr->published) {
err = TIPC_ERR_NO_PORT;
}
if (err) {
@@ -569,7 +548,6 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
TIPC_HIGH_IMPORTANCE,
TIPC_CONN_MSG,
err,
- 0,
0);
goto exit;
}
@@ -583,11 +561,9 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
CONN_MANAGER,
CONN_PROBE_REPLY,
TIPC_OK,
- port_out_seqno(p_ptr),
0);
}
p_ptr->probing_state = CONFIRMED;
- port_incr_out_seqno(p_ptr);
exit:
if (p_ptr)
tipc_port_unlock(p_ptr);
@@ -596,29 +572,29 @@ exit:
buf_discard(buf);
}
-static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
+static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
{
struct publication *publ;
if (full_id)
tipc_printf(buf, "<%u.%u.%u:%u>:",
tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
- tipc_node(tipc_own_addr), p_ptr->publ.ref);
+ tipc_node(tipc_own_addr), p_ptr->ref);
else
- tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
+ tipc_printf(buf, "%-10u:", p_ptr->ref);
- if (p_ptr->publ.connected) {
+ if (p_ptr->connected) {
u32 dport = port_peerport(p_ptr);
u32 destnode = port_peernode(p_ptr);
tipc_printf(buf, " connected to <%u.%u.%u:%u>",
tipc_zone(destnode), tipc_cluster(destnode),
tipc_node(destnode), dport);
- if (p_ptr->publ.conn_type != 0)
+ if (p_ptr->conn_type != 0)
tipc_printf(buf, " via {%u,%u}",
- p_ptr->publ.conn_type,
- p_ptr->publ.conn_instance);
- } else if (p_ptr->publ.published) {
+ p_ptr->conn_type,
+ p_ptr->conn_instance);
+ } else if (p_ptr->published) {
tipc_printf(buf, " bound to");
list_for_each_entry(publ, &p_ptr->publications, pport_list) {
if (publ->lower == publ->upper)
@@ -639,7 +615,7 @@ struct sk_buff *tipc_port_get_ports(void)
struct sk_buff *buf;
struct tlv_desc *rep_tlv;
struct print_buf pb;
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
int str_len;
buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
@@ -650,9 +626,9 @@ struct sk_buff *tipc_port_get_ports(void)
tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
spin_lock_bh(&tipc_port_list_lock);
list_for_each_entry(p_ptr, &ports, port_list) {
- spin_lock_bh(p_ptr->publ.lock);
+ spin_lock_bh(p_ptr->lock);
port_print(p_ptr, &pb, 0);
- spin_unlock_bh(p_ptr->publ.lock);
+ spin_unlock_bh(p_ptr->lock);
}
spin_unlock_bh(&tipc_port_list_lock);
str_len = tipc_printbuf_validate(&pb);
@@ -665,12 +641,12 @@ struct sk_buff *tipc_port_get_ports(void)
void tipc_port_reinit(void)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct tipc_msg *msg;
spin_lock_bh(&tipc_port_list_lock);
list_for_each_entry(p_ptr, &ports, port_list) {
- msg = &p_ptr->publ.phdr;
+ msg = &p_ptr->phdr;
if (msg_orignode(msg) == tipc_own_addr)
break;
msg_set_prevnode(msg, tipc_own_addr);
@@ -695,7 +671,7 @@ static void port_dispatcher_sigh(void *dummy)
spin_unlock_bh(&queue_lock);
while (buf) {
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct user_port *up_ptr;
struct tipc_portid orig;
struct tipc_name_seq dseq;
@@ -720,8 +696,8 @@ static void port_dispatcher_sigh(void *dummy)
orig.node = msg_orignode(msg);
up_ptr = p_ptr->user_port;
usr_handle = up_ptr->usr_handle;
- connected = p_ptr->publ.connected;
- published = p_ptr->publ.published;
+ connected = p_ptr->connected;
+ published = p_ptr->published;
if (unlikely(msg_errcode(msg)))
goto err;
@@ -732,6 +708,7 @@ static void port_dispatcher_sigh(void *dummy)
tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
u32 peer_port = port_peerport(p_ptr);
u32 peer_node = port_peernode(p_ptr);
+ u32 dsz;
tipc_port_unlock(p_ptr);
if (unlikely(!cb))
@@ -742,13 +719,14 @@ static void port_dispatcher_sigh(void *dummy)
} else if ((msg_origport(msg) != peer_port) ||
(msg_orignode(msg) != peer_node))
goto reject;
- if (unlikely(++p_ptr->publ.conn_unacked >=
- TIPC_FLOW_CONTROL_WIN))
+ dsz = msg_data_sz(msg);
+ if (unlikely(dsz &&
+ (++p_ptr->conn_unacked >=
+ TIPC_FLOW_CONTROL_WIN)))
tipc_acknowledge(dref,
- p_ptr->publ.conn_unacked);
+ p_ptr->conn_unacked);
skb_pull(buf, msg_hdr_sz(msg));
- cb(usr_handle, dref, &buf, msg_data(msg),
- msg_data_sz(msg));
+ cb(usr_handle, dref, &buf, msg_data(msg), dsz);
break;
}
case TIPC_DIRECT_MSG:{
@@ -872,7 +850,7 @@ static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
static void port_wakeup_sh(unsigned long ref)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct user_port *up_ptr;
tipc_continue_event cb = NULL;
void *uh = NULL;
@@ -898,14 +876,14 @@ static void port_wakeup(struct tipc_port *p_ptr)
void tipc_acknowledge(u32 ref, u32 ack)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return;
- if (p_ptr->publ.connected) {
- p_ptr->publ.conn_unacked -= ack;
+ if (p_ptr->connected) {
+ p_ptr->conn_unacked -= ack;
buf = port_build_proto_msg(port_peerport(p_ptr),
port_peernode(p_ptr),
ref,
@@ -913,7 +891,6 @@ void tipc_acknowledge(u32 ref, u32 ack)
CONN_MANAGER,
CONN_ACK,
TIPC_OK,
- port_out_seqno(p_ptr),
ack);
}
tipc_port_unlock(p_ptr);
@@ -936,14 +913,14 @@ int tipc_createport(void *usr_handle,
u32 *portref)
{
struct user_port *up_ptr;
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
if (!up_ptr) {
warn("Port creation failed, no memory\n");
return -ENOMEM;
}
- p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher,
+ p_ptr = (struct tipc_port *)tipc_createport_raw(NULL, port_dispatcher,
port_wakeup, importance);
if (!p_ptr) {
kfree(up_ptr);
@@ -952,7 +929,7 @@ int tipc_createport(void *usr_handle,
p_ptr->user_port = up_ptr;
up_ptr->usr_handle = usr_handle;
- up_ptr->ref = p_ptr->publ.ref;
+ up_ptr->ref = p_ptr->ref;
up_ptr->err_cb = error_cb;
up_ptr->named_err_cb = named_error_cb;
up_ptr->conn_err_cb = conn_error_cb;
@@ -960,26 +937,26 @@ int tipc_createport(void *usr_handle,
up_ptr->named_msg_cb = named_msg_cb;
up_ptr->conn_msg_cb = conn_msg_cb;
up_ptr->continue_event_cb = continue_event_cb;
- *portref = p_ptr->publ.ref;
+ *portref = p_ptr->ref;
tipc_port_unlock(p_ptr);
return 0;
}
int tipc_portimportance(u32 ref, unsigned int *importance)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
- *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);
+ *importance = (unsigned int)msg_importance(&p_ptr->phdr);
tipc_port_unlock(p_ptr);
return 0;
}
int tipc_set_portimportance(u32 ref, unsigned int imp)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
if (imp > TIPC_CRITICAL_IMPORTANCE)
return -EINVAL;
@@ -987,7 +964,7 @@ int tipc_set_portimportance(u32 ref, unsigned int imp)
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
- msg_set_importance(&p_ptr->publ.phdr, (u32)imp);
+ msg_set_importance(&p_ptr->phdr, (u32)imp);
tipc_port_unlock(p_ptr);
return 0;
}
@@ -995,7 +972,7 @@ int tipc_set_portimportance(u32 ref, unsigned int imp)
int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct publication *publ;
u32 key;
int res = -EINVAL;
@@ -1004,7 +981,7 @@ int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
if (!p_ptr)
return -EINVAL;
- if (p_ptr->publ.connected)
+ if (p_ptr->connected)
goto exit;
if (seq->lower > seq->upper)
goto exit;
@@ -1016,11 +993,11 @@ int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
goto exit;
}
publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
- scope, p_ptr->publ.ref, key);
+ scope, p_ptr->ref, key);
if (publ) {
list_add(&publ->pport_list, &p_ptr->publications);
p_ptr->pub_count++;
- p_ptr->publ.published = 1;
+ p_ptr->published = 1;
res = 0;
}
exit:
@@ -1030,7 +1007,7 @@ exit:
int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct publication *publ;
struct publication *tpubl;
int res = -EINVAL;
@@ -1063,37 +1040,36 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
}
}
if (list_empty(&p_ptr->publications))
- p_ptr->publ.published = 0;
+ p_ptr->published = 0;
tipc_port_unlock(p_ptr);
return res;
}
int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct tipc_msg *msg;
int res = -EINVAL;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
- if (p_ptr->publ.published || p_ptr->publ.connected)
+ if (p_ptr->published || p_ptr->connected)
goto exit;
if (!peer->ref)
goto exit;
- msg = &p_ptr->publ.phdr;
+ msg = &p_ptr->phdr;
msg_set_destnode(msg, peer->node);
msg_set_destport(msg, peer->ref);
msg_set_orignode(msg, tipc_own_addr);
- msg_set_origport(msg, p_ptr->publ.ref);
- msg_set_transp_seqno(msg, 42);
+ msg_set_origport(msg, p_ptr->ref);
msg_set_type(msg, TIPC_CONN_MSG);
msg_set_hdr_sz(msg, SHORT_H_SIZE);
p_ptr->probing_interval = PROBING_INTERVAL;
p_ptr->probing_state = CONFIRMED;
- p_ptr->publ.connected = 1;
+ p_ptr->connected = 1;
k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
@@ -1102,7 +1078,7 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
res = 0;
exit:
tipc_port_unlock(p_ptr);
- p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref);
+ p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
return res;
}
@@ -1120,7 +1096,7 @@ int tipc_disconnect_port(struct tipc_port *tp_ptr)
tp_ptr->connected = 0;
/* let timer expire on it's own to avoid deadlock! */
tipc_nodesub_unsubscribe(
- &((struct port *)tp_ptr)->subscription);
+ &((struct tipc_port *)tp_ptr)->subscription);
res = 0;
} else {
res = -ENOTCONN;
@@ -1135,7 +1111,7 @@ int tipc_disconnect_port(struct tipc_port *tp_ptr)
int tipc_disconnect(u32 ref)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
int res;
p_ptr = tipc_port_lock(ref);
@@ -1151,15 +1127,15 @@ int tipc_disconnect(u32 ref)
*/
int tipc_shutdown(u32 ref)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct sk_buff *buf = NULL;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
- if (p_ptr->publ.connected) {
- u32 imp = msg_importance(&p_ptr->publ.phdr);
+ if (p_ptr->connected) {
+ u32 imp = msg_importance(&p_ptr->phdr);
if (imp < TIPC_CRITICAL_IMPORTANCE)
imp++;
buf = port_build_proto_msg(port_peerport(p_ptr),
@@ -1169,7 +1145,6 @@ int tipc_shutdown(u32 ref)
imp,
TIPC_CONN_MSG,
TIPC_CONN_SHUTDOWN,
- port_out_seqno(p_ptr),
0);
}
tipc_port_unlock(p_ptr);
@@ -1182,13 +1157,13 @@ int tipc_shutdown(u32 ref)
* message for this node.
*/
-static int tipc_port_recv_sections(struct port *sender, unsigned int num_sect,
+static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
struct iovec const *msg_sect)
{
struct sk_buff *buf;
int res;
- res = tipc_msg_build(&sender->publ.phdr, msg_sect, num_sect,
+ res = tipc_msg_build(&sender->phdr, msg_sect, num_sect,
MAX_MSG_SIZE, !sender->user_port, &buf);
if (likely(buf))
tipc_port_recv_msg(buf);
@@ -1201,15 +1176,15 @@ static int tipc_port_recv_sections(struct port *sender, unsigned int num_sect,
int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
u32 destnode;
int res;
p_ptr = tipc_port_deref(ref);
- if (!p_ptr || !p_ptr->publ.connected)
+ if (!p_ptr || !p_ptr->connected)
return -EINVAL;
- p_ptr->publ.congested = 1;
+ p_ptr->congested = 1;
if (!tipc_port_congested(p_ptr)) {
destnode = port_peernode(p_ptr);
if (likely(destnode != tipc_own_addr))
@@ -1219,14 +1194,14 @@ int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
if (likely(res != -ELINKCONG)) {
- port_incr_out_seqno(p_ptr);
- p_ptr->publ.congested = 0;
- p_ptr->sent++;
+ p_ptr->congested = 0;
+ if (res > 0)
+ p_ptr->sent++;
return res;
}
}
if (port_unreliable(p_ptr)) {
- p_ptr->publ.congested = 0;
+ p_ptr->congested = 0;
/* Just calculate msg length and return */
return tipc_msg_calc_data_size(msg_sect, num_sect);
}
@@ -1240,17 +1215,17 @@ int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
unsigned int num_sect, struct iovec const *msg_sect)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct tipc_msg *msg;
u32 destnode = domain;
u32 destport;
int res;
p_ptr = tipc_port_deref(ref);
- if (!p_ptr || p_ptr->publ.connected)
+ if (!p_ptr || p_ptr->connected)
return -EINVAL;
- msg = &p_ptr->publ.phdr;
+ msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_NAMED_MSG);
msg_set_orignode(msg, tipc_own_addr);
msg_set_origport(msg, ref);
@@ -1263,13 +1238,17 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
msg_set_destport(msg, destport);
if (likely(destport)) {
- p_ptr->sent++;
if (likely(destnode == tipc_own_addr))
- return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
- res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
- destnode);
- if (likely(res != -ELINKCONG))
+ res = tipc_port_recv_sections(p_ptr, num_sect,
+ msg_sect);
+ else
+ res = tipc_link_send_sections_fast(p_ptr, msg_sect,
+ num_sect, destnode);
+ if (likely(res != -ELINKCONG)) {
+ if (res > 0)
+ p_ptr->sent++;
return res;
+ }
if (port_unreliable(p_ptr)) {
/* Just calculate msg length and return */
return tipc_msg_calc_data_size(msg_sect, num_sect);
@@ -1287,27 +1266,32 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
int tipc_send2port(u32 ref, struct tipc_portid const *dest,
unsigned int num_sect, struct iovec const *msg_sect)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct tipc_msg *msg;
int res;
p_ptr = tipc_port_deref(ref);
- if (!p_ptr || p_ptr->publ.connected)
+ if (!p_ptr || p_ptr->connected)
return -EINVAL;
- msg = &p_ptr->publ.phdr;
+ msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_DIRECT_MSG);
msg_set_orignode(msg, tipc_own_addr);
msg_set_origport(msg, ref);
msg_set_destnode(msg, dest->node);
msg_set_destport(msg, dest->ref);
msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
- p_ptr->sent++;
+
if (dest->node == tipc_own_addr)
- return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
- res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
- if (likely(res != -ELINKCONG))
+ res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
+ else
+ res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
+ dest->node);
+ if (likely(res != -ELINKCONG)) {
+ if (res > 0)
+ p_ptr->sent++;
return res;
+ }
if (port_unreliable(p_ptr)) {
/* Just calculate msg length and return */
return tipc_msg_calc_data_size(msg_sect, num_sect);
@@ -1322,15 +1306,15 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,
int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
struct sk_buff *buf, unsigned int dsz)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct tipc_msg *msg;
int res;
- p_ptr = (struct port *)tipc_ref_deref(ref);
- if (!p_ptr || p_ptr->publ.connected)
+ p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
+ if (!p_ptr || p_ptr->connected)
return -EINVAL;
- msg = &p_ptr->publ.phdr;
+ msg = &p_ptr->phdr;
msg_set_type(msg, TIPC_DIRECT_MSG);
msg_set_orignode(msg, tipc_own_addr);
msg_set_origport(msg, ref);
@@ -1343,12 +1327,16 @@ int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
skb_push(buf, DIR_MSG_H_SIZE);
skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE);
- p_ptr->sent++;
+
if (dest->node == tipc_own_addr)
- return tipc_port_recv_msg(buf);
- res = tipc_send_buf_fast(buf, dest->node);
- if (likely(res != -ELINKCONG))
+ res = tipc_port_recv_msg(buf);
+ else
+ res = tipc_send_buf_fast(buf, dest->node);
+ if (likely(res != -ELINKCONG)) {
+ if (res > 0)
+ p_ptr->sent++;
return res;
+ }
if (port_unreliable(p_ptr))
return dsz;
return -ELINKCONG;
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 8e84b989949..87b9424ae0e 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -2,7 +2,7 @@
* net/tipc/port.h: Include file for TIPC port code
*
* Copyright (c) 1994-2007, Ericsson AB
- * Copyright (c) 2004-2007, Wind River Systems
+ * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -95,7 +95,7 @@ struct user_port {
};
/**
- * struct tipc_port - TIPC port info available to socket API
+ * struct tipc_port - TIPC port structure
* @usr_handle: pointer to additional user-defined information about port
* @lock: pointer to spinlock for controlling access to port
* @connected: non-zero if port is currently connected to a peer port
@@ -107,43 +107,33 @@ struct user_port {
* @max_pkt: maximum packet size "hint" used when building messages sent by port
* @ref: unique reference to port in TIPC object registry
* @phdr: preformatted message header used when sending messages
- */
-struct tipc_port {
- void *usr_handle;
- spinlock_t *lock;
- int connected;
- u32 conn_type;
- u32 conn_instance;
- u32 conn_unacked;
- int published;
- u32 congested;
- u32 max_pkt;
- u32 ref;
- struct tipc_msg phdr;
-};
-
-/**
- * struct port - TIPC port structure
- * @publ: TIPC port info available to privileged users
* @port_list: adjacent ports in TIPC's global list of ports
* @dispatcher: ptr to routine which handles received messages
* @wakeup: ptr to routine to call when port is no longer congested
* @user_port: ptr to user port associated with port (if any)
* @wait_list: adjacent ports in list of ports waiting on link congestion
* @waiting_pkts:
- * @sent:
- * @acked:
+ * @sent: # of non-empty messages sent by port
+ * @acked: # of non-empty message acknowledgements from connected port's peer
* @publications: list of publications for port
* @pub_count: total # of publications port has made during its lifetime
* @probing_state:
* @probing_interval:
- * @last_in_seqno:
* @timer_ref:
* @subscription: "node down" subscription used to terminate failed connections
*/
-
-struct port {
- struct tipc_port publ;
+struct tipc_port {
+ void *usr_handle;
+ spinlock_t *lock;
+ int connected;
+ u32 conn_type;
+ u32 conn_instance;
+ u32 conn_unacked;
+ int published;
+ u32 congested;
+ u32 max_pkt;
+ u32 ref;
+ struct tipc_msg phdr;
struct list_head port_list;
u32 (*dispatcher)(struct tipc_port *, struct sk_buff *);
void (*wakeup)(struct tipc_port *);
@@ -156,7 +146,6 @@ struct port {
u32 pub_count;
u32 probing_state;
u32 probing_interval;
- u32 last_in_seqno;
struct timer_list timer;
struct tipc_node_subscr subscription;
};
@@ -230,7 +219,7 @@ int tipc_send_buf2port(u32 portref, struct tipc_portid const *dest,
int tipc_multicast(u32 portref, struct tipc_name_seq const *seq,
unsigned int section_count, struct iovec const *msg);
-int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
+int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
struct iovec const *msg_sect, u32 num_sect,
int err);
struct sk_buff *tipc_port_get_ports(void);
@@ -242,9 +231,9 @@ void tipc_port_reinit(void);
* tipc_port_lock - lock port instance referred to and return its pointer
*/
-static inline struct port *tipc_port_lock(u32 ref)
+static inline struct tipc_port *tipc_port_lock(u32 ref)
{
- return (struct port *)tipc_ref_lock(ref);
+ return (struct tipc_port *)tipc_ref_lock(ref);
}
/**
@@ -253,27 +242,27 @@ static inline struct port *tipc_port_lock(u32 ref)
* Can use pointer instead of tipc_ref_unlock() since port is already locked.
*/
-static inline void tipc_port_unlock(struct port *p_ptr)
+static inline void tipc_port_unlock(struct tipc_port *p_ptr)
{
- spin_unlock_bh(p_ptr->publ.lock);
+ spin_unlock_bh(p_ptr->lock);
}
-static inline struct port *tipc_port_deref(u32 ref)
+static inline struct tipc_port *tipc_port_deref(u32 ref)
{
- return (struct port *)tipc_ref_deref(ref);
+ return (struct tipc_port *)tipc_ref_deref(ref);
}
-static inline u32 tipc_peer_port(struct port *p_ptr)
+static inline u32 tipc_peer_port(struct tipc_port *p_ptr)
{
- return msg_destport(&p_ptr->publ.phdr);
+ return msg_destport(&p_ptr->phdr);
}
-static inline u32 tipc_peer_node(struct port *p_ptr)
+static inline u32 tipc_peer_node(struct tipc_port *p_ptr)
{
- return msg_destnode(&p_ptr->publ.phdr);
+ return msg_destnode(&p_ptr->phdr);
}
-static inline int tipc_port_congested(struct port *p_ptr)
+static inline int tipc_port_congested(struct tipc_port *p_ptr)
{
return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2);
}
@@ -284,7 +273,7 @@ static inline int tipc_port_congested(struct port *p_ptr)
static inline int tipc_port_recv_msg(struct sk_buff *buf)
{
- struct port *p_ptr;
+ struct tipc_port *p_ptr;
struct tipc_msg *msg = buf_msg(buf);
u32 destport = msg_destport(msg);
u32 dsz = msg_data_sz(msg);
@@ -299,7 +288,7 @@ static inline int tipc_port_recv_msg(struct sk_buff *buf)
/* validate destination & pass to port, otherwise reject message */
p_ptr = tipc_port_lock(destport);
if (likely(p_ptr)) {
- if (likely(p_ptr->publ.connected)) {
+ if (likely(p_ptr->connected)) {
if ((unlikely(msg_origport(msg) != tipc_peer_port(p_ptr))) ||
(unlikely(msg_orignode(msg) != tipc_peer_node(p_ptr))) ||
(unlikely(!msg_connected(msg)))) {
@@ -308,7 +297,7 @@ static inline int tipc_port_recv_msg(struct sk_buff *buf)
goto reject;
}
}
- err = p_ptr->dispatcher(&p_ptr->publ, buf);
+ err = p_ptr->dispatcher(p_ptr, buf);
tipc_port_unlock(p_ptr);
if (likely(!err))
return dsz;
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 2b02a3a8031..29d94d53198 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -2,7 +2,7 @@
* net/tipc/socket.c: TIPC socket API
*
* Copyright (c) 2001-2007, Ericsson AB
- * Copyright (c) 2004-2008, Wind River Systems
+ * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -58,6 +58,9 @@ struct tipc_sock {
#define tipc_sk(sk) ((struct tipc_sock *)(sk))
#define tipc_sk_port(sk) ((struct tipc_port *)(tipc_sk(sk)->p))
+#define tipc_rx_ready(sock) (!skb_queue_empty(&sock->sk->sk_receive_queue) || \
+ (sock->state == SS_DISCONNECTING))
+
static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
@@ -241,7 +244,6 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
tipc_set_portunreliable(tp_ptr->ref, 1);
}
- atomic_inc(&tipc_user_count);
return 0;
}
@@ -290,7 +292,7 @@ static int release(struct socket *sock)
if (buf == NULL)
break;
atomic_dec(&tipc_queue_size);
- if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf)))
+ if (TIPC_SKB_CB(buf)->handle != 0)
buf_discard(buf);
else {
if ((sock->state == SS_CONNECTING) ||
@@ -321,7 +323,6 @@ static int release(struct socket *sock)
sock_put(sk);
sock->sk = NULL;
- atomic_dec(&tipc_user_count);
return res;
}
@@ -495,6 +496,8 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
return -EACCES;
+ if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
+ return -EMSGSIZE;
if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
return -EFAULT;
if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
@@ -911,15 +914,13 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock,
struct tipc_port *tport = tipc_sk_port(sk);
struct sk_buff *buf;
struct tipc_msg *msg;
+ long timeout;
unsigned int sz;
u32 err;
int res;
/* Catch invalid receive requests */
- if (m->msg_iovlen != 1)
- return -EOPNOTSUPP; /* Don't do multiple iovec entries yet */
-
if (unlikely(!buf_len))
return -EINVAL;
@@ -930,6 +931,7 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock,
goto exit;
}
+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
restart:
/* Look for a message in receive queue; wait if necessary */
@@ -939,17 +941,15 @@ restart:
res = -ENOTCONN;
goto exit;
}
- if (flags & MSG_DONTWAIT) {
- res = -EWOULDBLOCK;
+ if (timeout <= 0L) {
+ res = timeout ? timeout : -EWOULDBLOCK;
goto exit;
}
release_sock(sk);
- res = wait_event_interruptible(*sk_sleep(sk),
- (!skb_queue_empty(&sk->sk_receive_queue) ||
- (sock->state == SS_DISCONNECTING)));
+ timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
+ tipc_rx_ready(sock),
+ timeout);
lock_sock(sk);
- if (res)
- goto exit;
}
/* Look at first message in receive queue */
@@ -991,11 +991,10 @@ restart:
sz = buf_len;
m->msg_flags |= MSG_TRUNC;
}
- if (unlikely(copy_to_user(m->msg_iov->iov_base, msg_data(msg),
- sz))) {
- res = -EFAULT;
+ res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
+ m->msg_iov, sz);
+ if (res)
goto exit;
- }
res = sz;
} else {
if ((sock->state == SS_READY) ||
@@ -1038,19 +1037,15 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
struct tipc_port *tport = tipc_sk_port(sk);
struct sk_buff *buf;
struct tipc_msg *msg;
+ long timeout;
unsigned int sz;
int sz_to_copy, target, needed;
int sz_copied = 0;
- char __user *crs = m->msg_iov->iov_base;
- unsigned char *buf_crs;
u32 err;
int res = 0;
/* Catch invalid receive attempts */
- if (m->msg_iovlen != 1)
- return -EOPNOTSUPP; /* Don't do multiple iovec entries yet */
-
if (unlikely(!buf_len))
return -EINVAL;
@@ -1063,7 +1058,7 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
}
target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
-
+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
restart:
/* Look for a message in receive queue; wait if necessary */
@@ -1073,17 +1068,15 @@ restart:
res = -ENOTCONN;
goto exit;
}
- if (flags & MSG_DONTWAIT) {
- res = -EWOULDBLOCK;
+ if (timeout <= 0L) {
+ res = timeout ? timeout : -EWOULDBLOCK;
goto exit;
}
release_sock(sk);
- res = wait_event_interruptible(*sk_sleep(sk),
- (!skb_queue_empty(&sk->sk_receive_queue) ||
- (sock->state == SS_DISCONNECTING)));
+ timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
+ tipc_rx_ready(sock),
+ timeout);
lock_sock(sk);
- if (res)
- goto exit;
}
/* Look at first message in receive queue */
@@ -1112,24 +1105,25 @@ restart:
/* Capture message data (if valid) & compute return value (always) */
if (!err) {
- buf_crs = (unsigned char *)(TIPC_SKB_CB(buf)->handle);
- sz = (unsigned char *)msg + msg_size(msg) - buf_crs;
+ u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
+ sz -= offset;
needed = (buf_len - sz_copied);
sz_to_copy = (sz <= needed) ? sz : needed;
- if (unlikely(copy_to_user(crs, buf_crs, sz_to_copy))) {
- res = -EFAULT;
+
+ res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
+ m->msg_iov, sz_to_copy);
+ if (res)
goto exit;
- }
+
sz_copied += sz_to_copy;
if (sz_to_copy < sz) {
if (!(flags & MSG_PEEK))
- TIPC_SKB_CB(buf)->handle = buf_crs + sz_to_copy;
+ TIPC_SKB_CB(buf)->handle =
+ (void *)(unsigned long)(offset + sz_to_copy);
goto exit;
}
-
- crs += sz_to_copy;
} else {
if (sz_copied != 0)
goto exit; /* can't add error msg to valid data */
@@ -1256,7 +1250,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
/* Enqueue message (finally!) */
- TIPC_SKB_CB(buf)->handle = msg_data(msg);
+ TIPC_SKB_CB(buf)->handle = 0;
atomic_inc(&tipc_queue_size);
__skb_queue_tail(&sk->sk_receive_queue, buf);
@@ -1608,7 +1602,7 @@ restart:
buf = __skb_dequeue(&sk->sk_receive_queue);
if (buf) {
atomic_dec(&tipc_queue_size);
- if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf))) {
+ if (TIPC_SKB_CB(buf)->handle != 0) {
buf_discard(buf);
goto restart;
}
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index ca04479c3d4..aae9eae1340 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -2,7 +2,7 @@
* net/tipc/subscr.c: TIPC network topology service
*
* Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005-2007, Wind River Systems
+ * Copyright (c) 2005-2007, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -160,7 +160,7 @@ void tipc_subscr_report_overlap(struct subscription *sub,
static void subscr_timeout(struct subscription *sub)
{
- struct port *server_port;
+ struct tipc_port *server_port;
/* Validate server port reference (in case subscriber is terminating) */
@@ -472,8 +472,6 @@ static void subscr_named_msg_event(void *usr_handle,
struct tipc_portid const *orig,
struct tipc_name_seq const *dest)
{
- static struct iovec msg_sect = {NULL, 0};
-
struct subscriber *subscriber;
u32 server_port_ref;
@@ -508,7 +506,7 @@ static void subscr_named_msg_event(void *usr_handle,
/* Lock server port (& save lock address for future use) */
- subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock;
+ subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;
/* Add subscriber to topology server's subscriber list */
@@ -523,7 +521,7 @@ static void subscr_named_msg_event(void *usr_handle,
/* Send an ACK- to complete connection handshaking */
- tipc_send(server_port_ref, 1, &msg_sect);
+ tipc_send(server_port_ref, 0, NULL);
/* Handle optional subscription request */
@@ -542,7 +540,6 @@ int tipc_subscr_start(void)
spin_lock_init(&topsrv.lock);
INIT_LIST_HEAD(&topsrv.subscriber_list);
- spin_lock_bh(&topsrv.lock);
res = tipc_createport(NULL,
TIPC_CRITICAL_IMPORTANCE,
NULL,
@@ -563,12 +560,10 @@ int tipc_subscr_start(void)
goto failed;
}
- spin_unlock_bh(&topsrv.lock);
return 0;
failed:
err("Failed to create subscription service\n");
- spin_unlock_bh(&topsrv.lock);
return res;
}