aboutsummaryrefslogtreecommitdiff
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c881
1 files changed, 447 insertions, 434 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 6d828d9eda4..55c6c9d3e1c 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -3,7 +3,7 @@
*
* Copyright (c) 2004-2006, Ericsson AB
* Copyright (c) 2004, Intel Corporation.
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -38,24 +38,15 @@
#include "core.h"
#include "link.h"
#include "port.h"
-#include "name_distr.h"
#include "bcast.h"
+#include "name_distr.h"
-#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */
-
-#define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */
-
-#define BCLINK_LOG_BUF_SIZE 0
-
-/*
- * Loss rate for incoming broadcast frames; used to test retransmission code.
- * Set to N to cause every N'th frame to be discarded; 0 => don't discard any.
- */
-
-#define TIPC_BCAST_LOSS_RATE 0
+#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */
+#define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */
+#define BCBEARER MAX_BEARERS
/**
- * struct bcbearer_pair - a pair of bearers used by broadcast link
+ * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
* @primary: pointer to primary bearer
* @secondary: pointer to secondary bearer
*
@@ -63,13 +54,13 @@
* to be paired.
*/
-struct bcbearer_pair {
- struct bearer *primary;
- struct bearer *secondary;
+struct tipc_bcbearer_pair {
+ struct tipc_bearer *primary;
+ struct tipc_bearer *secondary;
};
/**
- * struct bcbearer - bearer used by broadcast link
+ * struct tipc_bcbearer - bearer used by broadcast link
* @bearer: (non-standard) broadcast bearer structure
* @media: (non-standard) broadcast media structure
* @bpairs: array of bearer pairs
@@ -80,46 +71,76 @@ struct bcbearer_pair {
* Note: The fields labelled "temporary" are incorporated into the bearer
* to avoid consuming potentially limited stack space through the use of
* large local variables within multicast routines. Concurrent access is
- * prevented through use of the spinlock "bc_lock".
+ * prevented through use of the spinlock "bclink_lock".
*/
-
-struct bcbearer {
- struct bearer bearer;
- struct media media;
- struct bcbearer_pair bpairs[MAX_BEARERS];
- struct bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
+struct tipc_bcbearer {
+ struct tipc_bearer bearer;
+ struct tipc_media media;
+ struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
+ struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
struct tipc_node_map remains;
struct tipc_node_map remains_new;
};
/**
- * struct bclink - link used for broadcast messages
+ * struct tipc_bclink - link used for broadcast messages
+ * @lock: spinlock governing access to structure
* @link: (non-standard) broadcast link structure
* @node: (non-standard) node structure representing b'cast link's peer node
+ * @flags: represent bclink states
+ * @bcast_nodes: map of broadcast-capable nodes
+ * @retransmit_to: node that most recently requested a retransmit
*
* Handles sequence numbering, fragmentation, bundling, etc.
*/
-
-struct bclink {
- struct link link;
+struct tipc_bclink {
+ spinlock_t lock;
+ struct tipc_link link;
struct tipc_node node;
+ unsigned int flags;
+ struct tipc_node_map bcast_nodes;
+ struct tipc_node *retransmit_to;
};
-
-static struct bcbearer *bcbearer = NULL;
-static struct bclink *bclink = NULL;
-static struct link *bcl = NULL;
-static DEFINE_SPINLOCK(bc_lock);
+static struct tipc_bcbearer *bcbearer;
+static struct tipc_bclink *bclink;
+static struct tipc_link *bcl;
const char tipc_bclink_name[] = "broadcast-link";
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff);
+static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
+static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
-static u32 buf_seqno(struct sk_buff *buf)
+static void tipc_bclink_lock(void)
{
- return msg_seqno(buf_msg(buf));
+ spin_lock_bh(&bclink->lock);
+}
+
+static void tipc_bclink_unlock(void)
+{
+ struct tipc_node *node = NULL;
+
+ if (likely(!bclink->flags)) {
+ spin_unlock_bh(&bclink->lock);
+ return;
+ }
+
+ if (bclink->flags & TIPC_BCLINK_RESET) {
+ bclink->flags &= ~TIPC_BCLINK_RESET;
+ node = tipc_bclink_retransmit_to();
+ }
+ spin_unlock_bh(&bclink->lock);
+
+ if (node)
+ tipc_link_reset_all(node);
+}
+
+void tipc_bclink_set_flags(unsigned int flags)
+{
+ bclink->flags |= flags;
}
static u32 bcbuf_acks(struct sk_buff *buf)
@@ -137,6 +158,19 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}
+void tipc_bclink_add_node(u32 addr)
+{
+ tipc_bclink_lock();
+ tipc_nmap_add(&bclink->bcast_nodes, addr);
+ tipc_bclink_unlock();
+}
+
+void tipc_bclink_remove_node(u32 addr)
+{
+ tipc_bclink_lock();
+ tipc_nmap_remove(&bclink->bcast_nodes, addr);
+ tipc_bclink_unlock();
+}
static void bclink_set_last_sent(void)
{
@@ -151,54 +185,37 @@ u32 tipc_bclink_get_last_sent(void)
return bcl->fsm_msg_cnt;
}
-/**
- * bclink_set_gap - set gap according to contents of current deferred pkt queue
- *
- * Called with 'node' locked, bc_lock unlocked
- */
-
-static void bclink_set_gap(struct tipc_node *n_ptr)
+static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
- struct sk_buff *buf = n_ptr->bclink.deferred_head;
-
- n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
- mod(n_ptr->bclink.last_in);
- if (unlikely(buf != NULL))
- n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
+ node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
+ seqno : node->bclink.last_sent;
}
+
/**
- * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
+ * tipc_bclink_retransmit_to - get most recent node to request retransmission
*
- * This mechanism endeavours to prevent all nodes in network from trying
- * to ACK or NACK at the same time.
- *
- * Note: TIPC uses a different trigger to distribute ACKs than it does to
- * distribute NACKs, but tries to use the same spacing (divide by 16).
+ * Called with bclink_lock locked
*/
-
-static int bclink_ack_allowed(u32 n)
+struct tipc_node *tipc_bclink_retransmit_to(void)
{
- return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
+ return bclink->retransmit_to;
}
-
/**
* bclink_retransmit_pkt - retransmit broadcast packets
* @after: sequence number of last packet to *not* retransmit
* @to: sequence number of last packet to retransmit
*
- * Called with bc_lock locked
+ * Called with bclink_lock locked
*/
-
static void bclink_retransmit_pkt(u32 after, u32 to)
{
struct sk_buff *buf;
buf = bcl->first_out;
- while (buf && less_eq(buf_seqno(buf), after)) {
+ while (buf && less_eq(buf_seqno(buf), after))
buf = buf->next;
- }
tipc_link_retransmit(bcl, buf, mod(to - after));
}
@@ -207,36 +224,63 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
* @n_ptr: node that sent acknowledgement info
* @acked: broadcast sequence # that has been acknowledged
*
- * Node is locked, bc_lock unlocked.
+ * Node is locked, bclink_lock unlocked.
*/
-
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
struct sk_buff *crs;
struct sk_buff *next;
unsigned int released = 0;
- if (less_eq(acked, n_ptr->bclink.acked))
- return;
+ tipc_bclink_lock();
+ /* Bail out if tx queue is empty (no clean up is required) */
+ crs = bcl->first_out;
+ if (!crs)
+ goto exit;
- spin_lock_bh(&bc_lock);
+ /* Determine which messages need to be acknowledged */
+ if (acked == INVALID_LINK_SEQ) {
+ /*
+ * Contact with specified node has been lost, so need to
+ * acknowledge sent messages only (if other nodes still exist)
+ * or both sent and unsent messages (otherwise)
+ */
+ if (bclink->bcast_nodes.count)
+ acked = bcl->fsm_msg_cnt;
+ else
+ acked = bcl->next_out_no;
+ } else {
+ /*
+ * Bail out if specified sequence number does not correspond
+ * to a message that has been sent and not yet acknowledged
+ */
+ if (less(acked, buf_seqno(crs)) ||
+ less(bcl->fsm_msg_cnt, acked) ||
+ less_eq(acked, n_ptr->bclink.acked))
+ goto exit;
+ }
/* Skip over packets that node has previously acknowledged */
-
- crs = bcl->first_out;
- while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked)) {
+ while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
crs = crs->next;
- }
/* Update packets that node is now acknowledging */
while (crs && less_eq(buf_seqno(crs), acked)) {
next = crs->next;
- bcbuf_decr_acks(crs);
+
+ if (crs != bcl->next_out)
+ bcbuf_decr_acks(crs);
+ else {
+ bcbuf_set_acks(crs, 0);
+ bcl->next_out = next;
+ bclink_set_last_sent();
+ }
+
if (bcbuf_acks(crs) == 0) {
bcl->first_out = next;
bcl->out_queue_size--;
- buf_discard(crs);
+ kfree_skb(crs);
released = 1;
}
crs = next;
@@ -251,297 +295,305 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
}
if (unlikely(released && !list_empty(&bcl->waiting_ports)))
tipc_link_wakeup_ports(bcl, 0);
- spin_unlock_bh(&bc_lock);
+exit:
+ tipc_bclink_unlock();
}
/**
- * bclink_send_ack - unicast an ACK msg
+ * tipc_bclink_update_link_state - update broadcast link state
*
- * tipc_net_lock and node lock set
+ * RCU and node lock set
*/
-
-static void bclink_send_ack(struct tipc_node *n_ptr)
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
{
- struct link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
+ struct sk_buff *buf;
- if (l_ptr != NULL)
- tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-}
+ /* Ignore "stale" link state info */
-/**
- * bclink_send_nack- broadcast a NACK msg
- *
- * tipc_net_lock and node lock set
- */
+ if (less_eq(last_sent, n_ptr->bclink.last_in))
+ return;
-static void bclink_send_nack(struct tipc_node *n_ptr)
-{
- struct sk_buff *buf;
- struct tipc_msg *msg;
+ /* Update link synchronization state; quit if in sync */
- if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
- return;
+ bclink_update_last_sent(n_ptr, last_sent);
- buf = tipc_buf_acquire(INT_H_SIZE);
- if (buf) {
- msg = buf_msg(buf);
- tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
- INT_H_SIZE, n_ptr->addr);
- msg_set_mc_netid(msg, tipc_net_id);
- msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
- msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
- msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
- msg_set_bcast_tag(msg, tipc_own_tag);
-
- if (tipc_bearer_send(&bcbearer->bearer, buf, NULL)) {
- bcl->stats.sent_nacks++;
- buf_discard(buf);
- } else {
- tipc_bearer_schedule(bcl->b_ptr, bcl);
- bcl->proto_msg_queue = buf;
- bcl->stats.bearer_congs++;
- }
+ if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+ return;
- /*
- * Ensure we doesn't send another NACK msg to the node
- * until 16 more deferred messages arrive from it
- * (i.e. helps prevent all nodes from NACK'ing at same time)
- */
+ /* Update out-of-sync state; quit if loss is still unconfirmed */
- n_ptr->bclink.nack_sync = tipc_own_tag;
+ if ((++n_ptr->bclink.oos_state) == 1) {
+ if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
+ return;
+ n_ptr->bclink.oos_state++;
}
-}
-/**
- * tipc_bclink_check_gap - send a NACK if a sequence gap exists
- *
- * tipc_net_lock and node lock set
- */
+ /* Don't NACK if one has been recently sent (or seen) */
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
-{
- if (!n_ptr->bclink.supported ||
- less_eq(last_sent, mod(n_ptr->bclink.last_in)))
+ if (n_ptr->bclink.oos_state & 0x1)
return;
- bclink_set_gap(n_ptr);
- if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
- n_ptr->bclink.gap_to = last_sent;
- bclink_send_nack(n_ptr);
+ /* Send NACK */
+
+ buf = tipc_buf_acquire(INT_H_SIZE);
+ if (buf) {
+ struct tipc_msg *msg = buf_msg(buf);
+
+ tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
+ INT_H_SIZE, n_ptr->addr);
+ msg_set_non_seq(msg, 1);
+ msg_set_mc_netid(msg, tipc_net_id);
+ msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
+ msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
+ msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
+ ? buf_seqno(n_ptr->bclink.deferred_head) - 1
+ : n_ptr->bclink.last_sent);
+
+ tipc_bclink_lock();
+ tipc_bearer_send(MAX_BEARERS, buf, NULL);
+ bcl->stats.sent_nacks++;
+ tipc_bclink_unlock();
+ kfree_skb(buf);
+
+ n_ptr->bclink.oos_state++;
+ }
}
/**
- * tipc_bclink_peek_nack - process a NACK msg meant for another node
+ * bclink_peek_nack - monitor retransmission requests sent by other nodes
*
- * Only tipc_net_lock set.
+ * Delay any upcoming NACK by this node if another node has already
+ * requested the first message this node is going to ask for.
*/
-
-static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
+static void bclink_peek_nack(struct tipc_msg *msg)
{
- struct tipc_node *n_ptr = tipc_node_find(dest);
- u32 my_after, my_to;
+ struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
- if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
+ if (unlikely(!n_ptr))
return;
+
tipc_node_lock(n_ptr);
- /*
- * Modify gap to suppress unnecessary NACKs from this node
- */
- my_after = n_ptr->bclink.gap_after;
- my_to = n_ptr->bclink.gap_to;
-
- if (less_eq(gap_after, my_after)) {
- if (less(my_after, gap_to) && less(gap_to, my_to))
- n_ptr->bclink.gap_after = gap_to;
- else if (less_eq(my_to, gap_to))
- n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
- } else if (less_eq(gap_after, my_to)) {
- if (less_eq(my_to, gap_to))
- n_ptr->bclink.gap_to = gap_after;
- } else {
- /*
- * Expand gap if missing bufs not in deferred queue:
- */
- struct sk_buff *buf = n_ptr->bclink.deferred_head;
- u32 prev = n_ptr->bclink.gap_to;
- for (; buf; buf = buf->next) {
- u32 seqno = buf_seqno(buf);
+ if (n_ptr->bclink.recv_permitted &&
+ (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
+ (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
+ n_ptr->bclink.oos_state = 2;
- if (mod(seqno - prev) != 1) {
- buf = NULL;
- break;
- }
- if (seqno == gap_after)
- break;
- prev = seqno;
- }
- if (buf == NULL)
- n_ptr->bclink.gap_to = gap_after;
- }
- /*
- * Some nodes may send a complementary NACK now:
- */
- if (bclink_ack_allowed(sender_tag + 1)) {
- if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
- bclink_send_nack(n_ptr);
- bclink_set_gap(n_ptr);
- }
- }
tipc_node_unlock(n_ptr);
}
-/**
- * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
+/*
+ * tipc_bclink_xmit - broadcast a packet to all nodes in cluster
*/
-
-int tipc_bclink_send_msg(struct sk_buff *buf)
+int tipc_bclink_xmit(struct sk_buff *buf)
{
int res;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
- res = tipc_link_send_buf(bcl, buf);
- if (unlikely(res == -ELINKCONG))
- buf_discard(buf);
- else
- bclink_set_last_sent();
-
- if (bcl->out_queue_size > bcl->stats.max_queue_sz)
- bcl->stats.max_queue_sz = bcl->out_queue_size;
- bcl->stats.queue_sz_counts++;
- bcl->stats.accu_queue_sz += bcl->out_queue_size;
+ if (!bclink->bcast_nodes.count) {
+ res = msg_data_sz(buf_msg(buf));
+ kfree_skb(buf);
+ goto exit;
+ }
- spin_unlock_bh(&bc_lock);
+ res = __tipc_link_xmit(bcl, buf);
+ if (likely(res >= 0)) {
+ bclink_set_last_sent();
+ bcl->stats.queue_sz_counts++;
+ bcl->stats.accu_queue_sz += bcl->out_queue_size;
+ }
+exit:
+ tipc_bclink_unlock();
return res;
}
/**
- * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
+ * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
- * tipc_net_lock is read_locked, no other locks set
+ * Called with both sending node's lock and bclink_lock taken.
*/
+static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
+{
+ bclink_update_last_sent(node, seqno);
+ node->bclink.last_in = seqno;
+ node->bclink.oos_state = 0;
+ bcl->stats.recv_info++;
-void tipc_bclink_recv_pkt(struct sk_buff *buf)
+ /*
+ * Unicast an ACK periodically, ensuring that
+ * all nodes in the cluster don't ACK at the same time
+ */
+
+ if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
+ tipc_link_proto_xmit(node->active_links[node->addr & 1],
+ STATE_MSG, 0, 0, 0, 0, 0);
+ bcl->stats.sent_acks++;
+ }
+}
+
+/**
+ * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
+ *
+ * RCU is locked, no other locks set
+ */
+void tipc_bclink_rcv(struct sk_buff *buf)
{
-#if (TIPC_BCAST_LOSS_RATE)
- static int rx_count = 0;
-#endif
struct tipc_msg *msg = buf_msg(buf);
- struct tipc_node* node = tipc_node_find(msg_prevnode(msg));
+ struct tipc_node *node;
u32 next_in;
u32 seqno;
- struct sk_buff *deferred;
+ int deferred;
- msg_dbg(msg, "<BC<<<");
+ /* Screen out unwanted broadcast messages */
- if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported ||
- (msg_mc_netid(msg) != tipc_net_id))) {
- buf_discard(buf);
- return;
- }
+ if (msg_mc_netid(msg) != tipc_net_id)
+ goto exit;
+
+ node = tipc_node_find(msg_prevnode(msg));
+ if (unlikely(!node))
+ goto exit;
+
+ tipc_node_lock(node);
+ if (unlikely(!node->bclink.recv_permitted))
+ goto unlock;
+
+ /* Handle broadcast protocol message */
if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
- msg_dbg(msg, "<BCNACK<<<");
+ if (msg_type(msg) != STATE_MSG)
+ goto unlock;
if (msg_destnode(msg) == tipc_own_addr) {
- tipc_node_lock(node);
tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
tipc_node_unlock(node);
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
bcl->stats.recv_nacks++;
- bcl->owner->next = node; /* remember requestor */
+ bclink->retransmit_to = node;
bclink_retransmit_pkt(msg_bcgap_after(msg),
msg_bcgap_to(msg));
- bcl->owner->next = NULL;
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
} else {
- tipc_bclink_peek_nack(msg_destnode(msg),
- msg_bcast_tag(msg),
- msg_bcgap_after(msg),
- msg_bcgap_to(msg));
+ tipc_node_unlock(node);
+ bclink_peek_nack(msg);
}
- buf_discard(buf);
- return;
+ goto exit;
}
-#if (TIPC_BCAST_LOSS_RATE)
- if (++rx_count == TIPC_BCAST_LOSS_RATE) {
- rx_count = 0;
- buf_discard(buf);
- return;
- }
-#endif
+ /* Handle in-sequence broadcast message */
- tipc_node_lock(node);
-receive:
- deferred = node->bclink.deferred_head;
- next_in = mod(node->bclink.last_in + 1);
seqno = msg_seqno(msg);
+ next_in = mod(node->bclink.last_in + 1);
if (likely(seqno == next_in)) {
- bcl->stats.recv_info++;
- node->bclink.last_in++;
- bclink_set_gap(node);
- if (unlikely(bclink_ack_allowed(seqno))) {
- bclink_send_ack(node);
- bcl->stats.sent_acks++;
- }
+receive:
+ /* Deliver message to destination */
+
if (likely(msg_isdata(msg))) {
+ tipc_bclink_lock();
+ bclink_accept_pkt(node, seqno);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
- tipc_port_recv_mcast(buf, NULL);
+ if (likely(msg_mcast(msg)))
+ tipc_port_mcast_rcv(buf, NULL);
+ else
+ kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) {
+ tipc_bclink_lock();
+ bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
- tipc_link_recv_bundle(buf);
+ tipc_link_bundle_rcv(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
+ tipc_buf_append(&node->bclink.reasm_buf, &buf);
+ if (unlikely(!buf && !node->bclink.reasm_buf))
+ goto unlock;
+ tipc_bclink_lock();
+ bclink_accept_pkt(node, seqno);
bcl->stats.recv_fragments++;
- if (tipc_link_recv_fragment(&node->bclink.defragm,
- &buf, &msg))
+ if (buf) {
bcl->stats.recv_fragmented++;
+ msg = buf_msg(buf);
+ tipc_bclink_unlock();
+ goto receive;
+ }
+ tipc_bclink_unlock();
tipc_node_unlock(node);
- tipc_net_route_msg(buf);
+ } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
+ tipc_bclink_lock();
+ bclink_accept_pkt(node, seqno);
+ tipc_bclink_unlock();
+ tipc_node_unlock(node);
+ tipc_named_rcv(buf);
} else {
+ tipc_bclink_lock();
+ bclink_accept_pkt(node, seqno);
+ tipc_bclink_unlock();
tipc_node_unlock(node);
- tipc_net_route_msg(buf);
- }
- if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
- tipc_node_lock(node);
- buf = deferred;
- msg = buf_msg(buf);
- node->bclink.deferred_head = deferred->next;
- goto receive;
- }
- return;
- } else if (less(next_in, seqno)) {
- u32 gap_after = node->bclink.gap_after;
- u32 gap_to = node->bclink.gap_to;
-
- if (tipc_link_defer_pkt(&node->bclink.deferred_head,
- &node->bclink.deferred_tail,
- buf)) {
- node->bclink.nack_sync++;
- bcl->stats.deferred_recv++;
- if (seqno == mod(gap_after + 1))
- node->bclink.gap_after = seqno;
- else if (less(gap_after, seqno) && less(seqno, gap_to))
- node->bclink.gap_to = seqno;
+ kfree_skb(buf);
}
- if (bclink_ack_allowed(node->bclink.nack_sync)) {
- if (gap_to != gap_after)
- bclink_send_nack(node);
- bclink_set_gap(node);
+ buf = NULL;
+
+ /* Determine new synchronization state */
+
+ tipc_node_lock(node);
+ if (unlikely(!tipc_node_is_up(node)))
+ goto unlock;
+
+ if (node->bclink.last_in == node->bclink.last_sent)
+ goto unlock;
+
+ if (!node->bclink.deferred_head) {
+ node->bclink.oos_state = 1;
+ goto unlock;
}
- } else {
- bcl->stats.duplicates++;
- buf_discard(buf);
+
+ msg = buf_msg(node->bclink.deferred_head);
+ seqno = msg_seqno(msg);
+ next_in = mod(next_in + 1);
+ if (seqno != next_in)
+ goto unlock;
+
+ /* Take in-sequence message from deferred queue & deliver it */
+
+ buf = node->bclink.deferred_head;
+ node->bclink.deferred_head = buf->next;
+ buf->next = NULL;
+ node->bclink.deferred_size--;
+ goto receive;
}
+
+ /* Handle out-of-sequence broadcast message */
+
+ if (less(next_in, seqno)) {
+ deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
+ &node->bclink.deferred_tail,
+ buf);
+ node->bclink.deferred_size += deferred;
+ bclink_update_last_sent(node, seqno);
+ buf = NULL;
+ } else
+ deferred = 0;
+
+ tipc_bclink_lock();
+
+ if (deferred)
+ bcl->stats.deferred_recv++;
+ else
+ bcl->stats.duplicates++;
+
+ tipc_bclink_unlock();
+
+unlock:
tipc_node_unlock(node);
+exit:
+ kfree_skb(buf);
}
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
- return (n_ptr->bclink.supported &&
+ return (n_ptr->bclink.recv_permitted &&
(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
}
@@ -549,98 +601,106 @@ u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
/**
* tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
*
- * Send through as many bearers as necessary to reach all nodes
- * that support TIPC multicasting.
+ * Send packet over as many bearers as necessary to reach all nodes
+ * that have joined the broadcast link.
*
- * Returns 0 if packet sent successfully, non-zero if not
+ * Returns 0 (packet sent successfully) under all circumstances,
+ * since the broadcast link's pseudo-bearer never blocks
*/
-
-static int tipc_bcbearer_send(struct sk_buff *buf,
- struct tipc_bearer *unused1,
+static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
struct tipc_media_addr *unused2)
{
int bp_index;
- /* Prepare buffer for broadcasting (if first time trying to send it) */
-
+ /* Prepare broadcast link message for reliable transmission,
+ * if first time trying to send it;
+ * preparation is skipped for broadcast link protocol messages
+ * since they are sent in an unreliable manner and don't need it
+ */
if (likely(!msg_non_seq(buf_msg(buf)))) {
struct tipc_msg *msg;
- assert(tipc_cltr_bcast_nodes.count != 0);
- bcbuf_set_acks(buf, tipc_cltr_bcast_nodes.count);
+ bcbuf_set_acks(buf, bclink->bcast_nodes.count);
msg = buf_msg(buf);
msg_set_non_seq(msg, 1);
msg_set_mc_netid(msg, tipc_net_id);
bcl->stats.sent_info++;
+
+ if (WARN_ON(!bclink->bcast_nodes.count)) {
+ dump_stack();
+ return 0;
+ }
}
/* Send buffer over bearers until all targets reached */
-
- bcbearer->remains = tipc_cltr_bcast_nodes;
+ bcbearer->remains = bclink->bcast_nodes;
for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
- struct bearer *p = bcbearer->bpairs[bp_index].primary;
- struct bearer *s = bcbearer->bpairs[bp_index].secondary;
+ struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
+ struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
+ struct tipc_bearer *b = p;
+ struct sk_buff *tbuf;
if (!p)
- break; /* no more bearers to try */
+ break; /* No more bearers to try */
- tipc_nmap_diff(&bcbearer->remains, &p->nodes, &bcbearer->remains_new);
+ tipc_nmap_diff(&bcbearer->remains, &b->nodes,
+ &bcbearer->remains_new);
if (bcbearer->remains_new.count == bcbearer->remains.count)
- continue; /* bearer pair doesn't add anything */
-
- if (p->publ.blocked ||
- p->media->send_msg(buf, &p->publ, &p->media->bcast_addr)) {
- /* unable to send on primary bearer */
- if (!s || s->publ.blocked ||
- s->media->send_msg(buf, &s->publ,
- &s->media->bcast_addr)) {
- /* unable to send on either bearer */
- continue;
- }
+ continue; /* Nothing added by bearer pair */
+
+ if (bp_index == 0) {
+ /* Use original buffer for first bearer */
+ tipc_bearer_send(b->identity, buf, &b->bcast_addr);
+ } else {
+ /* Avoid concurrent buffer access */
+ tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
+ if (!tbuf)
+ break;
+ tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
+ kfree_skb(tbuf); /* Bearer keeps a clone */
}
+ /* Swap bearers for next packet */
if (s) {
bcbearer->bpairs[bp_index].primary = s;
bcbearer->bpairs[bp_index].secondary = p;
}
if (bcbearer->remains_new.count == 0)
- return 0;
+ break; /* All targets reached */
bcbearer->remains = bcbearer->remains_new;
}
- /*
- * Unable to reach all targets (indicate success, since currently
- * there isn't code in place to properly block & unblock the
- * pseudo-bearer used by the broadcast link)
- */
-
- return TIPC_OK;
+ return 0;
}
/**
* tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
*/
-
-void tipc_bcbearer_sort(void)
+void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
{
- struct bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
- struct bcbearer_pair *bp_curr;
+ struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
+ struct tipc_bcbearer_pair *bp_curr;
+ struct tipc_bearer *b;
int b_index;
int pri;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
- /* Group bearers by priority (can assume max of two per priority) */
+ if (action)
+ tipc_nmap_add(nm_ptr, node);
+ else
+ tipc_nmap_remove(nm_ptr, node);
+ /* Group bearers by priority (can assume max of two per priority) */
memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
+ rcu_read_lock();
for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
- struct bearer *b = &tipc_bearers[b_index];
-
- if (!b->active || !b->nodes.count)
+ b = rcu_dereference_rtnl(bearer_list[b_index]);
+ if (!b || !b->nodes.count)
continue;
if (!bp_temp[b->priority].primary)
@@ -648,9 +708,9 @@ void tipc_bcbearer_sort(void)
else
bp_temp[b->priority].secondary = b;
}
+ rcu_read_unlock();
/* Create array of bearer pairs for broadcasting */
-
bp_curr = bcbearer->bpairs;
memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
@@ -674,75 +734,49 @@ void tipc_bcbearer_sort(void)
bp_curr++;
}
- spin_unlock_bh(&bc_lock);
-}
-
-/**
- * tipc_bcbearer_push - resolve bearer congestion
- *
- * Forces bclink to push out any unsent packets, until all packets are gone
- * or congestion reoccurs.
- * No locks set when function called
- */
-
-void tipc_bcbearer_push(void)
-{
- struct bearer *b_ptr;
-
- spin_lock_bh(&bc_lock);
- b_ptr = &bcbearer->bearer;
- if (b_ptr->publ.blocked) {
- b_ptr->publ.blocked = 0;
- tipc_bearer_lock_push(b_ptr);
- }
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
}
int tipc_bclink_stats(char *buf, const u32 buf_size)
{
- struct print_buf pb;
+ int ret;
+ struct tipc_stats *s;
if (!bcl)
return 0;
- tipc_printbuf_init(&pb, buf, buf_size);
-
- spin_lock_bh(&bc_lock);
-
- tipc_printf(&pb, "Link <%s>\n"
- " Window:%u packets\n",
- bcl->name, bcl->queue_limit[0]);
- tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n",
- bcl->stats.recv_info,
- bcl->stats.recv_fragments,
- bcl->stats.recv_fragmented,
- bcl->stats.recv_bundles,
- bcl->stats.recv_bundled);
- tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n",
- bcl->stats.sent_info,
- bcl->stats.sent_fragments,
- bcl->stats.sent_fragmented,
- bcl->stats.sent_bundles,
- bcl->stats.sent_bundled);
- tipc_printf(&pb, " RX naks:%u defs:%u dups:%u\n",
- bcl->stats.recv_nacks,
- bcl->stats.deferred_recv,
- bcl->stats.duplicates);
- tipc_printf(&pb, " TX naks:%u acks:%u dups:%u\n",
- bcl->stats.sent_nacks,
- bcl->stats.sent_acks,
- bcl->stats.retransmitted);
- tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n",
- bcl->stats.bearer_congs,
- bcl->stats.link_congs,
- bcl->stats.max_queue_sz,
- bcl->stats.queue_sz_counts
- ? (bcl->stats.accu_queue_sz / bcl->stats.queue_sz_counts)
- : 0);
-
- spin_unlock_bh(&bc_lock);
- return tipc_printbuf_validate(&pb);
+ tipc_bclink_lock();
+
+ s = &bcl->stats;
+
+ ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
+ " Window:%u packets\n",
+ bcl->name, bcl->queue_limit[0]);
+ ret += tipc_snprintf(buf + ret, buf_size - ret,
+ " RX packets:%u fragments:%u/%u bundles:%u/%u\n",
+ s->recv_info, s->recv_fragments,
+ s->recv_fragmented, s->recv_bundles,
+ s->recv_bundled);
+ ret += tipc_snprintf(buf + ret, buf_size - ret,
+ " TX packets:%u fragments:%u/%u bundles:%u/%u\n",
+ s->sent_info, s->sent_fragments,
+ s->sent_fragmented, s->sent_bundles,
+ s->sent_bundled);
+ ret += tipc_snprintf(buf + ret, buf_size - ret,
+ " RX naks:%u defs:%u dups:%u\n",
+ s->recv_nacks, s->deferred_recv, s->duplicates);
+ ret += tipc_snprintf(buf + ret, buf_size - ret,
+ " TX naks:%u acks:%u dups:%u\n",
+ s->sent_nacks, s->sent_acks, s->retransmitted);
+ ret += tipc_snprintf(buf + ret, buf_size - ret,
+ " Congestion link:%u Send queue max:%u avg:%u\n",
+ s->link_congs, s->max_queue_sz,
+ s->queue_sz_counts ?
+ (s->accu_queue_sz / s->queue_sz_counts) : 0);
+
+ tipc_bclink_unlock();
+ return ret;
}
int tipc_bclink_reset_stats(void)
@@ -750,9 +784,9 @@ int tipc_bclink_reset_stats(void)
if (!bcl)
return -ENOPROTOOPT;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
memset(&bcl->stats, 0, sizeof(bcl->stats));
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return 0;
}
@@ -763,75 +797,59 @@ int tipc_bclink_set_queue_limits(u32 limit)
if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
return -EINVAL;
- spin_lock_bh(&bc_lock);
+ tipc_bclink_lock();
tipc_link_set_queue_limits(bcl, limit);
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_unlock();
return 0;
}
int tipc_bclink_init(void)
{
bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
+ if (!bcbearer)
+ return -ENOMEM;
+
bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
- if (!bcbearer || !bclink) {
- nomem:
- warn("Multicast link creation failed, no memory\n");
+ if (!bclink) {
kfree(bcbearer);
- bcbearer = NULL;
- kfree(bclink);
- bclink = NULL;
return -ENOMEM;
}
- INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
+ bcl = &bclink->link;
bcbearer->bearer.media = &bcbearer->media;
bcbearer->media.send_msg = tipc_bcbearer_send;
- sprintf(bcbearer->media.name, "tipc-multicast");
+ sprintf(bcbearer->media.name, "tipc-broadcast");
- bcl = &bclink->link;
+ spin_lock_init(&bclink->lock);
INIT_LIST_HEAD(&bcl->waiting_ports);
bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock);
bcl->owner = &bclink->node;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
- bcl->b_ptr = &bcbearer->bearer;
+ bcl->bearer_id = MAX_BEARERS;
+ rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
bcl->state = WORKING_WORKING;
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
-
- if (BCLINK_LOG_BUF_SIZE) {
- char *pb = kmalloc(BCLINK_LOG_BUF_SIZE, GFP_ATOMIC);
-
- if (!pb)
- goto nomem;
- tipc_printbuf_init(&bcl->print_buf, pb, BCLINK_LOG_BUF_SIZE);
- }
-
return 0;
}
void tipc_bclink_stop(void)
{
- spin_lock_bh(&bc_lock);
- if (bcbearer) {
- tipc_link_stop(bcl);
- if (BCLINK_LOG_BUF_SIZE)
- kfree(bcl->print_buf.buf);
- bcl = NULL;
- kfree(bclink);
- bclink = NULL;
- kfree(bcbearer);
- bcbearer = NULL;
- }
- spin_unlock_bh(&bc_lock);
+ tipc_bclink_lock();
+ tipc_link_purge_queues(bcl);
+ tipc_bclink_unlock();
+
+ RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
+ synchronize_net();
+ kfree(bcbearer);
+ kfree(bclink);
}
-
/**
* tipc_nmap_add - add a node to a node map
*/
-
-void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
+static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
int w = n / WSIZE;
@@ -846,8 +864,7 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
/**
* tipc_nmap_remove - remove a node from a node map
*/
-
-void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
+static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
int n = tipc_node(node);
int w = n / WSIZE;
@@ -865,7 +882,6 @@ void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
* @nm_b: input node map B
* @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
*/
-
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
struct tipc_node_map *nm_b,
struct tipc_node_map *nm_diff)
@@ -891,10 +907,9 @@ static void tipc_nmap_diff(struct tipc_node_map *nm_a,
/**
* tipc_port_list_add - add a port to a port list, ensuring no duplicates
*/
-
-void tipc_port_list_add(struct port_list *pl_ptr, u32 port)
+void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port)
{
- struct port_list *item = pl_ptr;
+ struct tipc_port_list *item = pl_ptr;
int i;
int item_sz = PLSIZE;
int cnt = pl_ptr->count;
@@ -913,7 +928,7 @@ void tipc_port_list_add(struct port_list *pl_ptr, u32 port)
if (!item->next) {
item->next = kmalloc(sizeof(*item), GFP_ATOMIC);
if (!item->next) {
- warn("Incomplete multicast delivery, no memory\n");
+ pr_warn("Incomplete multicast delivery, no memory\n");
return;
}
item->next->next = NULL;
@@ -925,15 +940,13 @@ void tipc_port_list_add(struct port_list *pl_ptr, u32 port)
* tipc_port_list_free - free dynamically created entries in port_list chain
*
*/
-
-void tipc_port_list_free(struct port_list *pl_ptr)
+void tipc_port_list_free(struct tipc_port_list *pl_ptr)
{
- struct port_list *item;
- struct port_list *next;
+ struct tipc_port_list *item;
+ struct tipc_port_list *next;
for (item = pl_ptr->next; item; item = next) {
next = item->next;
kfree(item);
}
}
-