diff options
Diffstat (limited to 'net/tipc/link.c')
| -rw-r--r-- | net/tipc/link.c | 104 | 
1 files changed, 60 insertions, 44 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c index ebf338f7b14..5ed4b4f7452 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -92,7 +92,8 @@ static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);  static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);  static int  link_send_sections_long(struct tipc_port *sender,  				    struct iovec const *msg_sect, -				    u32 num_sect, u32 destnode); +				    u32 num_sect, unsigned int total_len, +				    u32 destnode);  static void link_check_defragm_bufs(struct link *l_ptr);  static void link_state_event(struct link *l_ptr, u32 event);  static void link_reset_statistics(struct link *l_ptr); @@ -842,6 +843,25 @@ static void link_add_to_outqueue(struct link *l_ptr,  		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;  } +static void link_add_chain_to_outqueue(struct link *l_ptr, +				       struct sk_buff *buf_chain, +				       u32 long_msgno) +{ +	struct sk_buff *buf; +	struct tipc_msg *msg; + +	if (!l_ptr->next_out) +		l_ptr->next_out = buf_chain; +	while (buf_chain) { +		buf = buf_chain; +		buf_chain = buf_chain->next; + +		msg = buf_msg(buf); +		msg_set_long_msgno(msg, long_msgno); +		link_add_to_outqueue(l_ptr, buf, msg); +	} +} +  /*   * tipc_link_send_buf() is the 'full path' for messages, called from   * inside TIPC when the 'fast path' in tipc_send_buf @@ -864,8 +884,9 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)  	if (unlikely(queue_size >= queue_limit)) {  		if (imp <= TIPC_CRITICAL_IMPORTANCE) { -			return link_schedule_port(l_ptr, msg_origport(msg), -						  size); +			link_schedule_port(l_ptr, msg_origport(msg), size); +			buf_discard(buf); +			return -ELINKCONG;  		}  		buf_discard(buf);  		if (imp > CONN_MANAGER) { @@ -1042,6 +1063,7 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)  int tipc_link_send_sections_fast(struct tipc_port *sender,  				 struct iovec const *msg_sect,  				 const u32 num_sect, +				 unsigned int total_len,  				 u32 destaddr)  {  	struct tipc_msg *hdr = &sender->phdr; @@ -1057,8 +1079,8 @@ again:  	 * (Must not hold any locks while building message.)  	 */ -	res = tipc_msg_build(hdr, msg_sect, num_sect, sender->max_pkt, -			!sender->user_port, &buf); +	res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, +			     sender->max_pkt, !sender->user_port, &buf);  	read_lock_bh(&tipc_net_lock);  	node = tipc_node_find(destaddr); @@ -1069,8 +1091,6 @@ again:  			if (likely(buf)) {  				res = link_send_buf_fast(l_ptr, buf,  							 &sender->max_pkt); -				if (unlikely(res < 0)) -					buf_discard(buf);  exit:  				tipc_node_unlock(node);  				read_unlock_bh(&tipc_net_lock); @@ -1105,7 +1125,8 @@ exit:  				goto again;  			return link_send_sections_long(sender, msg_sect, -						       num_sect, destaddr); +						       num_sect, total_len, +						       destaddr);  		}  		tipc_node_unlock(node);  	} @@ -1117,7 +1138,7 @@ exit:  		return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);  	if (res >= 0)  		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, -						 TIPC_ERR_NO_NODE); +						 total_len, TIPC_ERR_NO_NODE);  	return res;  } @@ -1138,12 +1159,13 @@ exit:  static int link_send_sections_long(struct tipc_port *sender,  				   struct iovec const *msg_sect,  				   u32 num_sect, +				   unsigned int total_len,  				   u32 destaddr)  {  	struct link *l_ptr;  	struct tipc_node *node;  	struct tipc_msg *hdr = &sender->phdr; -	u32 dsz = msg_data_sz(hdr); +	u32 dsz = total_len;  	u32 max_pkt, fragm_sz, rest;  	struct tipc_msg fragm_hdr;  	struct sk_buff *buf, *buf_chain, *prev; @@ -1169,7 +1191,6 @@ again:  	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,  		 INT_H_SIZE, msg_destnode(hdr)); -	msg_set_link_selector(&fragm_hdr, sender->ref);  	msg_set_size(&fragm_hdr, max_pkt);  	msg_set_fragm_no(&fragm_hdr, 1); @@ -1271,28 +1292,15 @@ reject:  			buf_discard(buf_chain);  		}  		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, -						 TIPC_ERR_NO_NODE); +						 total_len, TIPC_ERR_NO_NODE);  	} -	/* Append whole chain to send queue: */ +	/* Append chain of fragments to send queue & send them */ -	buf = buf_chain; -	l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1); -	if (!l_ptr->next_out) -		l_ptr->next_out = buf_chain; +	l_ptr->long_msg_seq_no++; +	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); +	l_ptr->stats.sent_fragments += fragm_no;  	l_ptr->stats.sent_fragmented++; -	while (buf) { -		struct sk_buff *next = buf->next; -		struct tipc_msg *msg = buf_msg(buf); - -		l_ptr->stats.sent_fragments++; -		msg_set_long_msgno(msg, l_ptr->long_msg_seq_no); -		link_add_to_outqueue(l_ptr, buf, msg); -		buf = next; -	} - -	/* Send it, if possible: */ -  	tipc_link_push_queue(l_ptr);  	tipc_node_unlock(node);  	return dsz; @@ -2407,6 +2415,8 @@ void tipc_link_recv_bundle(struct sk_buff *buf)   */  static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)  { +	struct sk_buff *buf_chain = NULL; +	struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;  	struct tipc_msg *inmsg = buf_msg(buf);  	struct tipc_msg fragm_hdr;  	u32 insize = msg_size(inmsg); @@ -2415,7 +2425,7 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)  	u32 rest = insize;  	u32 pack_sz = l_ptr->max_pkt;  	u32 fragm_sz = pack_sz - INT_H_SIZE; -	u32 fragm_no = 1; +	u32 fragm_no = 0;  	u32 destaddr;  	if (msg_short(inmsg)) @@ -2427,10 +2437,6 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)  	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,  		 INT_H_SIZE, destaddr); -	msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg)); -	msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++)); -	msg_set_fragm_no(&fragm_hdr, fragm_no); -	l_ptr->stats.sent_fragmented++;  	/* Chop up message: */ @@ -2443,27 +2449,37 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)  		}  		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);  		if (fragm == NULL) { -			warn("Link unable to fragment message\n"); -			dsz = -ENOMEM; -			goto exit; +			buf_discard(buf); +			while (buf_chain) { +				buf = buf_chain; +				buf_chain = buf_chain->next; +				buf_discard(buf); +			} +			return -ENOMEM;  		}  		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); +		fragm_no++; +		msg_set_fragm_no(&fragm_hdr, fragm_no);  		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);  		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,  					       fragm_sz); -		/*  Send queued messages first, if any: */ +		buf_chain_tail->next = fragm; +		buf_chain_tail = fragm; -		l_ptr->stats.sent_fragments++; -		tipc_link_send_buf(l_ptr, fragm); -		if (!tipc_link_is_up(l_ptr)) -			return dsz; -		msg_set_fragm_no(&fragm_hdr, ++fragm_no);  		rest -= fragm_sz;  		crs += fragm_sz;  		msg_set_type(&fragm_hdr, FRAGMENT);  	} -exit:  	buf_discard(buf); + +	/* Append chain of fragments to send queue & send them */ + +	l_ptr->long_msg_seq_no++; +	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); +	l_ptr->stats.sent_fragments += fragm_no; +	l_ptr->stats.sent_fragmented++; +	tipc_link_push_queue(l_ptr); +  	return dsz;  }  | 
