Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--	net/tipc/bcast.c	881
1 file changed, 447 insertions, 434 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 6d828d9eda4..55c6c9d3e1c 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -3,7 +3,7 @@
  *
  * Copyright (c) 2004-2006, Ericsson AB
  * Copyright (c) 2004, Intel Corporation.
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -38,24 +38,15 @@
 #include "core.h"
 #include "link.h"
 #include "port.h"
-#include "name_distr.h"
 #include "bcast.h"
+#include "name_distr.h"

-#define MAX_PKT_DEFAULT_MCAST 1500	/* bcast link max packet size (fixed) */
-
-#define BCLINK_WIN_DEFAULT 20		/* bcast link window size (default) */
-
-#define BCLINK_LOG_BUF_SIZE 0
-
-/*
- * Loss rate for incoming broadcast frames; used to test retransmission code.
- * Set to N to cause every N'th frame to be discarded; 0 => don't discard any.
- */
-
-#define TIPC_BCAST_LOSS_RATE 0
+#define	MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
+#define	BCLINK_WIN_DEFAULT	20	/* bcast link window size (default) */
+#define	BCBEARER		MAX_BEARERS

 /**
- * struct bcbearer_pair - a pair of bearers used by broadcast link
+ * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
  * @primary: pointer to primary bearer
  * @secondary: pointer to secondary bearer
  *
@@ -63,13 +54,13 @@
  * to be paired.
  */
-struct bcbearer_pair {
-	struct bearer *primary;
-	struct bearer *secondary;
+struct tipc_bcbearer_pair {
+	struct tipc_bearer *primary;
+	struct tipc_bearer *secondary;
 };

 /**
- * struct bcbearer - bearer used by broadcast link
+ * struct tipc_bcbearer - bearer used by broadcast link
  * @bearer: (non-standard) broadcast bearer structure
  * @media: (non-standard) broadcast media structure
  * @bpairs: array of bearer pairs
@@ -80,46 +71,76 @@ struct bcbearer_pair {
  * Note: The fields labelled "temporary" are incorporated into the bearer
  * to avoid consuming potentially limited stack space through the use of
  * large local variables within multicast routines.  Concurrent access is
- * prevented through use of the spinlock "bc_lock".
+ * prevented through use of the spinlock "bclink_lock".
  */
-
-struct bcbearer {
-	struct bearer bearer;
-	struct media media;
-	struct bcbearer_pair bpairs[MAX_BEARERS];
-	struct bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
+struct tipc_bcbearer {
+	struct tipc_bearer bearer;
+	struct tipc_media media;
+	struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
+	struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
 	struct tipc_node_map remains;
 	struct tipc_node_map remains_new;
 };

 /**
- * struct bclink - link used for broadcast messages
+ * struct tipc_bclink - link used for broadcast messages
+ * @lock: spinlock governing access to structure
  * @link: (non-standard) broadcast link structure
  * @node: (non-standard) node structure representing b'cast link's peer node
+ * @flags: represent bclink states
+ * @bcast_nodes: map of broadcast-capable nodes
+ * @retransmit_to: node that most recently requested a retransmit
  *
  * Handles sequence numbering, fragmentation, bundling, etc.
  */
-
-struct bclink {
-	struct link link;
+struct tipc_bclink {
+	spinlock_t lock;
+	struct tipc_link link;
 	struct tipc_node node;
+	unsigned int flags;
+	struct tipc_node_map bcast_nodes;
+	struct tipc_node *retransmit_to;
 };
-
-static struct bcbearer *bcbearer = NULL;
-static struct bclink *bclink = NULL;
-static struct link *bcl = NULL;
-static DEFINE_SPINLOCK(bc_lock);
+static struct tipc_bcbearer *bcbearer;
+static struct tipc_bclink *bclink;
+static struct tipc_link *bcl;

 const char tipc_bclink_name[] = "broadcast-link";

 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
 			   struct tipc_node_map *nm_b,
 			   struct tipc_node_map *nm_diff);
+static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
+static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);

-static u32 buf_seqno(struct sk_buff *buf)
+static void tipc_bclink_lock(void)
 {
-	return msg_seqno(buf_msg(buf));
+	spin_lock_bh(&bclink->lock);
+}
+
+static void tipc_bclink_unlock(void)
+{
+	struct tipc_node *node = NULL;
+
+	if (likely(!bclink->flags)) {
+		spin_unlock_bh(&bclink->lock);
+		return;
+	}
+
+	if (bclink->flags & TIPC_BCLINK_RESET) {
+		bclink->flags &= ~TIPC_BCLINK_RESET;
+		node = tipc_bclink_retransmit_to();
+	}
+	spin_unlock_bh(&bclink->lock);
+
+	if (node)
+		tipc_link_reset_all(node);
+}
+
+void tipc_bclink_set_flags(unsigned int flags)
+{
+	bclink->flags |= flags;
 }

 static u32 bcbuf_acks(struct sk_buff *buf)
@@ -137,6 +158,19 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
 	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
 }

+void tipc_bclink_add_node(u32 addr)
+{
+	tipc_bclink_lock();
+	tipc_nmap_add(&bclink->bcast_nodes, addr);
+	tipc_bclink_unlock();
+}
+
+void tipc_bclink_remove_node(u32 addr)
+{
+	tipc_bclink_lock();
+	tipc_nmap_remove(&bclink->bcast_nodes, addr);
+	tipc_bclink_unlock();
+}

 static void bclink_set_last_sent(void)
 {
@@ -151,54 +185,37 @@ u32 tipc_bclink_get_last_sent(void)
 	return bcl->fsm_msg_cnt;
 }

-/**
- * bclink_set_gap - set gap according to contents of current deferred pkt queue
- *
- * Called with 'node' locked, bc_lock unlocked
- */
-
-static void bclink_set_gap(struct tipc_node *n_ptr)
+static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
 {
-	struct sk_buff *buf = n_ptr->bclink.deferred_head;
-
-	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
-		mod(n_ptr->bclink.last_in);
-	if (unlikely(buf != NULL))
-		n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
+	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
+						seqno : node->bclink.last_sent;
 }
+
 /**
- * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
+ * tipc_bclink_retransmit_to - get most recent node to request retransmission
  *
- * This mechanism endeavours to prevent all nodes in network from trying
- * to ACK or NACK at the same time.
- *
- * Note: TIPC uses a different trigger to distribute ACKs than it does to
- *       distribute NACKs, but tries to use the same spacing (divide by 16).
+ * Called with bclink_lock locked
  */
-
-static int bclink_ack_allowed(u32 n)
+struct tipc_node *tipc_bclink_retransmit_to(void)
 {
-	return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
+	return bclink->retransmit_to;
 }
-
 /**
  * bclink_retransmit_pkt - retransmit broadcast packets
  * @after: sequence number of last packet to *not* retransmit
  * @to: sequence number of last packet to retransmit
  *
- * Called with bc_lock locked
+ * Called with bclink_lock locked
  */
-
 static void bclink_retransmit_pkt(u32 after, u32 to)
 {
 	struct sk_buff *buf;

 	buf = bcl->first_out;
-	while (buf && less_eq(buf_seqno(buf), after)) {
+	while (buf && less_eq(buf_seqno(buf), after))
 		buf = buf->next;
-	}
 	tipc_link_retransmit(bcl, buf, mod(to - after));
 }

@@ -207,36 +224,63 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
  * @n_ptr: node that sent acknowledgement info
  * @acked: broadcast sequence # that has been acknowledged
  *
- * Node is locked, bc_lock unlocked.
+ * Node is locked, bclink_lock unlocked.
  */
-
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
 	struct sk_buff *crs;
 	struct sk_buff *next;
 	unsigned int released = 0;

-	if (less_eq(acked, n_ptr->bclink.acked))
-		return;
+	tipc_bclink_lock();
+	/* Bail out if tx queue is empty (no clean up is required) */
+	crs = bcl->first_out;
+	if (!crs)
+		goto exit;

-	spin_lock_bh(&bc_lock);
+	/* Determine which messages need to be acknowledged */
+	if (acked == INVALID_LINK_SEQ) {
+		/*
+		 * Contact with specified node has been lost, so need to
+		 * acknowledge sent messages only (if other nodes still exist)
+		 * or both sent and unsent messages (otherwise)
+		 */
+		if (bclink->bcast_nodes.count)
+			acked = bcl->fsm_msg_cnt;
+		else
+			acked = bcl->next_out_no;
+	} else {
+		/*
+		 * Bail out if specified sequence number does not correspond
+		 * to a message that has been sent and not yet acknowledged
+		 */
+		if (less(acked, buf_seqno(crs)) ||
+		    less(bcl->fsm_msg_cnt, acked) ||
+		    less_eq(acked, n_ptr->bclink.acked))
+			goto exit;
+	}

 	/* Skip over packets that node has previously acknowledged */
-
-	crs = bcl->first_out;
-	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked)) {
+	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
 		crs = crs->next;
-	}

 	/* Update packets that node is now acknowledging */
 	while (crs && less_eq(buf_seqno(crs), acked)) {
 		next = crs->next;
-		bcbuf_decr_acks(crs);
+
+		if (crs != bcl->next_out)
+			bcbuf_decr_acks(crs);
+		else {
+			bcbuf_set_acks(crs, 0);
+			bcl->next_out = next;
+			bclink_set_last_sent();
+		}
+
 		if (bcbuf_acks(crs) == 0) {
 			bcl->first_out = next;
 			bcl->out_queue_size--;
-			buf_discard(crs);
+			kfree_skb(crs);
 			released = 1;
 		}
 		crs = next;
@@ -251,297 +295,305 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	}
 	if (unlikely(released && !list_empty(&bcl->waiting_ports)))
 		tipc_link_wakeup_ports(bcl, 0);
-	spin_unlock_bh(&bc_lock);
+exit:
+	tipc_bclink_unlock();
 }

 /**
- * bclink_send_ack - unicast an ACK msg
+ * tipc_bclink_update_link_state - update broadcast link state
  *
- * tipc_net_lock and node lock set
+ * RCU and node lock set
  */
-
-static void bclink_send_ack(struct tipc_node *n_ptr)
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 {
-	struct link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
+	struct sk_buff *buf;

-	if (l_ptr != NULL)
-		tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-}
+	/* Ignore "stale" link state info */
-/**
- * bclink_send_nack- broadcast a NACK msg
- *
- * tipc_net_lock and node lock set
- */
+	if (less_eq(last_sent, n_ptr->bclink.last_in))
+		return;

-static void bclink_send_nack(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
+	/* Update link synchronization state; quit if in sync */

-	if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
-		return;
+	bclink_update_last_sent(n_ptr, last_sent);

-	buf = tipc_buf_acquire(INT_H_SIZE);
-	if (buf) {
-		msg = buf_msg(buf);
-		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
-			 INT_H_SIZE, n_ptr->addr);
-		msg_set_mc_netid(msg, tipc_net_id);
-		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
-		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
-		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
-		msg_set_bcast_tag(msg, tipc_own_tag);
-
-		if (tipc_bearer_send(&bcbearer->bearer, buf, NULL)) {
-			bcl->stats.sent_nacks++;
-			buf_discard(buf);
-		} else {
-			tipc_bearer_schedule(bcl->b_ptr, bcl);
-			bcl->proto_msg_queue = buf;
-			bcl->stats.bearer_congs++;
-		}
+	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+		return;

-		/*
-		 * Ensure we doesn't send another NACK msg to the node
-		 * until 16 more deferred messages arrive from it
-		 * (i.e. helps prevent all nodes from NACK'ing at same time)
-		 */
+	/* Update out-of-sync state; quit if loss is still unconfirmed */

-		n_ptr->bclink.nack_sync = tipc_own_tag;
+	if ((++n_ptr->bclink.oos_state) == 1) {
+		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
+			return;
+		n_ptr->bclink.oos_state++;
 	}
-}

-/**
- * tipc_bclink_check_gap - send a NACK if a sequence gap exists
- *
- * tipc_net_lock and node lock set
- */
+	/* Don't NACK if one has been recently sent (or seen) */

-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
-{
-	if (!n_ptr->bclink.supported ||
-	    less_eq(last_sent, mod(n_ptr->bclink.last_in)))
+	if (n_ptr->bclink.oos_state & 0x1)
 		return;

-	bclink_set_gap(n_ptr);
-	if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
-		n_ptr->bclink.gap_to = last_sent;
-	bclink_send_nack(n_ptr);
+	/* Send NACK */
+
+	buf = tipc_buf_acquire(INT_H_SIZE);
+	if (buf) {
+		struct tipc_msg *msg = buf_msg(buf);
+
+		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
+			      INT_H_SIZE, n_ptr->addr);
+		msg_set_non_seq(msg, 1);
+		msg_set_mc_netid(msg, tipc_net_id);
+		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
+				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
+				 : n_ptr->bclink.last_sent);
+
+		tipc_bclink_lock();
+		tipc_bearer_send(MAX_BEARERS, buf, NULL);
+		bcl->stats.sent_nacks++;
+		tipc_bclink_unlock();
+		kfree_skb(buf);
+
+		n_ptr->bclink.oos_state++;
+	}
 }

 /**
- * tipc_bclink_peek_nack - process a NACK msg meant for another node
+ * bclink_peek_nack - monitor retransmission requests sent by other nodes
  *
- * Only tipc_net_lock set.
+ * Delay any upcoming NACK by this node if another node has already
+ * requested the first message this node is going to ask for.
  */
-
-static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
+static void bclink_peek_nack(struct tipc_msg *msg)
 {
-	struct tipc_node *n_ptr = tipc_node_find(dest);
-	u32 my_after, my_to;
+	struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));

-	if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
+	if (unlikely(!n_ptr))
 		return;
+
 	tipc_node_lock(n_ptr);
-	/*
-	 * Modify gap to suppress unnecessary NACKs from this node
-	 */
-	my_after = n_ptr->bclink.gap_after;
-	my_to = n_ptr->bclink.gap_to;
-
-	if (less_eq(gap_after, my_after)) {
-		if (less(my_after, gap_to) && less(gap_to, my_to))
-			n_ptr->bclink.gap_after = gap_to;
-		else if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
-	} else if (less_eq(gap_after, my_to)) {
-		if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = gap_after;
-	} else {
-		/*
-		 * Expand gap if missing bufs not in deferred queue:
-		 */
-		struct sk_buff *buf = n_ptr->bclink.deferred_head;
-		u32 prev = n_ptr->bclink.gap_to;
-
-		for (; buf; buf = buf->next) {
-			u32 seqno = buf_seqno(buf);
+	if (n_ptr->bclink.recv_permitted &&
+	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
+	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
+		n_ptr->bclink.oos_state = 2;

-			if (mod(seqno - prev) != 1) {
-				buf = NULL;
-				break;
-			}
-			if (seqno == gap_after)
-				break;
-			prev = seqno;
-		}
-		if (buf == NULL)
-			n_ptr->bclink.gap_to = gap_after;
-	}
-	/*
-	 * Some nodes may send a complementary NACK now:
-	 */
-	if (bclink_ack_allowed(sender_tag + 1)) {
-		if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
-			bclink_send_nack(n_ptr);
-			bclink_set_gap(n_ptr);
-		}
-	}
 	tipc_node_unlock(n_ptr);
 }

-/**
- * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
+/*
+ * tipc_bclink_xmit - broadcast a packet to all nodes in cluster
  */
-
-int tipc_bclink_send_msg(struct sk_buff *buf)
+int tipc_bclink_xmit(struct sk_buff *buf)
 {
 	int res;

-	spin_lock_bh(&bc_lock);
+	tipc_bclink_lock();

-	res = tipc_link_send_buf(bcl, buf);
-	if (unlikely(res == -ELINKCONG))
-		buf_discard(buf);
-	else
-		bclink_set_last_sent();
-
-	if (bcl->out_queue_size > bcl->stats.max_queue_sz)
-		bcl->stats.max_queue_sz = bcl->out_queue_size;
-	bcl->stats.queue_sz_counts++;
-	bcl->stats.accu_queue_sz += bcl->out_queue_size;
+	if (!bclink->bcast_nodes.count) {
+		res = msg_data_sz(buf_msg(buf));
+		kfree_skb(buf);
+		goto exit;
+	}

-	spin_unlock_bh(&bc_lock);
+	res = __tipc_link_xmit(bcl, buf);
+	if (likely(res >= 0)) {
+		bclink_set_last_sent();
+		bcl->stats.queue_sz_counts++;
+		bcl->stats.accu_queue_sz += bcl->out_queue_size;
+	}
+exit:
+	tipc_bclink_unlock();
 	return res;
 }

 /**
- * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
+ * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
  *
- * tipc_net_lock is read_locked, no other locks set
+ * Called with both sending node's lock and bclink_lock taken.
  */
+static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
+{
+	bclink_update_last_sent(node, seqno);
+	node->bclink.last_in = seqno;
+	node->bclink.oos_state = 0;
+	bcl->stats.recv_info++;

-void tipc_bclink_recv_pkt(struct sk_buff *buf)
+	/*
+	 * Unicast an ACK periodically, ensuring that
+	 * all nodes in the cluster don't ACK at the same time
+	 */
+
+	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
+		tipc_link_proto_xmit(node->active_links[node->addr & 1],
+				     STATE_MSG, 0, 0, 0, 0, 0);
+		bcl->stats.sent_acks++;
+	}
+}
+
+/**
+ * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
+ *
+ * RCU is locked, no other locks set
+ */
+void tipc_bclink_rcv(struct sk_buff *buf)
 {
-#if (TIPC_BCAST_LOSS_RATE)
-	static int rx_count = 0;
-#endif
 	struct tipc_msg *msg = buf_msg(buf);
-	struct tipc_node* node = tipc_node_find(msg_prevnode(msg));
+	struct tipc_node *node;
 	u32 next_in;
 	u32 seqno;
-	struct sk_buff *deferred;
+	int deferred;

-	msg_dbg(msg, "<BC<<<");
+	/* Screen out unwanted broadcast messages */

-	if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported ||
-		     (msg_mc_netid(msg) != tipc_net_id))) {
-		buf_discard(buf);
-		return;
-	}
+	if (msg_mc_netid(msg) != tipc_net_id)
+		goto exit;
+
+	node = tipc_node_find(msg_prevnode(msg));
+	if (unlikely(!node))
+		goto exit;
+
+	tipc_node_lock(node);
+	if (unlikely(!node->bclink.recv_permitted))
+		goto unlock;
+
+	/* Handle broadcast protocol message */

 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
-		msg_dbg(msg, "<BCNACK<<<");
+		if (msg_type(msg) != STATE_MSG)
+			goto unlock;
 		if (msg_destnode(msg) == tipc_own_addr) {
-			tipc_node_lock(node);
 			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
 			tipc_node_unlock(node);
-			spin_lock_bh(&bc_lock);
+			tipc_bclink_lock();
 			bcl->stats.recv_nacks++;
-			bcl->owner->next = node;   /* remember requestor */
+			bclink->retransmit_to = node;
 			bclink_retransmit_pkt(msg_bcgap_after(msg),
 					      msg_bcgap_to(msg));
-			bcl->owner->next = NULL;
-			spin_unlock_bh(&bc_lock);
+			tipc_bclink_unlock();
 		} else {
-			tipc_bclink_peek_nack(msg_destnode(msg),
-					      msg_bcast_tag(msg),
-					      msg_bcgap_after(msg),
-					      msg_bcgap_to(msg));
+			tipc_node_unlock(node);
+			bclink_peek_nack(msg);
 		}
-		buf_discard(buf);
-		return;
+		goto exit;
 	}

-#if (TIPC_BCAST_LOSS_RATE)
-	if (++rx_count == TIPC_BCAST_LOSS_RATE) {
-		rx_count = 0;
-		buf_discard(buf);
-		return;
-	}
-#endif
+	/* Handle in-sequence broadcast message */

-	tipc_node_lock(node);
-receive:
-	deferred = node->bclink.deferred_head;
-	next_in = mod(node->bclink.last_in + 1);
 	seqno = msg_seqno(msg);
+	next_in = mod(node->bclink.last_in + 1);

 	if (likely(seqno == next_in)) {
-		bcl->stats.recv_info++;
-		node->bclink.last_in++;
-		bclink_set_gap(node);
-		if (unlikely(bclink_ack_allowed(seqno))) {
-			bclink_send_ack(node);
-			bcl->stats.sent_acks++;
-		}
+receive:
+		/* Deliver message to destination */
+
 		if (likely(msg_isdata(msg))) {
+			tipc_bclink_lock();
+			bclink_accept_pkt(node, seqno);
+			tipc_bclink_unlock();
 			tipc_node_unlock(node);
-			tipc_port_recv_mcast(buf, NULL);
+			if (likely(msg_mcast(msg)))
+				tipc_port_mcast_rcv(buf, NULL);
+			else
+				kfree_skb(buf);
 		} else if (msg_user(msg) == MSG_BUNDLER) {
+			tipc_bclink_lock();
+			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_bundles++;
 			bcl->stats.recv_bundled += msg_msgcnt(msg);
+			tipc_bclink_unlock();
 			tipc_node_unlock(node);
-			tipc_link_recv_bundle(buf);
+			tipc_link_bundle_rcv(buf);
 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
+			tipc_buf_append(&node->bclink.reasm_buf, &buf);
+			if (unlikely(!buf && !node->bclink.reasm_buf))
+				goto unlock;
+			tipc_bclink_lock();
+			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_fragments++;
-			if (tipc_link_recv_fragment(&node->bclink.defragm,
-						    &buf, &msg))
+			if (buf) {
 				bcl->stats.recv_fragmented++;
+				msg = buf_msg(buf);
+				tipc_bclink_unlock();
+				goto receive;
+			}
+			tipc_bclink_unlock();
 			tipc_node_unlock(node);
-			tipc_net_route_msg(buf);
+		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
+			tipc_bclink_lock();
+			bclink_accept_pkt(node, seqno);
+			tipc_bclink_unlock();
+			tipc_node_unlock(node);
+			tipc_named_rcv(buf);
 		} else {
+			tipc_bclink_lock();
+			bclink_accept_pkt(node, seqno);
+			tipc_bclink_unlock();
 			tipc_node_unlock(node);
-			tipc_net_route_msg(buf);
-		}
-		if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
-			tipc_node_lock(node);
-			buf = deferred;
-			msg = buf_msg(buf);
-			node->bclink.deferred_head = deferred->next;
-			goto receive;
-		}
-		return;
-	} else if (less(next_in, seqno)) {
-		u32 gap_after = node->bclink.gap_after;
-		u32 gap_to = node->bclink.gap_to;
-
-		if (tipc_link_defer_pkt(&node->bclink.deferred_head,
-					&node->bclink.deferred_tail,
-					buf)) {
-			node->bclink.nack_sync++;
-			bcl->stats.deferred_recv++;
-			if (seqno == mod(gap_after + 1))
-				node->bclink.gap_after = seqno;
-			else if (less(gap_after, seqno) && less(seqno, gap_to))
-				node->bclink.gap_to = seqno;
+			kfree_skb(buf);
 		}
-		if (bclink_ack_allowed(node->bclink.nack_sync)) {
-			if (gap_to != gap_after)
-				bclink_send_nack(node);
-			bclink_set_gap(node);
+		buf = NULL;
+
+		/* Determine new synchronization state */
+
+		tipc_node_lock(node);
+		if (unlikely(!tipc_node_is_up(node)))
+			goto unlock;
+
+		if (node->bclink.last_in == node->bclink.last_sent)
+			goto unlock;
+
+		if (!node->bclink.deferred_head) {
+			node->bclink.oos_state = 1;
+			goto unlock;
 		}
-	} else {
-		bcl->stats.duplicates++;
-		buf_discard(buf);
+
+		msg = buf_msg(node->bclink.deferred_head);
+		seqno = msg_seqno(msg);
+		next_in = mod(next_in + 1);
+		if (seqno != next_in)
+			goto unlock;
+
+		/* Take in-sequence message from deferred queue & deliver it */
+
+		buf = node->bclink.deferred_head;
+		node->bclink.deferred_head = buf->next;
+		buf->next = NULL;
+		node->bclink.deferred_size--;
+		goto receive;
 	}
+
+	/* Handle out-of-sequence broadcast message */
+
+	if (less(next_in, seqno)) {
+		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
+					       &node->bclink.deferred_tail,
+					       buf);
+		node->bclink.deferred_size += deferred;
+		bclink_update_last_sent(node, seqno);
+		buf = NULL;
+	} else
+		deferred = 0;
+
+	tipc_bclink_lock();
+
+	if (deferred)
+		bcl->stats.deferred_recv++;
+	else
+		bcl->stats.duplicates++;
+
+	tipc_bclink_unlock();
+
+unlock:
 	tipc_node_unlock(node);
+exit:
+	kfree_skb(buf);
 }

 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
 {
-	return (n_ptr->bclink.supported &&
+	return (n_ptr->bclink.recv_permitted &&
 		(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
 }

@@ -549,98 +601,106 @@ u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)

 /**
  * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
  *
- * Send through as many bearers as necessary to reach all nodes
- * that support TIPC multicasting.
+ * Send packet over as many bearers as necessary to reach all nodes
+ * that have joined the broadcast link.
  *
- * Returns 0 if packet sent successfully, non-zero if not
+ * Returns 0 (packet sent successfully) under all circumstances,
+ * since the broadcast link's pseudo-bearer never blocks
  */
-
-static int tipc_bcbearer_send(struct sk_buff *buf,
-			      struct tipc_bearer *unused1,
+static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
 			      struct tipc_media_addr *unused2)
 {
 	int bp_index;

-	/* Prepare buffer for broadcasting (if first time trying to send it) */
-
+	/* Prepare broadcast link message for reliable transmission,
+	 * if first time trying to send it;
+	 * preparation is skipped for broadcast link protocol messages
+	 * since they are sent in an unreliable manner and don't need it
+	 */
 	if (likely(!msg_non_seq(buf_msg(buf)))) {
 		struct tipc_msg *msg;

-		assert(tipc_cltr_bcast_nodes.count != 0);
-		bcbuf_set_acks(buf, tipc_cltr_bcast_nodes.count);
+		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
 		msg = buf_msg(buf);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
 		bcl->stats.sent_info++;
+
+		if (WARN_ON(!bclink->bcast_nodes.count)) {
+			dump_stack();
+			return 0;
+		}
 	}

 	/* Send buffer over bearers until all targets reached */
-
-	bcbearer->remains = tipc_cltr_bcast_nodes;
+	bcbearer->remains = bclink->bcast_nodes;

 	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
-		struct bearer *p = bcbearer->bpairs[bp_index].primary;
-		struct bearer *s = bcbearer->bpairs[bp_index].secondary;
+		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
+		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
+		struct tipc_bearer *b = p;
+		struct sk_buff *tbuf;

 		if (!p)
-			break;	/* no more bearers to try */
+			break; /* No more bearers to try */

-		tipc_nmap_diff(&bcbearer->remains, &p->nodes, &bcbearer->remains_new);
+		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
+			       &bcbearer->remains_new);
 		if (bcbearer->remains_new.count == bcbearer->remains.count)
-			continue;	/* bearer pair doesn't add anything */
-
-		if (p->publ.blocked ||
-		    p->media->send_msg(buf, &p->publ, &p->media->bcast_addr)) {
-			/* unable to send on primary bearer */
-			if (!s || s->publ.blocked ||
-			    s->media->send_msg(buf, &s->publ,
-					       &s->media->bcast_addr)) {
-				/* unable to send on either bearer */
-				continue;
-			}
+			continue; /* Nothing added by bearer pair */
+
+		if (bp_index == 0) {
+			/* Use original buffer for first bearer */
+			tipc_bearer_send(b->identity, buf, &b->bcast_addr);
+		} else {
+			/* Avoid concurrent buffer access */
+			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
+			if (!tbuf)
+				break;
+			tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
+			kfree_skb(tbuf); /* Bearer keeps a clone */
 		}

+		/* Swap bearers for next packet */
 		if (s) {
 			bcbearer->bpairs[bp_index].primary = s;
 			bcbearer->bpairs[bp_index].secondary = p;
 		}

 		if (bcbearer->remains_new.count == 0)
-			return 0;
+			break; /* All targets reached */

 		bcbearer->remains = bcbearer->remains_new;
 	}

-	/*
-	 * Unable to reach all targets (indicate success, since currently
-	 * there isn't code in place to properly block & unblock the
-	 * pseudo-bearer used by the broadcast link)
-	 */
-
-	return TIPC_OK;
+	return 0;
 }

 /**
  * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
  */
-
-void tipc_bcbearer_sort(void)
+void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
 {
-	struct bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
-	struct bcbearer_pair *bp_curr;
+	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
+	struct tipc_bcbearer_pair *bp_curr;
+	struct tipc_bearer *b;
 	int b_index;
 	int pri;

-	spin_lock_bh(&bc_lock);
+	tipc_bclink_lock();

-	/* Group bearers by priority (can assume max of two per priority) */
+	if (action)
+		tipc_nmap_add(nm_ptr, node);
+	else
+		tipc_nmap_remove(nm_ptr, node);

+	/* Group bearers by priority (can assume max of two per priority) */
 	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

+	rcu_read_lock();
 	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
-		struct bearer *b = &tipc_bearers[b_index];
-
-		if (!b->active || !b->nodes.count)
+		b = rcu_dereference_rtnl(bearer_list[b_index]);
+		if (!b || !b->nodes.count)
 			continue;

 		if (!bp_temp[b->priority].primary)
@@ -648,9 +708,9 @@ void tipc_bcbearer_sort(void)
 		else
 			bp_temp[b->priority].secondary = b;
 	}
+	rcu_read_unlock();

 	/* Create array of bearer pairs for broadcasting */
-
 	bp_curr = bcbearer->bpairs;
 	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

@@ -674,75 +734,49 @@ void tipc_bcbearer_sort(void)
 		bp_curr++;
 	}

-	spin_unlock_bh(&bc_lock);
-}
-
-/**
- * tipc_bcbearer_push - resolve bearer congestion
- *
- * Forces bclink to push out any unsent packets, until all packets are gone
- * or congestion reoccurs.
- * No locks set when function called
- */
-
-void tipc_bcbearer_push(void)
-{
-	struct bearer *b_ptr;
-
-	spin_lock_bh(&bc_lock);
-	b_ptr = &bcbearer->bearer;
-	if (b_ptr->publ.blocked) {
-		b_ptr->publ.blocked = 0;
-		tipc_bearer_lock_push(b_ptr);
-	}
-	spin_unlock_bh(&bc_lock);
+	tipc_bclink_unlock();
 }

 int tipc_bclink_stats(char *buf, const u32 buf_size)
 {
-	struct print_buf pb;
+	int ret;
+	struct tipc_stats *s;

 	if (!bcl)
 		return 0;

-	tipc_printbuf_init(&pb, buf, buf_size);
-
-	spin_lock_bh(&bc_lock);
-
-	tipc_printf(&pb, "Link <%s>\n"
-			 "  Window:%u packets\n",
-		    bcl->name, bcl->queue_limit[0]);
-	tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
-		    bcl->stats.recv_info,
-		    bcl->stats.recv_fragments,
-		    bcl->stats.recv_fragmented,
-		    bcl->stats.recv_bundles,
-		    bcl->stats.recv_bundled);
-	tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
-		    bcl->stats.sent_info,
-		    bcl->stats.sent_fragments,
-		    bcl->stats.sent_fragmented,
-		    bcl->stats.sent_bundles,
-		    bcl->stats.sent_bundled);
-	tipc_printf(&pb, "  RX naks:%u defs:%u dups:%u\n",
-		    bcl->stats.recv_nacks,
-		    bcl->stats.deferred_recv,
-		    bcl->stats.duplicates);
-	tipc_printf(&pb, "  TX naks:%u acks:%u dups:%u\n",
-		    bcl->stats.sent_nacks,
-		    bcl->stats.sent_acks,
-		    bcl->stats.retransmitted);
-	tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
-		    bcl->stats.bearer_congs,
-		    bcl->stats.link_congs,
-		    bcl->stats.max_queue_sz,
-		    bcl->stats.queue_sz_counts
-		    ? (bcl->stats.accu_queue_sz / bcl->stats.queue_sz_counts)
-		    : 0);
-
-	spin_unlock_bh(&bc_lock);
-	return tipc_printbuf_validate(&pb);
+	tipc_bclink_lock();
+
+	s = &bcl->stats;
+
+	ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
+			    "  Window:%u packets\n",
+			    bcl->name, bcl->queue_limit[0]);
+	ret += tipc_snprintf(buf + ret, buf_size - ret,
+			     "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
+			     s->recv_info, s->recv_fragments,
+			     s->recv_fragmented, s->recv_bundles,
+			     s->recv_bundled);
+	ret += tipc_snprintf(buf + ret, buf_size - ret,
+			     "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
+			     s->sent_info, s->sent_fragments,
+			     s->sent_fragmented, s->sent_bundles,
+			     s->sent_bundled);
+	ret += tipc_snprintf(buf + ret, buf_size - ret,
+			     "  RX naks:%u defs:%u dups:%u\n",
+			     s->recv_nacks, s->deferred_recv, s->duplicates);
+	ret += tipc_snprintf(buf + ret, buf_size - ret,
+			     "  TX naks:%u acks:%u dups:%u\n",
+			     s->sent_nacks, s->sent_acks, s->retransmitted);
+	ret += tipc_snprintf(buf + ret, buf_size - ret,
+			     "  Congestion link:%u  Send queue max:%u avg:%u\n",
+			     s->link_congs, s->max_queue_sz,
+			     s->queue_sz_counts ?
+			     (s->accu_queue_sz / s->queue_sz_counts) : 0);
+
+	tipc_bclink_unlock();
+	return ret;
 }

 int tipc_bclink_reset_stats(void)
@@ -750,9 +784,9 @@ int tipc_bclink_reset_stats(void)
 	if (!bcl)
 		return -ENOPROTOOPT;

-	spin_lock_bh(&bc_lock);
+	tipc_bclink_lock();
 	memset(&bcl->stats, 0, sizeof(bcl->stats));
-	spin_unlock_bh(&bc_lock);
+	tipc_bclink_unlock();
 	return 0;
 }

@@ -763,75 +797,59 @@ int tipc_bclink_set_queue_limits(u32 limit)
 	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
 		return -EINVAL;

-	spin_lock_bh(&bc_lock);
+	tipc_bclink_lock();
 	tipc_link_set_queue_limits(bcl, limit);
-	spin_unlock_bh(&bc_lock);
+	tipc_bclink_unlock();
 	return 0;
 }

 int tipc_bclink_init(void)
 {
 	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
+	if (!bcbearer)
+		return -ENOMEM;
+
 	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
-	if (!bcbearer || !bclink) {
- nomem:
-		warn("Multicast link creation failed, no memory\n");
+	if (!bclink) {
 		kfree(bcbearer);
-		bcbearer = NULL;
-		kfree(bclink);
-		bclink = NULL;
 		return -ENOMEM;
 	}

-	INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
+	bcl = &bclink->link;
 	bcbearer->bearer.media = &bcbearer->media;
 	bcbearer->media.send_msg = tipc_bcbearer_send;
-	sprintf(bcbearer->media.name, "tipc-multicast");
+	sprintf(bcbearer->media.name, "tipc-broadcast");

-	bcl = &bclink->link;
+	spin_lock_init(&bclink->lock);
 	INIT_LIST_HEAD(&bcl->waiting_ports);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
 	bcl->owner = &bclink->node;
 	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
 	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
-	bcl->b_ptr = &bcbearer->bearer;
+	bcl->bearer_id = MAX_BEARERS;
+	rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
 	bcl->state = WORKING_WORKING;
 	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
-
-	if (BCLINK_LOG_BUF_SIZE) {
-		char *pb = kmalloc(BCLINK_LOG_BUF_SIZE, GFP_ATOMIC);
-
-		if (!pb)
-			goto nomem;
-		tipc_printbuf_init(&bcl->print_buf, pb, BCLINK_LOG_BUF_SIZE);
-	}
-
 	return 0;
 }

 void tipc_bclink_stop(void)
 {
-	spin_lock_bh(&bc_lock);
-	if (bcbearer) {
-		tipc_link_stop(bcl);
-		if (BCLINK_LOG_BUF_SIZE)
-			kfree(bcl->print_buf.buf);
-		bcl = NULL;
-		kfree(bclink);
-		bclink = NULL;
-		kfree(bcbearer);
-		bcbearer = NULL;
-	}
-	spin_unlock_bh(&bc_lock);
+	tipc_bclink_lock();
+	tipc_link_purge_queues(bcl);
+	tipc_bclink_unlock();
+
+	RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
+	synchronize_net();
+	kfree(bcbearer);
+	kfree(bclink);
 }

-
 /**
  * tipc_nmap_add - add a node to a node map
  */
-
-void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
+static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
 {
 	int n = tipc_node(node);
 	int w = n / WSIZE;
@@ -846,8 +864,7 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
 /**
  * tipc_nmap_remove - remove a node from a node map
  */
-
-void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
+static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
 {
 	int n = tipc_node(node);
 	int w = n / WSIZE;
@@ -865,7 +882,6 @@ void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
  * @nm_b: input node map B
  * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
  */
-
 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
 			   struct tipc_node_map *nm_b,
 			   struct tipc_node_map *nm_diff)
@@ -891,10 +907,9 @@ static void tipc_nmap_diff(struct tipc_node_map *nm_a,
 /**
  * tipc_port_list_add - add a port to a port list, ensuring no duplicates
  */
-
-void tipc_port_list_add(struct port_list *pl_ptr, u32 port)
+void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port)
 {
-	struct port_list *item = pl_ptr;
+	struct tipc_port_list *item = pl_ptr;
 	int i;
 	int item_sz = PLSIZE;
 	int cnt = pl_ptr->count;
@@ -913,7 +928,7 @@ void tipc_port_list_add(struct port_list *pl_ptr, u32 port)
 		if (!item->next) {
 			item->next = kmalloc(sizeof(*item), GFP_ATOMIC);
 			if (!item->next) {
-				warn("Incomplete multicast delivery, no memory\n");
+				pr_warn("Incomplete multicast delivery, no memory\n");
 				return;
 			}
 			item->next->next = NULL;
@@ -925,15 +940,13 @@ void tipc_port_list_add(struct port_list *pl_ptr, u32 port)
  * tipc_port_list_free - free dynamically created entries in port_list chain
  *
  */
-
-void tipc_port_list_free(struct port_list *pl_ptr)
+void tipc_port_list_free(struct tipc_port_list *pl_ptr)
 {
-	struct port_list *item;
-	struct port_list *next;
+	struct tipc_port_list *item;
+	struct tipc_port_list *next;

 	for (item = pl_ptr->next; item; item = next) {
 		next = item->next;
 		kfree(item);
 	}
 }
-
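Editor's note: the most consequential locking change above is tipc_bclink_unlock(), which notices a TIPC_BCLINK_RESET flag set while bclink->lock was held and calls tipc_link_reset_all() only after the spinlock is dropped, because the reset path takes other locks of its own. Below is a minimal userspace sketch of that defer-work-past-unlock pattern; the pthread mutex, struct node, and reset_node() are illustrative stand-ins, not the kernel API.

	#include <pthread.h>
	#include <stdio.h>

	#define FLAG_RESET 0x1u

	struct node { int id; };

	static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
	static unsigned int flags;          /* protected by "lock" */
	static struct node *pending_reset;  /* protected by "lock" */

	/* Stand-in for tipc_link_reset_all(); it may acquire other locks,
	 * so it must never run while "lock" is still held.
	 */
	static void reset_node(struct node *n)
	{
		printf("resetting node %d\n", n->id);
	}

	static void bclink_style_unlock(void)
	{
		struct node *n = NULL;

		/* Pick up any deferred work while the lock is still held... */
		if (flags & FLAG_RESET) {
			flags &= ~FLAG_RESET;
			n = pending_reset;
			pending_reset = NULL;
		}
		pthread_mutex_unlock(&lock);

		/* ...and run it only after the lock is released, avoiding
		 * lock-ordering deadlocks with the locks reset_node() takes.
		 */
		if (n)
			reset_node(n);
	}

	int main(void)
	{
		struct node a = { 1 };

		pthread_mutex_lock(&lock);
		flags |= FLAG_RESET;   /* like tipc_bclink_set_flags(TIPC_BCLINK_RESET) */
		pending_reset = &a;    /* like bclink->retransmit_to */
		bclink_style_unlock(); /* reset fires outside the critical section */
		return 0;
	}

The same shape appears wherever a subsystem needs to trigger heavyweight work from inside a critical section without creating a lock-ordering cycle.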

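A second detail worth calling out: bclink_accept_pkt() spaces broadcast ACKs with ((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0, replacing the old tipc_own_tag scheme. Every node still ACKs only one packet per 16-packet window (TIPC_MIN_LINK_WIN is 16, matching the removed "divide by 16" comment), but the offset now derives from the node's own address, so cluster members don't ACK in unison. A small standalone demonstration with made-up node addresses:

	#include <stdio.h>

	#define TIPC_MIN_LINK_WIN 16	/* window used for ACK spacing */

	int main(void)
	{
		unsigned int addrs[] = { 1001, 1002, 1003 };	/* hypothetical node addresses */

		/* Each node ACKs every 16th packet, offset by its own address;
		 * unsigned wraparound when seqno < addr is harmless because
		 * 16 divides 2^32, so the modulo result stays consistent.
		 */
		for (unsigned int seqno = 1; seqno <= 48; seqno++)
			for (int i = 0; i < 3; i++)
				if (((seqno - addrs[i]) % TIPC_MIN_LINK_WIN) == 0)
					printf("node %u ACKs seqno %u\n", addrs[i], seqno);
		return 0;
	}

Compiled and run, this prints node 1001 ACKing seqnos 9, 25 and 41, node 1002 ACKing 10, 26 and 42, and node 1003 ACKing 11, 27 and 43: one ACK per node per window, with no two nodes ever ACKing the same sequence number.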