diff options
Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/addr.h | 2 | ||||
| -rw-r--r-- | net/tipc/bcast.c | 41 | ||||
| -rw-r--r-- | net/tipc/bcast.h | 4 | ||||
| -rw-r--r-- | net/tipc/bearer.c | 103 | ||||
| -rw-r--r-- | net/tipc/bearer.h | 13 | ||||
| -rw-r--r-- | net/tipc/config.c | 114 | ||||
| -rw-r--r-- | net/tipc/config.h | 5 | ||||
| -rw-r--r-- | net/tipc/core.c | 108 | ||||
| -rw-r--r-- | net/tipc/core.h | 3 | ||||
| -rw-r--r-- | net/tipc/discover.c | 23 | ||||
| -rw-r--r-- | net/tipc/discover.h | 5 | ||||
| -rw-r--r-- | net/tipc/handler.c | 1 | ||||
| -rw-r--r-- | net/tipc/link.c | 627 | ||||
| -rw-r--r-- | net/tipc/link.h | 57 | ||||
| -rw-r--r-- | net/tipc/name_distr.c | 22 | ||||
| -rw-r--r-- | net/tipc/name_distr.h | 2 | ||||
| -rw-r--r-- | net/tipc/name_table.c | 40 | ||||
| -rw-r--r-- | net/tipc/net.c | 19 | ||||
| -rw-r--r-- | net/tipc/netlink.c | 8 | ||||
| -rw-r--r-- | net/tipc/node.c | 119 | ||||
| -rw-r--r-- | net/tipc/node.h | 6 | ||||
| -rw-r--r-- | net/tipc/port.c | 301 | ||||
| -rw-r--r-- | net/tipc/port.h | 130 | ||||
| -rw-r--r-- | net/tipc/ref.c | 30 | ||||
| -rw-r--r-- | net/tipc/ref.h | 1 | ||||
| -rw-r--r-- | net/tipc/server.c | 19 | ||||
| -rw-r--r-- | net/tipc/server.h | 2 | ||||
| -rw-r--r-- | net/tipc/socket.c | 426 | ||||
| -rw-r--r-- | net/tipc/socket.h | 72 | ||||
| -rw-r--r-- | net/tipc/subscr.c | 48 | 
30 files changed, 1135 insertions, 1216 deletions
| diff --git a/net/tipc/addr.h b/net/tipc/addr.h index 60b00ab93d7..a74acf9ee80 100644 --- a/net/tipc/addr.h +++ b/net/tipc/addr.h @@ -37,6 +37,8 @@  #ifndef _TIPC_ADDR_H  #define _TIPC_ADDR_H +#include "core.h" +  #define TIPC_ZONE_MASK		0xff000000u  #define TIPC_CLUSTER_MASK	0xfffff000u diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index bf860d9e75a..95ab5ef9292 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -41,9 +41,9 @@  #include "bcast.h"  #include "name_distr.h" -#define MAX_PKT_DEFAULT_MCAST 1500	/* bcast link max packet size (fixed) */ - -#define BCLINK_WIN_DEFAULT 20		/* bcast link window size (default) */ +#define	MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */ +#define	BCLINK_WIN_DEFAULT	20	/* bcast link window size (default) */ +#define	BCBEARER		MAX_BEARERS  /**   * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link @@ -356,9 +356,9 @@ static void bclink_peek_nack(struct tipc_msg *msg)  }  /* - * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster + * tipc_bclink_xmit - broadcast a packet to all nodes in cluster   */ -int tipc_bclink_send_msg(struct sk_buff *buf) +int tipc_bclink_xmit(struct sk_buff *buf)  {  	int res; @@ -370,7 +370,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf)  		goto exit;  	} -	res = tipc_link_send_buf(bcl, buf); +	res = __tipc_link_xmit(bcl, buf);  	if (likely(res >= 0)) {  		bclink_set_last_sent();  		bcl->stats.queue_sz_counts++; @@ -399,19 +399,18 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)  	 */  	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { -		tipc_link_send_proto_msg( -			node->active_links[node->addr & 1], -			STATE_MSG, 0, 0, 0, 0, 0); +		tipc_link_proto_xmit(node->active_links[node->addr & 1], +				     STATE_MSG, 0, 0, 0, 0, 0);  		bcl->stats.sent_acks++;  	}  }  /** - * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards + * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards   *   * tipc_net_lock is read_locked, no other locks set   */ -void tipc_bclink_recv_pkt(struct sk_buff *buf) +void tipc_bclink_rcv(struct sk_buff *buf)  {  	struct tipc_msg *msg = buf_msg(buf);  	struct tipc_node *node; @@ -468,7 +467,7 @@ receive:  			spin_unlock_bh(&bc_lock);  			tipc_node_unlock(node);  			if (likely(msg_mcast(msg))) -				tipc_port_recv_mcast(buf, NULL); +				tipc_port_mcast_rcv(buf, NULL);  			else  				kfree_skb(buf);  		} else if (msg_user(msg) == MSG_BUNDLER) { @@ -478,12 +477,12 @@ receive:  			bcl->stats.recv_bundled += msg_msgcnt(msg);  			spin_unlock_bh(&bc_lock);  			tipc_node_unlock(node); -			tipc_link_recv_bundle(buf); +			tipc_link_bundle_rcv(buf);  		} else if (msg_user(msg) == MSG_FRAGMENTER) {  			int ret; -			ret = tipc_link_recv_fragment(&node->bclink.reasm_head, -						      &node->bclink.reasm_tail, -						      &buf); +			ret = tipc_link_frag_rcv(&node->bclink.reasm_head, +						 &node->bclink.reasm_tail, +						 &buf);  			if (ret == LINK_REASM_ERROR)  				goto unlock;  			spin_lock_bh(&bc_lock); @@ -503,7 +502,7 @@ receive:  			bclink_accept_pkt(node, seqno);  			spin_unlock_bh(&bc_lock);  			tipc_node_unlock(node); -			tipc_named_recv(buf); +			tipc_named_rcv(buf);  		} else {  			spin_lock_bh(&bc_lock);  			bclink_accept_pkt(node, seqno); @@ -669,9 +668,8 @@ void tipc_bcbearer_sort(void)  	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));  	for (b_index = 0; b_index < MAX_BEARERS; b_index++) { -		struct tipc_bearer *b = &tipc_bearers[b_index]; - -		if (!b->active || !b->nodes.count) +		struct tipc_bearer *b = bearer_list[b_index]; +		if (!b || !b->nodes.count)  			continue;  		if (!bp_temp[b->priority].primary) @@ -785,8 +783,8 @@ void tipc_bclink_init(void)  	bcl->owner = &bclink->node;  	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;  	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); -	spin_lock_init(&bcbearer->bearer.lock);  	bcl->b_ptr = &bcbearer->bearer; +	bearer_list[BCBEARER] = &bcbearer->bearer;  	bcl->state = WORKING_WORKING;  	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);  } @@ -797,6 +795,7 @@ void tipc_bclink_stop(void)  	tipc_link_purge_queues(bcl);  	spin_unlock_bh(&bc_lock); +	bearer_list[BCBEARER] = NULL;  	memset(bclink, 0, sizeof(*bclink));  	memset(bcbearer, 0, sizeof(*bcbearer));  } diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 6ee587b469f..a80ef54b818 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -90,8 +90,8 @@ void tipc_bclink_add_node(u32 addr);  void tipc_bclink_remove_node(u32 addr);  struct tipc_node *tipc_bclink_retransmit_to(void);  void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); -int  tipc_bclink_send_msg(struct sk_buff *buf); -void tipc_bclink_recv_pkt(struct sk_buff *buf); +int  tipc_bclink_xmit(struct sk_buff *buf); +void tipc_bclink_rcv(struct sk_buff *buf);  u32  tipc_bclink_get_last_sent(void);  u32  tipc_bclink_acks_missing(struct tipc_node *n_ptr);  void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index a38c89969c6..3fef7eb776d 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -49,9 +49,9 @@ static struct tipc_media * const media_info_array[] = {  	NULL  }; -struct tipc_bearer tipc_bearers[MAX_BEARERS]; +struct tipc_bearer *bearer_list[MAX_BEARERS + 1]; -static void bearer_disable(struct tipc_bearer *b_ptr); +static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down);  /**   * tipc_media_find - locates specified media object by name @@ -177,8 +177,9 @@ struct tipc_bearer *tipc_bearer_find(const char *name)  	struct tipc_bearer *b_ptr;  	u32 i; -	for (i = 0, b_ptr = tipc_bearers; i < MAX_BEARERS; i++, b_ptr++) { -		if (b_ptr->active && (!strcmp(b_ptr->name, name))) +	for (i = 0; i < MAX_BEARERS; i++) { +		b_ptr = bearer_list[i]; +		if (b_ptr && (!strcmp(b_ptr->name, name)))  			return b_ptr;  	}  	return NULL; @@ -200,8 +201,10 @@ struct sk_buff *tipc_bearer_get_names(void)  	read_lock_bh(&tipc_net_lock);  	for (i = 0; media_info_array[i] != NULL; i++) {  		for (j = 0; j < MAX_BEARERS; j++) { -			b = &tipc_bearers[j]; -			if (b->active && (b->media == media_info_array[i])) { +			b = bearer_list[j]; +			if (!b) +				continue; +			if (b->media == media_info_array[i]) {  				tipc_cfg_append_tlv(buf, TIPC_TLV_BEARER_NAME,  						    b->name,  						    strlen(b->name) + 1); @@ -284,16 +287,17 @@ restart:  	bearer_id = MAX_BEARERS;  	with_this_prio = 1;  	for (i = MAX_BEARERS; i-- != 0; ) { -		if (!tipc_bearers[i].active) { +		b_ptr = bearer_list[i]; +		if (!b_ptr) {  			bearer_id = i;  			continue;  		} -		if (!strcmp(name, tipc_bearers[i].name)) { +		if (!strcmp(name, b_ptr->name)) {  			pr_warn("Bearer <%s> rejected, already enabled\n",  				name);  			goto exit;  		} -		if ((tipc_bearers[i].priority == priority) && +		if ((b_ptr->priority == priority) &&  		    (++with_this_prio > 2)) {  			if (priority-- == 0) {  				pr_warn("Bearer <%s> rejected, duplicate priority\n", @@ -311,7 +315,11 @@ restart:  		goto exit;  	} -	b_ptr = &tipc_bearers[bearer_id]; +	b_ptr = kzalloc(sizeof(*b_ptr), GFP_ATOMIC); +	if (!b_ptr) { +		res = -ENOMEM; +		goto exit; +	}  	strcpy(b_ptr->name, name);  	b_ptr->media = m_ptr;  	res = m_ptr->enable_media(b_ptr); @@ -324,19 +332,20 @@ restart:  	b_ptr->identity = bearer_id;  	b_ptr->tolerance = m_ptr->tolerance;  	b_ptr->window = m_ptr->window; +	b_ptr->domain = disc_domain;  	b_ptr->net_plane = bearer_id + 'A'; -	b_ptr->active = 1;  	b_ptr->priority = priority; -	INIT_LIST_HEAD(&b_ptr->links); -	spin_lock_init(&b_ptr->lock); -	res = tipc_disc_create(b_ptr, &b_ptr->bcast_addr, disc_domain); +	res = tipc_disc_create(b_ptr, &b_ptr->bcast_addr);  	if (res) { -		bearer_disable(b_ptr); +		bearer_disable(b_ptr, false);  		pr_warn("Bearer <%s> rejected, discovery object creation failed\n",  			name);  		goto exit;  	} + +	bearer_list[bearer_id] = b_ptr; +  	pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",  		name,  		tipc_addr_string_fill(addr_string, disc_domain), priority); @@ -350,20 +359,11 @@ exit:   */  static int tipc_reset_bearer(struct tipc_bearer *b_ptr)  { -	struct tipc_link *l_ptr; -	struct tipc_link *temp_l_ptr; -  	read_lock_bh(&tipc_net_lock);  	pr_info("Resetting bearer <%s>\n", b_ptr->name); -	spin_lock_bh(&b_ptr->lock); -	list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { -		struct tipc_node *n_ptr = l_ptr->owner; - -		spin_lock_bh(&n_ptr->lock); -		tipc_link_reset(l_ptr); -		spin_unlock_bh(&n_ptr->lock); -	} -	spin_unlock_bh(&b_ptr->lock); +	tipc_disc_delete(b_ptr->link_req); +	tipc_link_reset_list(b_ptr->identity); +	tipc_disc_create(b_ptr, &b_ptr->bcast_addr);  	read_unlock_bh(&tipc_net_lock);  	return 0;  } @@ -373,26 +373,24 @@ static int tipc_reset_bearer(struct tipc_bearer *b_ptr)   *   * Note: This routine assumes caller holds tipc_net_lock.   */ -static void bearer_disable(struct tipc_bearer *b_ptr) +static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down)  { -	struct tipc_link *l_ptr; -	struct tipc_link *temp_l_ptr; -	struct tipc_link_req *temp_req; +	u32 i;  	pr_info("Disabling bearer <%s>\n", b_ptr->name); -	spin_lock_bh(&b_ptr->lock);  	b_ptr->media->disable_media(b_ptr); -	list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { -		tipc_link_delete(l_ptr); -	} -	temp_req = b_ptr->link_req; -	b_ptr->link_req = NULL; -	spin_unlock_bh(&b_ptr->lock); -	if (temp_req) -		tipc_disc_delete(temp_req); +	tipc_link_delete_list(b_ptr->identity, shutting_down); +	if (b_ptr->link_req) +		tipc_disc_delete(b_ptr->link_req); -	memset(b_ptr, 0, sizeof(struct tipc_bearer)); +	for (i = 0; i < MAX_BEARERS; i++) { +		if (b_ptr == bearer_list[i]) { +			bearer_list[i] = NULL; +			break; +		} +	} +	kfree(b_ptr);  }  int tipc_disable_bearer(const char *name) @@ -406,7 +404,7 @@ int tipc_disable_bearer(const char *name)  		pr_warn("Attempt to disable unknown bearer <%s>\n", name);  		res = -EINVAL;  	} else { -		bearer_disable(b_ptr); +		bearer_disable(b_ptr, false);  		res = 0;  	}  	write_unlock_bh(&tipc_net_lock); @@ -585,7 +583,11 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,  			break;  	case NETDEV_DOWN:  	case NETDEV_CHANGEMTU: +		tipc_reset_bearer(b_ptr); +		break;  	case NETDEV_CHANGEADDR: +		tipc_l2_media_addr_set(b_ptr, &b_ptr->addr, +				       (char *)dev->dev_addr);  		tipc_reset_bearer(b_ptr);  		break;  	case NETDEV_UNREGISTER: @@ -599,7 +601,7 @@ static int tipc_l2_device_event(struct notifier_block *nb, unsigned long evt,  }  static struct packet_type tipc_packet_type __read_mostly = { -	.type = __constant_htons(ETH_P_TIPC), +	.type = htons(ETH_P_TIPC),  	.func = tipc_l2_rcv_msg,  }; @@ -610,8 +612,13 @@ static struct notifier_block notifier = {  int tipc_bearer_setup(void)  { +	int err; + +	err = register_netdevice_notifier(¬ifier); +	if (err) +		return err;  	dev_add_pack(&tipc_packet_type); -	return register_netdevice_notifier(¬ifier); +	return 0;  }  void tipc_bearer_cleanup(void) @@ -622,10 +629,14 @@ void tipc_bearer_cleanup(void)  void tipc_bearer_stop(void)  { +	struct tipc_bearer *b_ptr;  	u32 i;  	for (i = 0; i < MAX_BEARERS; i++) { -		if (tipc_bearers[i].active) -			bearer_disable(&tipc_bearers[i]); +		b_ptr = bearer_list[i]; +		if (b_ptr) { +			bearer_disable(b_ptr, true); +			bearer_list[i] = NULL; +		}  	}  } diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 4f5db9ad5bf..ba48145e871 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -107,10 +107,8 @@ struct tipc_media {  /**   * struct tipc_bearer - Generic TIPC bearer structure - * @dev: ptr to associated network device - * @usr_handle: pointer to additional media-specific information about bearer + * @media_ptr: pointer to additional media-specific information about bearer   * @mtu: max packet size bearer can support - * @lock: spinlock for controlling access to bearer   * @addr: media-specific address associated with bearer   * @name: bearer name (format = media:interface)   * @media: ptr to media structure associated with bearer @@ -118,10 +116,9 @@ struct tipc_media {   * @priority: default link priority for bearer   * @window: default window size for bearer   * @tolerance: default link tolerance for bearer + * @domain: network domain to which links can be established   * @identity: array index of this bearer within TIPC bearer array   * @link_req: ptr to (optional) structure making periodic link setup requests - * @links: list of non-congested links associated with bearer - * @active: non-zero if bearer structure is represents a bearer   * @net_plane: network plane ('A' through 'H') currently associated with bearer   * @nodes: indicates which nodes in cluster can be reached through bearer   * @@ -134,16 +131,14 @@ struct tipc_bearer {  	u32 mtu;				/* initalized by media */  	struct tipc_media_addr addr;		/* initalized by media */  	char name[TIPC_MAX_BEARER_NAME]; -	spinlock_t lock;  	struct tipc_media *media;  	struct tipc_media_addr bcast_addr;  	u32 priority;  	u32 window;  	u32 tolerance; +	u32 domain;  	u32 identity;  	struct tipc_link_req *link_req; -	struct list_head links; -	int active;  	char net_plane;  	struct tipc_node_map nodes;  }; @@ -155,7 +150,7 @@ struct tipc_bearer_names {  struct tipc_link; -extern struct tipc_bearer tipc_bearers[]; +extern struct tipc_bearer *bearer_list[];  /*   * TIPC routines available to supported media types diff --git a/net/tipc/config.c b/net/tipc/config.c index c301a9a592d..4b981c05382 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -43,13 +43,11 @@  #define REPLY_TRUNCATED "<truncated>\n"  static DEFINE_MUTEX(config_mutex); -static struct tipc_server cfgsrv;  static const void *req_tlv_area;	/* request message TLV area */  static int req_tlv_space;		/* request message TLV area size */  static int rep_headroom;		/* reply message headroom to use */ -  struct sk_buff *tipc_cfg_reply_alloc(int payload_size)  {  	struct sk_buff *buf; @@ -181,19 +179,7 @@ static struct sk_buff *cfg_set_own_addr(void)  	if (tipc_own_addr)  		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  						   " (cannot change node address once assigned)"); -	tipc_core_start_net(addr); -	return tipc_cfg_reply_none(); -} - -static struct sk_buff *cfg_set_remote_mng(void) -{ -	u32 value; - -	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_UNSIGNED)) -		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); - -	value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area)); -	tipc_remote_management = (value != 0); +	tipc_net_start(addr);  	return tipc_cfg_reply_none();  } @@ -247,21 +233,10 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area  	/* Check command authorization */  	if (likely(in_own_node(orig_node))) {  		/* command is permitted */ -	} else if (cmd >= 0x8000) { +	} else {  		rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  							  " (cannot be done remotely)");  		goto exit; -	} else if (!tipc_remote_management) { -		rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NO_REMOTE); -		goto exit; -	} else if (cmd >= 0x4000) { -		u32 domain = 0; - -		if ((tipc_nametbl_translate(TIPC_ZM_SRV, 0, &domain) == 0) || -		    (domain != orig_node)) { -			rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_ZONE_MSTR); -			goto exit; -		}  	}  	/* Call appropriate processing routine */ @@ -310,18 +285,12 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area  	case TIPC_CMD_SET_NODE_ADDR:  		rep_tlv_buf = cfg_set_own_addr();  		break; -	case TIPC_CMD_SET_REMOTE_MNG: -		rep_tlv_buf = cfg_set_remote_mng(); -		break;  	case TIPC_CMD_SET_MAX_PORTS:  		rep_tlv_buf = cfg_set_max_ports();  		break;  	case TIPC_CMD_SET_NETID:  		rep_tlv_buf = cfg_set_netid();  		break; -	case TIPC_CMD_GET_REMOTE_MNG: -		rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_remote_management); -		break;  	case TIPC_CMD_GET_MAX_PORTS:  		rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_ports);  		break; @@ -345,6 +314,8 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area  	case TIPC_CMD_SET_MAX_PUBL:  	case TIPC_CMD_GET_MAX_PUBL:  	case TIPC_CMD_SET_LOG_SIZE: +	case TIPC_CMD_SET_REMOTE_MNG: +	case TIPC_CMD_GET_REMOTE_MNG:  	case TIPC_CMD_DUMP_LOG:  		rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  							  " (obsolete command)"); @@ -369,80 +340,3 @@ exit:  	mutex_unlock(&config_mutex);  	return rep_tlv_buf;  } - -static void cfg_conn_msg_event(int conid, struct sockaddr_tipc *addr, -			       void *usr_data, void *buf, size_t len) -{ -	struct tipc_cfg_msg_hdr *req_hdr; -	struct tipc_cfg_msg_hdr *rep_hdr; -	struct sk_buff *rep_buf; -	int ret; - -	/* Validate configuration message header (ignore invalid message) */ -	req_hdr = (struct tipc_cfg_msg_hdr *)buf; -	if ((len < sizeof(*req_hdr)) || -	    (len != TCM_ALIGN(ntohl(req_hdr->tcm_len))) || -	    (ntohs(req_hdr->tcm_flags) != TCM_F_REQUEST)) { -		pr_warn("Invalid configuration message discarded\n"); -		return; -	} - -	/* Generate reply for request (if can't, return request) */ -	rep_buf = tipc_cfg_do_cmd(addr->addr.id.node, ntohs(req_hdr->tcm_type), -				  buf + sizeof(*req_hdr), -				  len - sizeof(*req_hdr), -				  BUF_HEADROOM + MAX_H_SIZE + sizeof(*rep_hdr)); -	if (rep_buf) { -		skb_push(rep_buf, sizeof(*rep_hdr)); -		rep_hdr = (struct tipc_cfg_msg_hdr *)rep_buf->data; -		memcpy(rep_hdr, req_hdr, sizeof(*rep_hdr)); -		rep_hdr->tcm_len = htonl(rep_buf->len); -		rep_hdr->tcm_flags &= htons(~TCM_F_REQUEST); - -		ret = tipc_conn_sendmsg(&cfgsrv, conid, addr, rep_buf->data, -					rep_buf->len); -		if (ret < 0) -			pr_err("Sending cfg reply message failed, no memory\n"); - -		kfree_skb(rep_buf); -	} -} - -static struct sockaddr_tipc cfgsrv_addr __read_mostly = { -	.family			= AF_TIPC, -	.addrtype		= TIPC_ADDR_NAMESEQ, -	.addr.nameseq.type	= TIPC_CFG_SRV, -	.addr.nameseq.lower	= 0, -	.addr.nameseq.upper	= 0, -	.scope			= TIPC_ZONE_SCOPE -}; - -static struct tipc_server cfgsrv __read_mostly = { -	.saddr			= &cfgsrv_addr, -	.imp			= TIPC_CRITICAL_IMPORTANCE, -	.type			= SOCK_RDM, -	.max_rcvbuf_size	= 64 * 1024, -	.name			= "cfg_server", -	.tipc_conn_recvmsg	= cfg_conn_msg_event, -	.tipc_conn_new		= NULL, -	.tipc_conn_shutdown	= NULL -}; - -int tipc_cfg_init(void) -{ -	return tipc_server_start(&cfgsrv); -} - -void tipc_cfg_reinit(void) -{ -	tipc_server_stop(&cfgsrv); - -	cfgsrv_addr.addr.nameseq.lower = tipc_own_addr; -	cfgsrv_addr.addr.nameseq.upper = tipc_own_addr; -	tipc_server_start(&cfgsrv); -} - -void tipc_cfg_stop(void) -{ -	tipc_server_stop(&cfgsrv); -} diff --git a/net/tipc/config.h b/net/tipc/config.h index 1f252f3fa05..47b1bf18161 100644 --- a/net/tipc/config.h +++ b/net/tipc/config.h @@ -64,9 +64,4 @@ static inline struct sk_buff *tipc_cfg_reply_ultra_string(char *string)  struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd,  				const void *req_tlv_area, int req_tlv_space,  				int headroom); - -int  tipc_cfg_init(void); -void tipc_cfg_reinit(void); -void tipc_cfg_stop(void); -  #endif diff --git a/net/tipc/core.c b/net/tipc/core.c index f9e88d8b04c..50d57429ebc 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -1,7 +1,7 @@  /*   * net/tipc/core.c: TIPC module code   * - * Copyright (c) 2003-2006, Ericsson AB + * Copyright (c) 2003-2006, 2013, Ericsson AB   * Copyright (c) 2005-2006, 2010-2013, Wind River Systems   * All rights reserved.   * @@ -50,7 +50,6 @@ int tipc_random __read_mostly;  u32 tipc_own_addr __read_mostly;  int tipc_max_ports __read_mostly;  int tipc_net_id __read_mostly; -int tipc_remote_management __read_mostly;  int sysctl_tipc_rmem[3] __read_mostly;	/* min/default/max */  /** @@ -77,39 +76,14 @@ struct sk_buff *tipc_buf_acquire(u32 size)  }  /** - * tipc_core_stop_net - shut down TIPC networking sub-systems - */ -static void tipc_core_stop_net(void) -{ -	tipc_net_stop(); -	tipc_bearer_cleanup(); -} - -/** - * start_net - start TIPC networking sub-systems - */ -int tipc_core_start_net(unsigned long addr) -{ -	int res; - -	tipc_net_start(addr); -	res = tipc_bearer_setup(); -	if (res < 0) -		goto err; -	return res; - -err: -	tipc_core_stop_net(); -	return res; -} - -/**   * tipc_core_stop - switch TIPC from SINGLE NODE to NOT RUNNING mode   */  static void tipc_core_stop(void)  { +	tipc_handler_stop(); +	tipc_net_stop(); +	tipc_bearer_cleanup();  	tipc_netlink_stop(); -	tipc_cfg_stop();  	tipc_subscr_stop();  	tipc_nametbl_stop();  	tipc_ref_table_stop(); @@ -122,30 +96,59 @@ static void tipc_core_stop(void)   */  static int tipc_core_start(void)  { -	int res; +	int err;  	get_random_bytes(&tipc_random, sizeof(tipc_random)); -	res = tipc_handler_start(); -	if (!res) -		res = tipc_ref_table_init(tipc_max_ports, tipc_random); -	if (!res) -		res = tipc_nametbl_init(); -	if (!res) -		res = tipc_netlink_start(); -	if (!res) -		res = tipc_socket_init(); -	if (!res) -		res = tipc_register_sysctl(); -	if (!res) -		res = tipc_subscr_start(); -	if (!res) -		res = tipc_cfg_init(); -	if (res) { -		tipc_handler_stop(); -		tipc_core_stop(); -	} -	return res; +	err = tipc_handler_start(); +	if (err) +		goto out_handler; + +	err = tipc_ref_table_init(tipc_max_ports, tipc_random); +	if (err) +		goto out_reftbl; + +	err = tipc_nametbl_init(); +	if (err) +		goto out_nametbl; + +	err = tipc_netlink_start(); +	if (err) +		goto out_netlink; + +	err = tipc_socket_init(); +	if (err) +		goto out_socket; + +	err = tipc_register_sysctl(); +	if (err) +		goto out_sysctl; + +	err = tipc_subscr_start(); +	if (err) +		goto out_subscr; + +	err = tipc_bearer_setup(); +	if (err) +		goto out_bearer; + +	return 0; +out_bearer: +	tipc_subscr_stop(); +out_subscr: +	tipc_unregister_sysctl(); +out_sysctl: +	tipc_socket_stop(); +out_socket: +	tipc_netlink_stop(); +out_netlink: +	tipc_nametbl_stop(); +out_nametbl: +	tipc_ref_table_stop(); +out_reftbl: +	tipc_handler_stop(); +out_handler: +	return err;  }  static int __init tipc_init(void) @@ -155,7 +158,6 @@ static int __init tipc_init(void)  	pr_info("Activated (version " TIPC_MOD_VER ")\n");  	tipc_own_addr = 0; -	tipc_remote_management = 1;  	tipc_max_ports = CONFIG_TIPC_PORTS;  	tipc_net_id = 4711; @@ -174,8 +176,6 @@ static int __init tipc_init(void)  static void __exit tipc_exit(void)  { -	tipc_handler_stop(); -	tipc_core_stop_net();  	tipc_core_stop();  	pr_info("Deactivated\n");  } diff --git a/net/tipc/core.h b/net/tipc/core.h index 1ff477b0450..8985bbcb942 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -79,7 +79,6 @@ int tipc_snprintf(char *buf, int len, const char *fmt, ...);  extern u32 tipc_own_addr __read_mostly;  extern int tipc_max_ports __read_mostly;  extern int tipc_net_id __read_mostly; -extern int tipc_remote_management __read_mostly;  extern int sysctl_tipc_rmem[3] __read_mostly;  /* @@ -90,7 +89,6 @@ extern int tipc_random __read_mostly;  /*   * Routines available to privileged subsystems   */ -int tipc_core_start_net(unsigned long);  int tipc_handler_start(void);  void tipc_handler_stop(void);  int tipc_netlink_start(void); @@ -192,6 +190,7 @@ static inline void k_term_timer(struct timer_list *timer)  struct tipc_skb_cb {  	void *handle; +	bool deferred;  };  #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 412ff41b861..542fe3413dc 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -48,7 +48,6 @@   * struct tipc_link_req - information about an ongoing link setup request   * @bearer: bearer issuing requests   * @dest: destination address for request messages - * @domain: network domain to which links can be established   * @num_nodes: number of nodes currently discovered (i.e. with an active link)   * @lock: spinlock for controlling access to requests   * @buf: request message to be (repeatedly) sent @@ -58,7 +57,6 @@  struct tipc_link_req {  	struct tipc_bearer *bearer;  	struct tipc_media_addr dest; -	u32 domain;  	int num_nodes;  	spinlock_t lock;  	struct sk_buff *buf; @@ -69,14 +67,13 @@ struct tipc_link_req {  /**   * tipc_disc_init_msg - initialize a link setup message   * @type: message type (request or response) - * @dest_domain: network domain of node(s) which should respond to message   * @b_ptr: ptr to bearer issuing message   */ -static struct sk_buff *tipc_disc_init_msg(u32 type, u32 dest_domain, -					  struct tipc_bearer *b_ptr) +static struct sk_buff *tipc_disc_init_msg(u32 type, struct tipc_bearer *b_ptr)  {  	struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE);  	struct tipc_msg *msg; +	u32 dest_domain = b_ptr->domain;  	if (buf) {  		msg = buf_msg(buf); @@ -110,11 +107,11 @@ static void disc_dupl_alert(struct tipc_bearer *b_ptr, u32 node_addr,  }  /** - * tipc_disc_recv_msg - handle incoming link setup message (request or response) + * tipc_disc_rcv - handle incoming link setup message (request or response)   * @buf: buffer containing message   * @b_ptr: bearer that message arrived on   */ -void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr) +void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr)  {  	struct tipc_node *n_ptr;  	struct tipc_link *link; @@ -149,7 +146,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)  	}  	if (!tipc_in_scope(dest, tipc_own_addr))  		return; -	if (!tipc_in_scope(b_ptr->link_req->domain, orig)) +	if (!tipc_in_scope(b_ptr->domain, orig))  		return;  	/* Locate structure corresponding to requesting node */ @@ -242,7 +239,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)  	link_fully_up = link_working_working(link);  	if ((type == DSC_REQ_MSG) && !link_fully_up) { -		rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr); +		rbuf = tipc_disc_init_msg(DSC_RESP_MSG, b_ptr);  		if (rbuf) {  			tipc_bearer_send(b_ptr, rbuf, &media_addr);  			kfree_skb(rbuf); @@ -306,7 +303,7 @@ static void disc_timeout(struct tipc_link_req *req)  	spin_lock_bh(&req->lock);  	/* Stop searching if only desired node has been found */ -	if (tipc_node(req->domain) && req->num_nodes) { +	if (tipc_node(req->bearer->domain) && req->num_nodes) {  		req->timer_intv = TIPC_LINK_REQ_INACTIVE;  		goto exit;  	} @@ -342,8 +339,7 @@ exit:   *   * Returns 0 if successful, otherwise -errno.   */ -int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, -		     u32 dest_domain) +int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest)  {  	struct tipc_link_req *req; @@ -351,7 +347,7 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest,  	if (!req)  		return -ENOMEM; -	req->buf = tipc_disc_init_msg(DSC_REQ_MSG, dest_domain, b_ptr); +	req->buf = tipc_disc_init_msg(DSC_REQ_MSG, b_ptr);  	if (!req->buf) {  		kfree(req);  		return -ENOMSG; @@ -359,7 +355,6 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest,  	memcpy(&req->dest, dest, sizeof(*dest));  	req->bearer = b_ptr; -	req->domain = dest_domain;  	req->num_nodes = 0;  	req->timer_intv = TIPC_LINK_REQ_INIT;  	spin_lock_init(&req->lock); diff --git a/net/tipc/discover.h b/net/tipc/discover.h index 75b67c403aa..07f34729459 100644 --- a/net/tipc/discover.h +++ b/net/tipc/discover.h @@ -39,11 +39,10 @@  struct tipc_link_req; -int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, -		     u32 dest_domain); +int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest);  void tipc_disc_delete(struct tipc_link_req *req);  void tipc_disc_add_dest(struct tipc_link_req *req);  void tipc_disc_remove_dest(struct tipc_link_req *req); -void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr); +void tipc_disc_rcv(struct sk_buff *buf, struct tipc_bearer *b_ptr);  #endif diff --git a/net/tipc/handler.c b/net/tipc/handler.c index e4bc8a29674..1fabf160501 100644 --- a/net/tipc/handler.c +++ b/net/tipc/handler.c @@ -58,7 +58,6 @@ unsigned int tipc_k_signal(Handler routine, unsigned long argument)  	spin_lock_bh(&qitem_lock);  	if (!handler_enabled) { -		pr_err("Signal request ignored by handler\n");  		spin_unlock_bh(&qitem_lock);  		return -ENOPROTOOPT;  	} diff --git a/net/tipc/link.c b/net/tipc/link.c index d4b5de41b68..c5190ab7529 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -77,19 +77,19 @@ static const char *link_unk_evt = "Unknown link event ";  static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,  				       struct sk_buff *buf); -static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf); -static int  tipc_link_tunnel_rcv(struct tipc_link **l_ptr, +static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf); +static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,  				 struct sk_buff **buf);  static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); -static int  link_send_sections_long(struct tipc_port *sender, -				    struct iovec const *msg_sect, -				    unsigned int len, u32 destnode); +static int  tipc_link_iovec_long_xmit(struct tipc_port *sender, +				      struct iovec const *msg_sect, +				      unsigned int len, u32 destnode);  static void link_state_event(struct tipc_link *l_ptr, u32 event);  static void link_reset_statistics(struct tipc_link *l_ptr);  static void link_print(struct tipc_link *l_ptr, const char *str); -static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); -static void tipc_link_send_sync(struct tipc_link *l); -static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf); +static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf); +static void tipc_link_sync_xmit(struct tipc_link *l); +static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);  /*   *  Simple link routines @@ -147,11 +147,6 @@ int tipc_link_is_active(struct tipc_link *l_ptr)  /**   * link_timeout - handle expiration of link timer   * @l_ptr: pointer to link - * - * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict - * with tipc_link_delete().  (There is no risk that the node will be deleted by - * another thread because tipc_link_delete() always cancels the link timer before - * tipc_node_delete() is called.)   */  static void link_timeout(struct tipc_link *l_ptr)  { @@ -213,8 +208,8 @@ static void link_set_timer(struct tipc_link *l_ptr, u32 time)   * Returns pointer to link.   */  struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, -			      struct tipc_bearer *b_ptr, -			      const struct tipc_media_addr *media_addr) +				   struct tipc_bearer *b_ptr, +				   const struct tipc_media_addr *media_addr)  {  	struct tipc_link *l_ptr;  	struct tipc_msg *msg; @@ -279,41 +274,44 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,  	k_init_timer(&l_ptr->timer, (Handler)link_timeout,  		     (unsigned long)l_ptr); -	list_add_tail(&l_ptr->link_list, &b_ptr->links);  	link_state_event(l_ptr, STARTING_EVT);  	return l_ptr;  } -/** - * tipc_link_delete - delete a link - * @l_ptr: pointer to link - * - * Note: 'tipc_net_lock' is write_locked, bearer is locked. - * This routine must not grab the node lock until after link timer cancellation - * to avoid a potential deadlock situation. - */ -void tipc_link_delete(struct tipc_link *l_ptr) +void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)  { -	if (!l_ptr) { -		pr_err("Attempt to delete non-existent link\n"); -		return; -	} - -	k_cancel_timer(&l_ptr->timer); +	struct tipc_link *l_ptr; +	struct tipc_node *n_ptr; -	tipc_node_lock(l_ptr->owner); -	tipc_link_reset(l_ptr); -	tipc_node_detach_link(l_ptr->owner, l_ptr); -	tipc_link_purge_queues(l_ptr); -	list_del_init(&l_ptr->link_list); -	tipc_node_unlock(l_ptr->owner); -	k_term_timer(&l_ptr->timer); -	kfree(l_ptr); +	rcu_read_lock(); +	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { +		spin_lock_bh(&n_ptr->lock); +		l_ptr = n_ptr->links[bearer_id]; +		if (l_ptr) { +			tipc_link_reset(l_ptr); +			if (shutting_down || !tipc_node_is_up(n_ptr)) { +				tipc_node_detach_link(l_ptr->owner, l_ptr); +				tipc_link_reset_fragments(l_ptr); +				spin_unlock_bh(&n_ptr->lock); + +				/* Nobody else can access this link now: */ +				del_timer_sync(&l_ptr->timer); +				kfree(l_ptr); +			} else { +				/* Detach/delete when failover is finished: */ +				l_ptr->flags |= LINK_STOPPED; +				spin_unlock_bh(&n_ptr->lock); +				del_timer_sync(&l_ptr->timer); +			} +			continue; +		} +		spin_unlock_bh(&n_ptr->lock); +	} +	rcu_read_unlock();  } -  /**   * link_schedule_port - schedule port for deferred sending   * @l_ptr: pointer to link @@ -330,8 +328,6 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)  	spin_lock_bh(&tipc_port_list_lock);  	p_ptr = tipc_port_lock(origport);  	if (p_ptr) { -		if (!p_ptr->wakeup) -			goto exit;  		if (!list_empty(&p_ptr->wait_list))  			goto exit;  		p_ptr->congested = 1; @@ -366,7 +362,7 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)  		list_del_init(&p_ptr->wait_list);  		spin_lock_bh(p_ptr->lock);  		p_ptr->congested = 0; -		p_ptr->wakeup(p_ptr); +		tipc_port_wakeup(p_ptr);  		win -= p_ptr->waiting_pkts;  		spin_unlock_bh(p_ptr->lock);  	} @@ -461,6 +457,21 @@ void tipc_link_reset(struct tipc_link *l_ptr)  	link_reset_statistics(l_ptr);  } +void tipc_link_reset_list(unsigned int bearer_id) +{ +	struct tipc_link *l_ptr; +	struct tipc_node *n_ptr; + +	rcu_read_lock(); +	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { +		spin_lock_bh(&n_ptr->lock); +		l_ptr = n_ptr->links[bearer_id]; +		if (l_ptr) +			tipc_link_reset(l_ptr); +		spin_unlock_bh(&n_ptr->lock); +	} +	rcu_read_unlock(); +}  static void link_activate(struct tipc_link *l_ptr)  { @@ -479,7 +490,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  	struct tipc_link *other;  	u32 cont_intv = l_ptr->continuity_interval; -	if (!l_ptr->started && (event != STARTING_EVT)) +	if (l_ptr->flags & LINK_STOPPED) +		return; + +	if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))  		return;		/* Not yet. */  	/* Check whether changeover is going on */ @@ -499,12 +513,12 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			if (l_ptr->next_in_no != l_ptr->checkpoint) {  				l_ptr->checkpoint = l_ptr->next_in_no;  				if (tipc_bclink_acks_missing(l_ptr->owner)) { -					tipc_link_send_proto_msg(l_ptr, STATE_MSG, -								 0, 0, 0, 0, 0); +					tipc_link_proto_xmit(l_ptr, STATE_MSG, +							     0, 0, 0, 0, 0);  					l_ptr->fsm_msg_cnt++;  				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { -					tipc_link_send_proto_msg(l_ptr, STATE_MSG, -								 1, 0, 0, 0, 0); +					tipc_link_proto_xmit(l_ptr, STATE_MSG, +							     1, 0, 0, 0, 0);  					l_ptr->fsm_msg_cnt++;  				}  				link_set_timer(l_ptr, cont_intv); @@ -512,7 +526,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			}  			l_ptr->state = WORKING_UNKNOWN;  			l_ptr->fsm_msg_cnt = 0; -			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv / 4);  			break; @@ -522,7 +536,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			tipc_link_reset(l_ptr);  			l_ptr->state = RESET_RESET;  			l_ptr->fsm_msg_cnt = 0; -			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, +					     0, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -544,7 +559,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			tipc_link_reset(l_ptr);  			l_ptr->state = RESET_RESET;  			l_ptr->fsm_msg_cnt = 0; -			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, +					     0, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -554,14 +570,14 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  				l_ptr->fsm_msg_cnt = 0;  				l_ptr->checkpoint = l_ptr->next_in_no;  				if (tipc_bclink_acks_missing(l_ptr->owner)) { -					tipc_link_send_proto_msg(l_ptr, STATE_MSG, -								 0, 0, 0, 0, 0); +					tipc_link_proto_xmit(l_ptr, STATE_MSG, +							     0, 0, 0, 0, 0);  					l_ptr->fsm_msg_cnt++;  				}  				link_set_timer(l_ptr, cont_intv);  			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) { -				tipc_link_send_proto_msg(l_ptr, STATE_MSG, -							 1, 0, 0, 0, 0); +				tipc_link_proto_xmit(l_ptr, STATE_MSG, +						     1, 0, 0, 0, 0);  				l_ptr->fsm_msg_cnt++;  				link_set_timer(l_ptr, cont_intv / 4);  			} else {	/* Link has failed */ @@ -570,8 +586,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  				tipc_link_reset(l_ptr);  				l_ptr->state = RESET_UNKNOWN;  				l_ptr->fsm_msg_cnt = 0; -				tipc_link_send_proto_msg(l_ptr, RESET_MSG, -							 0, 0, 0, 0, 0); +				tipc_link_proto_xmit(l_ptr, RESET_MSG, +						     0, 0, 0, 0, 0);  				l_ptr->fsm_msg_cnt++;  				link_set_timer(l_ptr, cont_intv);  			} @@ -591,24 +607,25 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			l_ptr->state = WORKING_WORKING;  			l_ptr->fsm_msg_cnt = 0;  			link_activate(l_ptr); -			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			if (l_ptr->owner->working_links == 1) -				tipc_link_send_sync(l_ptr); +				tipc_link_sync_xmit(l_ptr);  			link_set_timer(l_ptr, cont_intv);  			break;  		case RESET_MSG:  			l_ptr->state = RESET_RESET;  			l_ptr->fsm_msg_cnt = 0; -			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, +					     1, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break;  		case STARTING_EVT: -			l_ptr->started = 1; +			l_ptr->flags |= LINK_STARTED;  			/* fall through */  		case TIMEOUT_EVT: -			tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -626,16 +643,17 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)  			l_ptr->state = WORKING_WORKING;  			l_ptr->fsm_msg_cnt = 0;  			link_activate(l_ptr); -			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			if (l_ptr->owner->working_links == 1) -				tipc_link_send_sync(l_ptr); +				tipc_link_sync_xmit(l_ptr);  			link_set_timer(l_ptr, cont_intv);  			break;  		case RESET_MSG:  			break;  		case TIMEOUT_EVT: -			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG, +					     0, 0, 0, 0, 0);  			l_ptr->fsm_msg_cnt++;  			link_set_timer(l_ptr, cont_intv);  			break; @@ -721,11 +739,11 @@ static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,  }  /* - * tipc_link_send_buf() is the 'full path' for messages, called from - * inside TIPC when the 'fast path' in tipc_send_buf + * tipc_link_xmit() is the 'full path' for messages, called from + * inside TIPC when the 'fast path' in tipc_send_xmit   * has failed, and from link_send()   */ -int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf) +int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)  {  	struct tipc_msg *msg = buf_msg(buf);  	u32 size = msg_size(msg); @@ -753,7 +771,7 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  	/* Fragmentation needed ? */  	if (size > max_packet) -		return link_send_long_buf(l_ptr, buf); +		return tipc_link_frag_xmit(l_ptr, buf);  	/* Packet can be queued or sent. */  	if (likely(!link_congested(l_ptr))) { @@ -797,11 +815,11 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  }  /* - * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has - * not been selected yet, and the the owner node is not locked + * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use + * has not been selected yet, and the the owner node is not locked   * Called by TIPC internal users, e.g. the name distributor   */ -int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) +int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)  {  	struct tipc_link *l_ptr;  	struct tipc_node *n_ptr; @@ -813,7 +831,7 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)  		tipc_node_lock(n_ptr);  		l_ptr = n_ptr->active_links[selector & 1];  		if (l_ptr) -			res = tipc_link_send_buf(l_ptr, buf); +			res = __tipc_link_xmit(l_ptr, buf);  		else  			kfree_skb(buf);  		tipc_node_unlock(n_ptr); @@ -825,14 +843,14 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)  }  /* - * tipc_link_send_sync - synchronize broadcast link endpoints. + * tipc_link_sync_xmit - synchronize broadcast link endpoints.   *   * Give a newly added peer node the sequence number where it should   * start receiving and acking broadcast packets.   *   * Called with node locked   */ -static void tipc_link_send_sync(struct tipc_link *l) +static void tipc_link_sync_xmit(struct tipc_link *l)  {  	struct sk_buff *buf;  	struct tipc_msg *msg; @@ -849,14 +867,14 @@ static void tipc_link_send_sync(struct tipc_link *l)  }  /* - * tipc_link_recv_sync - synchronize broadcast link endpoints. + * tipc_link_sync_rcv - synchronize broadcast link endpoints.   * Receive the sequence number where we should start receiving and   * acking broadcast packets from a newly added peer node, and open   * up for reception of such packets.   *   * Called with node locked   */ -static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf) +static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)  {  	struct tipc_msg *msg = buf_msg(buf); @@ -866,7 +884,7 @@ static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)  }  /* - * tipc_link_send_names - send name table entries to new neighbor + * tipc_link_names_xmit - send name table entries to new neighbor   *   * Send routine for bulk delivery of name table messages when contact   * with a new neighbor occurs. No link congestion checking is performed @@ -874,7 +892,7 @@ static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)   * small enough not to require fragmentation.   * Called without any locks held.   */ -void tipc_link_send_names(struct list_head *message_list, u32 dest) +void tipc_link_names_xmit(struct list_head *message_list, u32 dest)  {  	struct tipc_node *n_ptr;  	struct tipc_link *l_ptr; @@ -909,13 +927,13 @@ void tipc_link_send_names(struct list_head *message_list, u32 dest)  }  /* - * link_send_buf_fast: Entry for data messages where the + * tipc_link_xmit_fast: Entry for data messages where the   * destination link is known and the header is complete,   * inclusive total message length. Very time critical.   * Link is locked. Returns user data length.   */ -static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf, -			      u32 *used_max_pkt) +static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf, +			       u32 *used_max_pkt)  {  	struct tipc_msg *msg = buf_msg(buf);  	int res = msg_data_sz(msg); @@ -931,18 +949,18 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,  		else  			*used_max_pkt = l_ptr->max_pkt;  	} -	return tipc_link_send_buf(l_ptr, buf);  /* All other cases */ +	return __tipc_link_xmit(l_ptr, buf);  /* All other cases */  }  /* - * tipc_link_send_sections_fast: Entry for messages where the + * tipc_link_iovec_xmit_fast: Entry for messages where the   * destination processor is known and the header is complete,   * except for total message length.   * Returns user data length or errno.   */ -int tipc_link_send_sections_fast(struct tipc_port *sender, -				 struct iovec const *msg_sect, -				 unsigned int len, u32 destaddr) +int tipc_link_iovec_xmit_fast(struct tipc_port *sender, +			      struct iovec const *msg_sect, +			      unsigned int len, u32 destaddr)  {  	struct tipc_msg *hdr = &sender->phdr;  	struct tipc_link *l_ptr; @@ -968,8 +986,8 @@ again:  		l_ptr = node->active_links[selector];  		if (likely(l_ptr)) {  			if (likely(buf)) { -				res = link_send_buf_fast(l_ptr, buf, -							 &sender->max_pkt); +				res = tipc_link_xmit_fast(l_ptr, buf, +							  &sender->max_pkt);  exit:  				tipc_node_unlock(node);  				read_unlock_bh(&tipc_net_lock); @@ -995,24 +1013,21 @@ exit:  			if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)  				goto again; -			return link_send_sections_long(sender, msg_sect, len, -						       destaddr); +			return tipc_link_iovec_long_xmit(sender, msg_sect, +							 len, destaddr);  		}  		tipc_node_unlock(node);  	}  	read_unlock_bh(&tipc_net_lock);  	/* Couldn't find a link to the destination node */ -	if (buf) -		return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); -	if (res >= 0) -		return tipc_port_reject_sections(sender, hdr, msg_sect, -						 len, TIPC_ERR_NO_NODE); -	return res; +	kfree_skb(buf); +	tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE); +	return -ENETUNREACH;  }  /* - * link_send_sections_long(): Entry for long messages where the + * tipc_link_iovec_long_xmit(): Entry for long messages where the   * destination node is known and the header is complete,   * inclusive total message length.   * Link and bearer congestion status have been checked to be ok, @@ -1025,9 +1040,9 @@ exit:   *   * Returns user data length or errno.   */ -static int link_send_sections_long(struct tipc_port *sender, -				   struct iovec const *msg_sect, -				   unsigned int len, u32 destaddr) +static int tipc_link_iovec_long_xmit(struct tipc_port *sender, +				     struct iovec const *msg_sect, +				     unsigned int len, u32 destaddr)  {  	struct tipc_link *l_ptr;  	struct tipc_node *node; @@ -1146,8 +1161,9 @@ error:  	} else {  reject:  		kfree_skb_list(buf_chain); -		return tipc_port_reject_sections(sender, hdr, msg_sect, -						 len, TIPC_ERR_NO_NODE); +		tipc_port_iovec_reject(sender, hdr, msg_sect, len, +				       TIPC_ERR_NO_NODE); +		return -ENETUNREACH;  	}  	/* Append chain of fragments to send queue & send them */ @@ -1391,6 +1407,12 @@ static int link_recv_buf_validate(struct sk_buff *buf)  	u32 hdr_size;  	u32 min_hdr_size; +	/* If this packet comes from the defer queue, the skb has already +	 * been validated +	 */ +	if (unlikely(TIPC_SKB_CB(buf)->deferred)) +		return 1; +  	if (unlikely(buf->len < MIN_H_SIZE))  		return 0; @@ -1435,15 +1457,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)  		u32 seq_no;  		u32 ackd;  		u32 released = 0; -		int type;  		head = head->next;  		buf->next = NULL; -		/* Ensure bearer is still enabled */ -		if (unlikely(!b_ptr->active)) -			goto discard; -  		/* Ensure message is well-formed */  		if (unlikely(!link_recv_buf_validate(buf)))  			goto discard; @@ -1457,9 +1474,9 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)  		if (unlikely(msg_non_seq(msg))) {  			if (msg_user(msg) ==  LINK_CONFIG) -				tipc_disc_recv_msg(buf, b_ptr); +				tipc_disc_rcv(buf, b_ptr);  			else -				tipc_bclink_recv_pkt(buf); +				tipc_bclink_rcv(buf);  			continue;  		} @@ -1483,7 +1500,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)  		if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&  			msg_user(msg) == LINK_PROTOCOL &&  			(msg_type(msg) == RESET_MSG || -					msg_type(msg) == ACTIVATE_MSG) && +			 msg_type(msg) == ACTIVATE_MSG) &&  			!msg_redundant_link(msg))  			n_ptr->block_setup &= ~WAIT_PEER_DOWN; @@ -1502,7 +1519,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)  		while ((crs != l_ptr->next_out) &&  		       less_eq(buf_seqno(crs), ackd)) {  			struct sk_buff *next = crs->next; -  			kfree_skb(crs);  			crs = next;  			released++; @@ -1515,18 +1531,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)  		/* Try sending any messages link endpoint has pending */  		if (unlikely(l_ptr->next_out))  			tipc_link_push_queue(l_ptr); +  		if (unlikely(!list_empty(&l_ptr->waiting_ports)))  			tipc_link_wakeup_ports(l_ptr, 0); +  		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {  			l_ptr->stats.sent_acks++; -			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);  		} -		/* Now (finally!) process the incoming message */ -protocol_check: +		/* Process the incoming packet */  		if (unlikely(!link_working_working(l_ptr))) {  			if (msg_user(msg) == LINK_PROTOCOL) { -				link_recv_proto_msg(l_ptr, buf); +				tipc_link_proto_rcv(l_ptr, buf);  				head = link_insert_deferred_queue(l_ptr, head);  				tipc_node_unlock(n_ptr);  				continue; @@ -1555,67 +1572,65 @@ protocol_check:  		l_ptr->next_in_no++;  		if (unlikely(l_ptr->oldest_deferred_in))  			head = link_insert_deferred_queue(l_ptr, head); -deliver: -		if (likely(msg_isdata(msg))) { -			tipc_node_unlock(n_ptr); -			tipc_port_recv_msg(buf); -			continue; + +		/* Deliver packet/message to correct user: */ +		if (unlikely(msg_user(msg) ==  CHANGEOVER_PROTOCOL)) { +			if (!tipc_link_tunnel_rcv(n_ptr, &buf)) { +				tipc_node_unlock(n_ptr); +				continue; +			} +			msg = buf_msg(buf); +		} else if (msg_user(msg) == MSG_FRAGMENTER) { +			int rc; + +			l_ptr->stats.recv_fragments++; +			rc = tipc_link_frag_rcv(&l_ptr->reasm_head, +						&l_ptr->reasm_tail, +						&buf); +			if (rc == LINK_REASM_COMPLETE) { +				l_ptr->stats.recv_fragmented++; +				msg = buf_msg(buf); +			} else { +				if (rc == LINK_REASM_ERROR) +					tipc_link_reset(l_ptr); +				tipc_node_unlock(n_ptr); +				continue; +			}  		} +  		switch (msg_user(msg)) { -			int ret; +		case TIPC_LOW_IMPORTANCE: +		case TIPC_MEDIUM_IMPORTANCE: +		case TIPC_HIGH_IMPORTANCE: +		case TIPC_CRITICAL_IMPORTANCE: +			tipc_node_unlock(n_ptr); +			tipc_port_rcv(buf); +			continue;  		case MSG_BUNDLER:  			l_ptr->stats.recv_bundles++;  			l_ptr->stats.recv_bundled += msg_msgcnt(msg);  			tipc_node_unlock(n_ptr); -			tipc_link_recv_bundle(buf); +			tipc_link_bundle_rcv(buf);  			continue;  		case NAME_DISTRIBUTOR:  			n_ptr->bclink.recv_permitted = true;  			tipc_node_unlock(n_ptr); -			tipc_named_recv(buf); -			continue; -		case BCAST_PROTOCOL: -			tipc_link_recv_sync(n_ptr, buf); -			tipc_node_unlock(n_ptr); +			tipc_named_rcv(buf);  			continue;  		case CONN_MANAGER:  			tipc_node_unlock(n_ptr); -			tipc_port_recv_proto_msg(buf); -			continue; -		case MSG_FRAGMENTER: -			l_ptr->stats.recv_fragments++; -			ret = tipc_link_recv_fragment(&l_ptr->reasm_head, -						      &l_ptr->reasm_tail, -						      &buf); -			if (ret == LINK_REASM_COMPLETE) { -				l_ptr->stats.recv_fragmented++; -				msg = buf_msg(buf); -				goto deliver; -			} -			if (ret == LINK_REASM_ERROR) -				tipc_link_reset(l_ptr); -			tipc_node_unlock(n_ptr); +			tipc_port_proto_rcv(buf);  			continue; -		case CHANGEOVER_PROTOCOL: -			type = msg_type(msg); -			if (tipc_link_tunnel_rcv(&l_ptr, &buf)) { -				msg = buf_msg(buf); -				seq_no = msg_seqno(msg); -				if (type == ORIGINAL_MSG) -					goto deliver; -				goto protocol_check; -			} +		case BCAST_PROTOCOL: +			tipc_link_sync_rcv(n_ptr, buf);  			break;  		default:  			kfree_skb(buf); -			buf = NULL;  			break;  		}  		tipc_node_unlock(n_ptr); -		tipc_net_route_msg(buf);  		continue;  unlock_discard: -  		tipc_node_unlock(n_ptr);  discard:  		kfree_skb(buf); @@ -1682,7 +1697,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,  	u32 seq_no = buf_seqno(buf);  	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { -		link_recv_proto_msg(l_ptr, buf); +		tipc_link_proto_rcv(l_ptr, buf);  		return;  	} @@ -1703,8 +1718,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,  				&l_ptr->newest_deferred_in, buf)) {  		l_ptr->deferred_inqueue_sz++;  		l_ptr->stats.deferred_recv++; +		TIPC_SKB_CB(buf)->deferred = true;  		if ((l_ptr->deferred_inqueue_sz % 16) == 1) -			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);  	} else  		l_ptr->stats.duplicates++;  } @@ -1712,9 +1728,8 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,  /*   * Send protocol message to the other endpoint.   */ -void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, -			      int probe_msg, u32 gap, u32 tolerance, -			      u32 priority, u32 ack_mtu) +void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg, +			  u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)  {  	struct sk_buff *buf = NULL;  	struct tipc_msg *msg = l_ptr->pmsg; @@ -1813,7 +1828,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,   * Note that network plane id propagates through the network, and may   * change at any time. The node with lowest address rules   */ -static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) +static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)  {  	u32 rec_gap = 0;  	u32 max_pkt_info; @@ -1932,8 +1947,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)  						      msg_last_bcast(msg));  		if (rec_gap || (msg_probe(msg))) { -			tipc_link_send_proto_msg(l_ptr, STATE_MSG, -						 0, rec_gap, 0, 0, max_pkt_ack); +			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0, +					     0, max_pkt_ack);  		}  		if (msg_seq_gap(msg)) {  			l_ptr->stats.recv_nacks++; @@ -1972,7 +1987,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,  	}  	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);  	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); -	tipc_link_send_buf(tunnel, buf); +	__tipc_link_xmit(tunnel, buf);  } @@ -2005,7 +2020,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)  		if (buf) {  			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);  			msg_set_size(&tunnel_hdr, INT_H_SIZE); -			tipc_link_send_buf(tunnel, buf); +			__tipc_link_xmit(tunnel, buf);  		} else {  			pr_warn("%sunable to send changeover msg\n",  				link_co_err); @@ -2039,7 +2054,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)  	}  } -/* tipc_link_dup_send_queue(): A second link has become active. Tunnel a +/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a   * duplicate of the first link's send queue via the new link. This way, we   * are guaranteed that currently queued packets from a socket are delivered   * before future traffic from the same socket, even if this is using the @@ -2048,7 +2063,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)   * and sequence order is preserved per sender/receiver socket pair.   * Owner node is locked.   */ -void tipc_link_dup_send_queue(struct tipc_link *l_ptr, +void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,  			      struct tipc_link *tunnel)  {  	struct sk_buff *iter; @@ -2078,7 +2093,7 @@ void tipc_link_dup_send_queue(struct tipc_link *l_ptr,  		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);  		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,  					       length); -		tipc_link_send_buf(tunnel, outbuf); +		__tipc_link_xmit(tunnel, outbuf);  		if (!tipc_link_is_up(l_ptr))  			return;  		iter = iter->next; @@ -2105,89 +2120,114 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)  	return eb;  } -/*  tipc_link_tunnel_rcv(): Receive a tunneled packet, sent - *  via other link as result of a failover (ORIGINAL_MSG) or - *  a new active link (DUPLICATE_MSG). Failover packets are - *  returned to the active link for delivery upwards. + + +/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet. + * Owner node is locked. + */ +static void tipc_link_dup_rcv(struct tipc_link *l_ptr, +			      struct sk_buff *t_buf) +{ +	struct sk_buff *buf; + +	if (!tipc_link_is_up(l_ptr)) +		return; + +	buf = buf_extract(t_buf, INT_H_SIZE); +	if (buf == NULL) { +		pr_warn("%sfailed to extract inner dup pkt\n", link_co_err); +		return; +	} + +	/* Add buffer to deferred queue, if applicable: */ +	link_handle_out_of_seq_msg(l_ptr, buf); +} + +/*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet   *  Owner node is locked.   */ -static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr, -				struct sk_buff **buf) +static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr, +					      struct sk_buff *t_buf)  { -	struct sk_buff *tunnel_buf = *buf; -	struct tipc_link *dest_link; +	struct tipc_msg *t_msg = buf_msg(t_buf); +	struct sk_buff *buf = NULL;  	struct tipc_msg *msg; -	struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf); -	u32 msg_typ = msg_type(tunnel_msg); -	u32 msg_count = msg_msgcnt(tunnel_msg); -	u32 bearer_id = msg_bearer_id(tunnel_msg); -	if (bearer_id >= MAX_BEARERS) -		goto exit; -	dest_link = (*l_ptr)->owner->links[bearer_id]; -	if (!dest_link) -		goto exit; -	if (dest_link == *l_ptr) { -		pr_err("Unexpected changeover message on link <%s>\n", -		       (*l_ptr)->name); -		goto exit; -	} -	*l_ptr = dest_link; -	msg = msg_get_wrapped(tunnel_msg); +	if (tipc_link_is_up(l_ptr)) +		tipc_link_reset(l_ptr); -	if (msg_typ == DUPLICATE_MSG) { -		if (less(msg_seqno(msg), mod(dest_link->next_in_no))) -			goto exit; -		*buf = buf_extract(tunnel_buf, INT_H_SIZE); -		if (*buf == NULL) { -			pr_warn("%sduplicate msg dropped\n", link_co_err); +	/* First failover packet? */ +	if (l_ptr->exp_msg_count == START_CHANGEOVER) +		l_ptr->exp_msg_count = msg_msgcnt(t_msg); + +	/* Should there be an inner packet? */ +	if (l_ptr->exp_msg_count) { +		l_ptr->exp_msg_count--; +		buf = buf_extract(t_buf, INT_H_SIZE); +		if (buf == NULL) { +			pr_warn("%sno inner failover pkt\n", link_co_err);  			goto exit;  		} -		kfree_skb(tunnel_buf); -		return 1; -	} +		msg = buf_msg(buf); -	/* First original message ?: */ -	if (tipc_link_is_up(dest_link)) { -		pr_info("%s<%s>, changeover initiated by peer\n", link_rst_msg, -			dest_link->name); -		tipc_link_reset(dest_link); -		dest_link->exp_msg_count = msg_count; -		if (!msg_count) -			goto exit; -	} else if (dest_link->exp_msg_count == START_CHANGEOVER) { -		dest_link->exp_msg_count = msg_count; -		if (!msg_count) +		if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) { +			kfree_skb(buf); +			buf = NULL;  			goto exit; +		} +		if (msg_user(msg) == MSG_FRAGMENTER) { +			l_ptr->stats.recv_fragments++; +			tipc_link_frag_rcv(&l_ptr->reasm_head, +					   &l_ptr->reasm_tail, +					   &buf); +		}  	} +exit: +	if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) { +		tipc_node_detach_link(l_ptr->owner, l_ptr); +		kfree(l_ptr); +	} +	return buf; +} + +/*  tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent + *  via other link as result of a failover (ORIGINAL_MSG) or + *  a new active link (DUPLICATE_MSG). Failover packets are + *  returned to the active link for delivery upwards. + *  Owner node is locked. + */ +static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr, +				struct sk_buff **buf) +{ +	struct sk_buff *t_buf = *buf; +	struct tipc_link *l_ptr; +	struct tipc_msg *t_msg = buf_msg(t_buf); +	u32 bearer_id = msg_bearer_id(t_msg); + +	*buf = NULL; -	/* Receive original message */ -	if (dest_link->exp_msg_count == 0) { -		pr_warn("%sgot too many tunnelled messages\n", link_co_err); +	if (bearer_id >= MAX_BEARERS)  		goto exit; -	} -	dest_link->exp_msg_count--; -	if (less(msg_seqno(msg), dest_link->reset_checkpoint)) { + +	l_ptr = n_ptr->links[bearer_id]; +	if (!l_ptr)  		goto exit; -	} else { -		*buf = buf_extract(tunnel_buf, INT_H_SIZE); -		if (*buf != NULL) { -			kfree_skb(tunnel_buf); -			return 1; -		} else { -			pr_warn("%soriginal msg dropped\n", link_co_err); -		} -	} + +	if (msg_type(t_msg) == DUPLICATE_MSG) +		tipc_link_dup_rcv(l_ptr, t_buf); +	else if (msg_type(t_msg) == ORIGINAL_MSG) +		*buf = tipc_link_failover_rcv(l_ptr, t_buf); +	else +		pr_warn("%sunknown tunnel pkt received\n", link_co_err);  exit: -	*buf = NULL; -	kfree_skb(tunnel_buf); -	return 0; +	kfree_skb(t_buf); +	return *buf != NULL;  }  /*   *  Bundler functionality:   */ -void tipc_link_recv_bundle(struct sk_buff *buf) +void tipc_link_bundle_rcv(struct sk_buff *buf)  {  	u32 msgcount = msg_msgcnt(buf_msg(buf));  	u32 pos = INT_H_SIZE; @@ -2210,11 +2250,11 @@ void tipc_link_recv_bundle(struct sk_buff *buf)   */  /* - * link_send_long_buf: Entry for buffers needing fragmentation. + * tipc_link_frag_xmit: Entry for buffers needing fragmentation.   * The buffer is complete, inclusive total message length.   * Returns user data length.   */ -static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf) +static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)  {  	struct sk_buff *buf_chain = NULL;  	struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain; @@ -2277,12 +2317,11 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)  	return dsz;  } -/* - * tipc_link_recv_fragment(): Called with node lock on. Returns +/* tipc_link_frag_rcv(): Called with node lock on. Returns   * the reassembled buffer if message is complete.   */ -int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail, -			    struct sk_buff **fbuf) +int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail, +		       struct sk_buff **fbuf)  {  	struct sk_buff *frag = *fbuf;  	struct tipc_msg *msg = buf_msg(frag); @@ -2296,6 +2335,7 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,  			goto out_free;  		*head = frag;  		skb_frag_list_init(*head); +		*fbuf = NULL;  		return 0;  	} else if (*head &&  		   skb_try_coalesce(*head, frag, &headstolen, &delta)) { @@ -2315,10 +2355,12 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,  		*tail = *head = NULL;  		return LINK_REASM_COMPLETE;  	} +	*fbuf = NULL;  	return 0;  out_free:  	pr_warn_ratelimited("Link unable to reassemble fragmented message\n");  	kfree_skb(*fbuf); +	*fbuf = NULL;  	return LINK_REASM_ERROR;  } @@ -2352,35 +2394,41 @@ void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)  	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;  } -/** - * link_find_link - locate link by name - * @name: ptr to link name string - * @node: ptr to area to be filled with ptr to associated node - * +/* tipc_link_find_owner - locate owner node of link by link's name + * @name: pointer to link name string + * @bearer_id: pointer to index in 'node->links' array where the link was found.   * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;   * this also prevents link deletion.   * - * Returns pointer to link (or 0 if invalid link name). + * Returns pointer to node owning the link, or 0 if no matching link is found.   */ -static struct tipc_link *link_find_link(const char *name, -					struct tipc_node **node) +static struct tipc_node *tipc_link_find_owner(const char *link_name, +					      unsigned int *bearer_id)  {  	struct tipc_link *l_ptr;  	struct tipc_node *n_ptr; +	struct tipc_node *found_node = 0;  	int i; -	list_for_each_entry(n_ptr, &tipc_node_list, list) { +	*bearer_id = 0; +	rcu_read_lock(); +	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { +		tipc_node_lock(n_ptr);  		for (i = 0; i < MAX_BEARERS; i++) {  			l_ptr = n_ptr->links[i]; -			if (l_ptr && !strcmp(l_ptr->name, name)) -				goto found; +			if (l_ptr && !strcmp(l_ptr->name, link_name)) { +				*bearer_id = i; +				found_node = n_ptr; +				break; +			}  		} +		tipc_node_unlock(n_ptr); +		if (found_node) +			break;  	} -	l_ptr = NULL; -	n_ptr = NULL; -found: -	*node = n_ptr; -	return l_ptr; +	rcu_read_unlock(); + +	return found_node;  }  /** @@ -2422,32 +2470,33 @@ static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)  	struct tipc_link *l_ptr;  	struct tipc_bearer *b_ptr;  	struct tipc_media *m_ptr; +	int bearer_id;  	int res = 0; -	l_ptr = link_find_link(name, &node); -	if (l_ptr) { -		/* -		 * acquire node lock for tipc_link_send_proto_msg(). -		 * see "TIPC locking policy" in net.c. -		 */ +	node = tipc_link_find_owner(name, &bearer_id); +	if (node) {  		tipc_node_lock(node); -		switch (cmd) { -		case TIPC_CMD_SET_LINK_TOL: -			link_set_supervision_props(l_ptr, new_value); -			tipc_link_send_proto_msg(l_ptr, -				STATE_MSG, 0, 0, new_value, 0, 0); -			break; -		case TIPC_CMD_SET_LINK_PRI: -			l_ptr->priority = new_value; -			tipc_link_send_proto_msg(l_ptr, -				STATE_MSG, 0, 0, 0, new_value, 0); -			break; -		case TIPC_CMD_SET_LINK_WINDOW: -			tipc_link_set_queue_limits(l_ptr, new_value); -			break; -		default: -			res = -EINVAL; -			break; +		l_ptr = node->links[bearer_id]; + +		if (l_ptr) { +			switch (cmd) { +			case TIPC_CMD_SET_LINK_TOL: +				link_set_supervision_props(l_ptr, new_value); +				tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, +						     new_value, 0, 0); +				break; +			case TIPC_CMD_SET_LINK_PRI: +				l_ptr->priority = new_value; +				tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, +						     0, new_value, 0); +				break; +			case TIPC_CMD_SET_LINK_WINDOW: +				tipc_link_set_queue_limits(l_ptr, new_value); +				break; +			default: +				res = -EINVAL; +				break; +			}  		}  		tipc_node_unlock(node);  		return res; @@ -2542,6 +2591,7 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_  	char *link_name;  	struct tipc_link *l_ptr;  	struct tipc_node *node; +	unsigned int bearer_id;  	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))  		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); @@ -2552,15 +2602,19 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_  			return tipc_cfg_reply_error_string("link not found");  		return tipc_cfg_reply_none();  	} -  	read_lock_bh(&tipc_net_lock); -	l_ptr = link_find_link(link_name, &node); -	if (!l_ptr) { +	node = tipc_link_find_owner(link_name, &bearer_id); +	if (!node) {  		read_unlock_bh(&tipc_net_lock);  		return tipc_cfg_reply_error_string("link not found");  	} -  	tipc_node_lock(node); +	l_ptr = node->links[bearer_id]; +	if (!l_ptr) { +		tipc_node_unlock(node); +		read_unlock_bh(&tipc_net_lock); +		return tipc_cfg_reply_error_string("link not found"); +	}  	link_reset_statistics(l_ptr);  	tipc_node_unlock(node);  	read_unlock_bh(&tipc_net_lock); @@ -2590,18 +2644,27 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)  	struct tipc_node *node;  	char *status;  	u32 profile_total = 0; +	unsigned int bearer_id;  	int ret;  	if (!strcmp(name, tipc_bclink_name))  		return tipc_bclink_stats(buf, buf_size);  	read_lock_bh(&tipc_net_lock); -	l = link_find_link(name, &node); -	if (!l) { +	node = tipc_link_find_owner(name, &bearer_id); +	if (!node) {  		read_unlock_bh(&tipc_net_lock);  		return 0;  	}  	tipc_node_lock(node); + +	l = node->links[bearer_id]; +	if (!l) { +		tipc_node_unlock(node); +		read_unlock_bh(&tipc_net_lock); +		return 0; +	} +  	s = &l->stats;  	if (tipc_link_is_active(l)) diff --git a/net/tipc/link.h b/net/tipc/link.h index 3b6aa65b608..8c0b49b5b2e 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -1,7 +1,7 @@  /*   * net/tipc/link.h: Include file for TIPC link code   * - * Copyright (c) 1995-2006, Ericsson AB + * Copyright (c) 1995-2006, 2013, Ericsson AB   * Copyright (c) 2004-2005, 2010-2011, Wind River Systems   * All rights reserved.   * @@ -40,27 +40,28 @@  #include "msg.h"  #include "node.h" -/* - * Link reassembly status codes +/* Link reassembly status codes   */  #define LINK_REASM_ERROR	-1  #define LINK_REASM_COMPLETE	1 -/* - * Out-of-range value for link sequence numbers +/* Out-of-range value for link sequence numbers   */  #define INVALID_LINK_SEQ 0x10000 -/* - * Link states +/* Link working states   */  #define WORKING_WORKING 560810u  #define WORKING_UNKNOWN 560811u  #define RESET_UNKNOWN   560812u  #define RESET_RESET     560813u -/* - * Starting value for maximum packet size negotiation on unicast links +/* Link endpoint execution states + */ +#define LINK_STARTED    0x0001 +#define LINK_STOPPED    0x0002 + +/* Starting value for maximum packet size negotiation on unicast links   * (unless bearer MTU is less)   */  #define MAX_PKT_DEFAULT 1500 @@ -102,8 +103,7 @@ struct tipc_stats {   * @media_addr: media address to use when sending messages over link   * @timer: link timer   * @owner: pointer to peer node - * @link_list: adjacent links in bearer's list of links - * @started: indicates if link has been started + * @flags: execution state flags for link endpoint instance   * @checkpoint: reference point for triggering link continuity checking   * @peer_session: link session # being used by peer end of link   * @peer_bearer_id: bearer id used by link's peer endpoint @@ -149,10 +149,9 @@ struct tipc_link {  	struct tipc_media_addr media_addr;  	struct timer_list timer;  	struct tipc_node *owner; -	struct list_head link_list;  	/* Management and link supervision data */ -	int started; +	unsigned int flags;  	u32 checkpoint;  	u32 peer_session;  	u32 peer_bearer_id; @@ -215,10 +214,9 @@ struct tipc_port;  struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,  			      struct tipc_bearer *b_ptr,  			      const struct tipc_media_addr *media_addr); -void tipc_link_delete(struct tipc_link *l_ptr); +void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down);  void tipc_link_failover_send_queue(struct tipc_link *l_ptr); -void tipc_link_dup_send_queue(struct tipc_link *l_ptr, -			      struct tipc_link *dest); +void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest);  void tipc_link_reset_fragments(struct tipc_link *l_ptr);  int tipc_link_is_up(struct tipc_link *l_ptr);  int tipc_link_is_active(struct tipc_link *l_ptr); @@ -231,23 +229,24 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area,  struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,  					  int req_tlv_space);  void tipc_link_reset(struct tipc_link *l_ptr); -int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector); -void tipc_link_send_names(struct list_head *message_list, u32 dest); +void tipc_link_reset_list(unsigned int bearer_id); +int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); +void tipc_link_names_xmit(struct list_head *message_list, u32 dest); +int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);  int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);  u32 tipc_link_get_max_pkt(u32 dest, u32 selector); -int tipc_link_send_sections_fast(struct tipc_port *sender, -				 struct iovec const *msg_sect, -				 unsigned int len, u32 destnode); -void tipc_link_recv_bundle(struct sk_buff *buf); -int  tipc_link_recv_fragment(struct sk_buff **reasm_head, -			     struct sk_buff **reasm_tail, -			     struct sk_buff **fbuf); -void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob, -			      u32 gap, u32 tolerance, u32 priority, -			      u32 acked_mtu); +int tipc_link_iovec_xmit_fast(struct tipc_port *sender, +			      struct iovec const *msg_sect, +			      unsigned int len, u32 destnode); +void tipc_link_bundle_rcv(struct sk_buff *buf); +int tipc_link_frag_rcv(struct sk_buff **reasm_head, +		       struct sk_buff **reasm_tail, +		       struct sk_buff **fbuf); +void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, +			  u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);  void tipc_link_push_queue(struct tipc_link *l_ptr);  u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, -		   struct sk_buff *buf); +			struct sk_buff *buf);  void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all);  void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);  void tipc_link_retransmit(struct tipc_link *l_ptr, diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index e0d08055754..aff8041dc15 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -131,16 +131,24 @@ static void named_cluster_distribute(struct sk_buff *buf)  {  	struct sk_buff *buf_copy;  	struct tipc_node *n_ptr; +	struct tipc_link *l_ptr; -	list_for_each_entry(n_ptr, &tipc_node_list, list) { -		if (tipc_node_active_links(n_ptr)) { +	rcu_read_lock(); +	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { +		spin_lock_bh(&n_ptr->lock); +		l_ptr = n_ptr->active_links[n_ptr->addr & 1]; +		if (l_ptr) {  			buf_copy = skb_copy(buf, GFP_ATOMIC); -			if (!buf_copy) +			if (!buf_copy) { +				spin_unlock_bh(&n_ptr->lock);  				break; +			}  			msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); -			tipc_link_send(buf_copy, n_ptr->addr, n_ptr->addr); +			__tipc_link_xmit(l_ptr, buf_copy);  		} +		spin_unlock_bh(&n_ptr->lock);  	} +	rcu_read_unlock();  	kfree_skb(buf);  } @@ -262,7 +270,7 @@ void tipc_named_node_up(unsigned long nodearg)  	named_distribute(&message_list, node, &publ_zone, max_item_buf);  	read_unlock_bh(&tipc_nametbl_lock); -	tipc_link_send_names(&message_list, node); +	tipc_link_names_xmit(&message_list, node);  }  /** @@ -293,9 +301,9 @@ static void named_purge_publ(struct publication *publ)  }  /** - * tipc_named_recv - process name table update message sent by another node + * tipc_named_rcv - process name table update message sent by another node   */ -void tipc_named_recv(struct sk_buff *buf) +void tipc_named_rcv(struct sk_buff *buf)  {  	struct publication *publ;  	struct tipc_msg *msg = buf_msg(buf); diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h index 1e41bdd4f25..9b312ccfd43 100644 --- a/net/tipc/name_distr.h +++ b/net/tipc/name_distr.h @@ -42,7 +42,7 @@  void tipc_named_publish(struct publication *publ);  void tipc_named_withdraw(struct publication *publ);  void tipc_named_node_up(unsigned long node); -void tipc_named_recv(struct sk_buff *buf); +void tipc_named_rcv(struct sk_buff *buf);  void tipc_named_reinit(void);  #endif diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 92a1533af4e..042e8e3cabc 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -941,20 +941,48 @@ int tipc_nametbl_init(void)  	return 0;  } -void tipc_nametbl_stop(void) +/** + * tipc_purge_publications - remove all publications for a given type + * + * tipc_nametbl_lock must be held when calling this function + */ +static void tipc_purge_publications(struct name_seq *seq)  { -	u32 i; +	struct publication *publ, *safe; +	struct sub_seq *sseq; +	struct name_info *info; -	if (!table.types) +	if (!seq->sseqs) { +		nameseq_delete_empty(seq);  		return; +	} +	sseq = seq->sseqs; +	info = sseq->info; +	list_for_each_entry_safe(publ, safe, &info->zone_list, zone_list) { +		tipc_nametbl_remove_publ(publ->type, publ->lower, publ->node, +					 publ->ref, publ->key); +	} +} + +void tipc_nametbl_stop(void) +{ +	u32 i; +	struct name_seq *seq; +	struct hlist_head *seq_head; +	struct hlist_node *safe; -	/* Verify name table is empty, then release it */ +	/* Verify name table is empty and purge any lingering +	 * publications, then release the name table +	 */  	write_lock_bh(&tipc_nametbl_lock);  	for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {  		if (hlist_empty(&table.types[i]))  			continue; -		pr_err("nametbl_stop(): orphaned hash chain detected\n"); -		break; +		seq_head = &table.types[i]; +		hlist_for_each_entry_safe(seq, safe, seq_head, ns_list) { +			tipc_purge_publications(seq); +		} +		continue;  	}  	kfree(table.types);  	table.types = NULL; diff --git a/net/tipc/net.c b/net/tipc/net.c index 7d305ecc09c..0374a817631 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -146,19 +146,19 @@ void tipc_net_route_msg(struct sk_buff *buf)  	if (tipc_in_scope(dnode, tipc_own_addr)) {  		if (msg_isdata(msg)) {  			if (msg_mcast(msg)) -				tipc_port_recv_mcast(buf, NULL); +				tipc_port_mcast_rcv(buf, NULL);  			else if (msg_destport(msg)) -				tipc_port_recv_msg(buf); +				tipc_port_rcv(buf);  			else  				net_route_named_msg(buf);  			return;  		}  		switch (msg_user(msg)) {  		case NAME_DISTRIBUTOR: -			tipc_named_recv(buf); +			tipc_named_rcv(buf);  			break;  		case CONN_MANAGER: -			tipc_port_recv_proto_msg(buf); +			tipc_port_proto_rcv(buf);  			break;  		default:  			kfree_skb(buf); @@ -168,7 +168,7 @@ void tipc_net_route_msg(struct sk_buff *buf)  	/* Handle message for another node */  	skb_trim(buf, msg_size(msg)); -	tipc_link_send(buf, dnode, msg_link_selector(msg)); +	tipc_link_xmit(buf, dnode, msg_link_selector(msg));  }  void tipc_net_start(u32 addr) @@ -182,8 +182,6 @@ void tipc_net_start(u32 addr)  	tipc_bclink_init();  	write_unlock_bh(&tipc_net_lock); -	tipc_cfg_reinit(); -  	pr_info("Started in network mode\n");  	pr_info("Own node address %s, network identity %u\n",  		tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id); @@ -191,15 +189,14 @@ void tipc_net_start(u32 addr)  void tipc_net_stop(void)  { -	struct tipc_node *node, *t_node; -  	if (!tipc_own_addr)  		return; +  	write_lock_bh(&tipc_net_lock);  	tipc_bearer_stop();  	tipc_bclink_stop(); -	list_for_each_entry_safe(node, t_node, &tipc_node_list, list) -		tipc_node_delete(node); +	tipc_node_stop();  	write_unlock_bh(&tipc_net_lock); +  	pr_info("Left network mode\n");  } diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c index 9f72a637636..3aaf73de9e2 100644 --- a/net/tipc/netlink.c +++ b/net/tipc/netlink.c @@ -83,8 +83,6 @@ static struct genl_ops tipc_genl_ops[] = {  	},  }; -static int tipc_genl_family_registered; -  int tipc_netlink_start(void)  {  	int res; @@ -94,16 +92,10 @@ int tipc_netlink_start(void)  		pr_err("Failed to register netlink interface\n");  		return res;  	} - -	tipc_genl_family_registered = 1;  	return 0;  }  void tipc_netlink_stop(void)  { -	if (!tipc_genl_family_registered) -		return; -  	genl_unregister_family(&tipc_genl_family); -	tipc_genl_family_registered = 0;  } diff --git a/net/tipc/node.c b/net/tipc/node.c index efe4d41bf11..1d3a4999a70 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -2,7 +2,7 @@   * net/tipc/node.c: TIPC node management routines   *   * Copyright (c) 2000-2006, 2012 Ericsson AB - * Copyright (c) 2005-2006, 2010-2011, Wind River Systems + * Copyright (c) 2005-2006, 2010-2014, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -44,13 +44,11 @@  static void node_lost_contact(struct tipc_node *n_ptr);  static void node_established_contact(struct tipc_node *n_ptr); -static DEFINE_SPINLOCK(node_create_lock); -  static struct hlist_head node_htable[NODE_HTABLE_SIZE];  LIST_HEAD(tipc_node_list);  static u32 tipc_num_nodes; - -static atomic_t tipc_num_links = ATOMIC_INIT(0); +static u32 tipc_num_links; +static DEFINE_SPINLOCK(node_list_lock);  /*   * A trivial power-of-two bitmask technique is used for speed, since this @@ -73,37 +71,26 @@ struct tipc_node *tipc_node_find(u32 addr)  	if (unlikely(!in_own_cluster_exact(addr)))  		return NULL; -	hlist_for_each_entry(node, &node_htable[tipc_hashfn(addr)], hash) { -		if (node->addr == addr) +	rcu_read_lock(); +	hlist_for_each_entry_rcu(node, &node_htable[tipc_hashfn(addr)], hash) { +		if (node->addr == addr) { +			rcu_read_unlock();  			return node; +		}  	} +	rcu_read_unlock();  	return NULL;  } -/** - * tipc_node_create - create neighboring node - * - * Currently, this routine is called by neighbor discovery code, which holds - * net_lock for reading only.  We must take node_create_lock to ensure a node - * isn't created twice if two different bearers discover the node at the same - * time.  (It would be preferable to switch to holding net_lock in write mode, - * but this is a non-trivial change.) - */  struct tipc_node *tipc_node_create(u32 addr)  {  	struct tipc_node *n_ptr, *temp_node; -	spin_lock_bh(&node_create_lock); - -	n_ptr = tipc_node_find(addr); -	if (n_ptr) { -		spin_unlock_bh(&node_create_lock); -		return n_ptr; -	} +	spin_lock_bh(&node_list_lock);  	n_ptr = kzalloc(sizeof(*n_ptr), GFP_ATOMIC);  	if (!n_ptr) { -		spin_unlock_bh(&node_create_lock); +		spin_unlock_bh(&node_list_lock);  		pr_warn("Node creation failed, no memory\n");  		return NULL;  	} @@ -114,31 +101,41 @@ struct tipc_node *tipc_node_create(u32 addr)  	INIT_LIST_HEAD(&n_ptr->list);  	INIT_LIST_HEAD(&n_ptr->nsub); -	hlist_add_head(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); +	hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); -	list_for_each_entry(temp_node, &tipc_node_list, list) { +	list_for_each_entry_rcu(temp_node, &tipc_node_list, list) {  		if (n_ptr->addr < temp_node->addr)  			break;  	} -	list_add_tail(&n_ptr->list, &temp_node->list); +	list_add_tail_rcu(&n_ptr->list, &temp_node->list);  	n_ptr->block_setup = WAIT_PEER_DOWN;  	n_ptr->signature = INVALID_NODE_SIG;  	tipc_num_nodes++; -	spin_unlock_bh(&node_create_lock); +	spin_unlock_bh(&node_list_lock);  	return n_ptr;  } -void tipc_node_delete(struct tipc_node *n_ptr) +static void tipc_node_delete(struct tipc_node *n_ptr)  { -	list_del(&n_ptr->list); -	hlist_del(&n_ptr->hash); -	kfree(n_ptr); +	list_del_rcu(&n_ptr->list); +	hlist_del_rcu(&n_ptr->hash); +	kfree_rcu(n_ptr, rcu);  	tipc_num_nodes--;  } +void tipc_node_stop(void) +{ +	struct tipc_node *node, *t_node; + +	spin_lock_bh(&node_list_lock); +	list_for_each_entry_safe(node, t_node, &tipc_node_list, list) +		tipc_node_delete(node); +	spin_unlock_bh(&node_list_lock); +} +  /**   * tipc_node_link_up - handle addition of link   * @@ -162,7 +159,7 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr)  		pr_info("New link <%s> becomes standby\n", l_ptr->name);  		return;  	} -	tipc_link_dup_send_queue(active[0], l_ptr); +	tipc_link_dup_queue_xmit(active[0], l_ptr);  	if (l_ptr->priority == active[0]->priority) {  		active[0] = l_ptr;  		return; @@ -243,15 +240,25 @@ int tipc_node_is_up(struct tipc_node *n_ptr)  void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)  {  	n_ptr->links[l_ptr->b_ptr->identity] = l_ptr; -	atomic_inc(&tipc_num_links); +	spin_lock_bh(&node_list_lock); +	tipc_num_links++; +	spin_unlock_bh(&node_list_lock);  	n_ptr->link_cnt++;  }  void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)  { -	n_ptr->links[l_ptr->b_ptr->identity] = NULL; -	atomic_dec(&tipc_num_links); -	n_ptr->link_cnt--; +	int i; + +	for (i = 0; i < MAX_BEARERS; i++) { +		if (l_ptr != n_ptr->links[i]) +			continue; +		n_ptr->links[i] = NULL; +		spin_lock_bh(&node_list_lock); +		tipc_num_links--; +		spin_unlock_bh(&node_list_lock); +		n_ptr->link_cnt--; +	}  }  static void node_established_contact(struct tipc_node *n_ptr) @@ -335,27 +342,28 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)  		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE  						   " (network address)"); -	read_lock_bh(&tipc_net_lock); +	spin_lock_bh(&node_list_lock);  	if (!tipc_num_nodes) { -		read_unlock_bh(&tipc_net_lock); +		spin_unlock_bh(&node_list_lock);  		return tipc_cfg_reply_none();  	}  	/* For now, get space for all other nodes */  	payload_size = TLV_SPACE(sizeof(node_info)) * tipc_num_nodes;  	if (payload_size > 32768u) { -		read_unlock_bh(&tipc_net_lock); +		spin_unlock_bh(&node_list_lock);  		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  						   " (too many nodes)");  	} +	spin_unlock_bh(&node_list_lock); +  	buf = tipc_cfg_reply_alloc(payload_size); -	if (!buf) { -		read_unlock_bh(&tipc_net_lock); +	if (!buf)  		return NULL; -	}  	/* Add TLVs for all nodes in scope */ -	list_for_each_entry(n_ptr, &tipc_node_list, list) { +	rcu_read_lock(); +	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {  		if (!tipc_in_scope(domain, n_ptr->addr))  			continue;  		node_info.addr = htonl(n_ptr->addr); @@ -363,8 +371,7 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)  		tipc_cfg_append_tlv(buf, TIPC_TLV_NODE_INFO,  				    &node_info, sizeof(node_info));  	} - -	read_unlock_bh(&tipc_net_lock); +	rcu_read_unlock();  	return buf;  } @@ -387,21 +394,19 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)  	if (!tipc_own_addr)  		return tipc_cfg_reply_none(); -	read_lock_bh(&tipc_net_lock); - +	spin_lock_bh(&node_list_lock);  	/* Get space for all unicast links + broadcast link */ -	payload_size = TLV_SPACE(sizeof(link_info)) * -		(atomic_read(&tipc_num_links) + 1); +	payload_size = TLV_SPACE((sizeof(link_info)) * (tipc_num_links + 1));  	if (payload_size > 32768u) { -		read_unlock_bh(&tipc_net_lock); +		spin_unlock_bh(&node_list_lock);  		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  						   " (too many links)");  	} +	spin_unlock_bh(&node_list_lock); +  	buf = tipc_cfg_reply_alloc(payload_size); -	if (!buf) { -		read_unlock_bh(&tipc_net_lock); +	if (!buf)  		return NULL; -	}  	/* Add TLV for broadcast link */  	link_info.dest = htonl(tipc_cluster_mask(tipc_own_addr)); @@ -410,7 +415,8 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)  	tipc_cfg_append_tlv(buf, TIPC_TLV_LINK_INFO, &link_info, sizeof(link_info));  	/* Add TLVs for any other links in scope */ -	list_for_each_entry(n_ptr, &tipc_node_list, list) { +	rcu_read_lock(); +	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {  		u32 i;  		if (!tipc_in_scope(domain, n_ptr->addr)) @@ -427,7 +433,6 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)  		}  		tipc_node_unlock(n_ptr);  	} - -	read_unlock_bh(&tipc_net_lock); +	rcu_read_unlock();  	return buf;  } diff --git a/net/tipc/node.h b/net/tipc/node.h index 63e2e8ead2f..7cbb8cec1a9 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -2,7 +2,7 @@   * net/tipc/node.h: Include file for TIPC node management routines   *   * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005, 2010-2011, Wind River Systems + * Copyright (c) 2005, 2010-2014, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -66,6 +66,7 @@   * @link_cnt: number of links to node   * @signature: node instance identifier   * @bclink: broadcast-related info + * @rcu: rcu struct for tipc_node   *    @acked: sequence # of last outbound b'cast message acknowledged by node   *    @last_in: sequence # of last in-sequence b'cast message received from node   *    @last_sent: sequence # of last b'cast message sent by node @@ -89,6 +90,7 @@ struct tipc_node {  	int working_links;  	int block_setup;  	u32 signature; +	struct rcu_head rcu;  	struct {  		u32 acked;  		u32 last_in; @@ -107,7 +109,7 @@ extern struct list_head tipc_node_list;  struct tipc_node *tipc_node_find(u32 addr);  struct tipc_node *tipc_node_create(u32 addr); -void tipc_node_delete(struct tipc_node *n_ptr); +void tipc_node_stop(void);  void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);  void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);  void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr); diff --git a/net/tipc/port.c b/net/tipc/port.c index b742b265452..5c14c7801ee 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -1,7 +1,7 @@  /*   * net/tipc/port.c: TIPC port code   * - * Copyright (c) 1992-2007, Ericsson AB + * Copyright (c) 1992-2007, 2014, Ericsson AB   * Copyright (c) 2004-2008, 2010-2013, Wind River Systems   * All rights reserved.   * @@ -38,6 +38,7 @@  #include "config.h"  #include "port.h"  #include "name_table.h" +#include "socket.h"  /* Connection management: */  #define PROBING_INTERVAL 3600000	/* [ms] => 1 h */ @@ -54,17 +55,6 @@ static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);  static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);  static void port_timeout(unsigned long ref); - -static u32 port_peernode(struct tipc_port *p_ptr) -{ -	return msg_destnode(&p_ptr->phdr); -} - -static u32 port_peerport(struct tipc_port *p_ptr) -{ -	return msg_destport(&p_ptr->phdr); -} -  /**   * tipc_port_peer_msg - verify message was sent by connected port's peer   * @@ -76,33 +66,32 @@ int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)  	u32 peernode;  	u32 orignode; -	if (msg_origport(msg) != port_peerport(p_ptr)) +	if (msg_origport(msg) != tipc_port_peerport(p_ptr))  		return 0;  	orignode = msg_orignode(msg); -	peernode = port_peernode(p_ptr); +	peernode = tipc_port_peernode(p_ptr);  	return (orignode == peernode) ||  		(!orignode && (peernode == tipc_own_addr)) ||  		(!peernode && (orignode == tipc_own_addr));  }  /** - * tipc_multicast - send a multicast message to local and remote destinations + * tipc_port_mcast_xmit - send a multicast message to local and remote + * destinations   */ -int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, -		   struct iovec const *msg_sect, unsigned int len) +int tipc_port_mcast_xmit(struct tipc_port *oport, +			 struct tipc_name_seq const *seq, +			 struct iovec const *msg_sect, +			 unsigned int len)  {  	struct tipc_msg *hdr;  	struct sk_buff *buf;  	struct sk_buff *ibuf = NULL;  	struct tipc_port_list dports = {0, NULL, }; -	struct tipc_port *oport = tipc_port_deref(ref);  	int ext_targets;  	int res; -	if (unlikely(!oport)) -		return -EINVAL; -  	/* Create multicast message */  	hdr = &oport->phdr;  	msg_set_type(hdr, TIPC_MCAST_MSG); @@ -131,7 +120,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,  				return -ENOMEM;  			}  		} -		res = tipc_bclink_send_msg(buf); +		res = tipc_bclink_xmit(buf);  		if ((res < 0) && (dports.count != 0))  			kfree_skb(ibuf);  	} else { @@ -140,7 +129,7 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,  	if (res >= 0) {  		if (ibuf) -			tipc_port_recv_mcast(ibuf, &dports); +			tipc_port_mcast_rcv(ibuf, &dports);  	} else {  		tipc_port_list_free(&dports);  	} @@ -148,11 +137,11 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,  }  /** - * tipc_port_recv_mcast - deliver multicast message to all destination ports + * tipc_port_mcast_rcv - deliver multicast message to all destination ports   *   * If there is no port list, perform a lookup to create one   */ -void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp) +void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)  {  	struct tipc_msg *msg;  	struct tipc_port_list dports = {0, NULL, }; @@ -176,7 +165,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)  		msg_set_destnode(msg, tipc_own_addr);  		if (dp->count == 1) {  			msg_set_destport(msg, dp->ports[0]); -			tipc_port_recv_msg(buf); +			tipc_port_rcv(buf);  			tipc_port_list_free(dp);  			return;  		} @@ -191,7 +180,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)  			if ((index == 0) && (cnt != 0))  				item = item->next;  			msg_set_destport(buf_msg(b), item->ports[index]); -			tipc_port_recv_msg(b); +			tipc_port_rcv(b);  		}  	}  exit: @@ -199,40 +188,32 @@ exit:  	tipc_port_list_free(dp);  } -/** - * tipc_createport - create a generic TIPC port + +void tipc_port_wakeup(struct tipc_port *port) +{ +	tipc_sock_wakeup(tipc_port_to_sock(port)); +} + +/* tipc_port_init - intiate TIPC port and lock it   * - * Returns pointer to (locked) TIPC port, or NULL if unable to create it + * Returns obtained reference if initialization is successful, zero otherwise   */ -struct tipc_port *tipc_createport(struct sock *sk, -				  u32 (*dispatcher)(struct tipc_port *, -				  struct sk_buff *), -				  void (*wakeup)(struct tipc_port *), -				  const u32 importance) +u32 tipc_port_init(struct tipc_port *p_ptr, +		   const unsigned int importance)  { -	struct tipc_port *p_ptr;  	struct tipc_msg *msg;  	u32 ref; -	p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); -	if (!p_ptr) { -		pr_warn("Port creation failed, no memory\n"); -		return NULL; -	}  	ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);  	if (!ref) { -		pr_warn("Port creation failed, ref. table exhausted\n"); -		kfree(p_ptr); -		return NULL; +		pr_warn("Port registration failed, ref. table exhausted\n"); +		return 0;  	} -	p_ptr->sk = sk;  	p_ptr->max_pkt = MAX_PKT_DEFAULT;  	p_ptr->ref = ref;  	INIT_LIST_HEAD(&p_ptr->wait_list);  	INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); -	p_ptr->dispatcher = dispatcher; -	p_ptr->wakeup = wakeup;  	k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);  	INIT_LIST_HEAD(&p_ptr->publications);  	INIT_LIST_HEAD(&p_ptr->port_list); @@ -248,10 +229,10 @@ struct tipc_port *tipc_createport(struct sock *sk,  	msg_set_origport(msg, ref);  	list_add_tail(&p_ptr->port_list, &ports);  	spin_unlock_bh(&tipc_port_list_lock); -	return p_ptr; +	return ref;  } -int tipc_deleteport(struct tipc_port *p_ptr) +void tipc_port_destroy(struct tipc_port *p_ptr)  {  	struct sk_buff *buf = NULL; @@ -272,67 +253,7 @@ int tipc_deleteport(struct tipc_port *p_ptr)  	list_del(&p_ptr->wait_list);  	spin_unlock_bh(&tipc_port_list_lock);  	k_term_timer(&p_ptr->timer); -	kfree(p_ptr);  	tipc_net_route_msg(buf); -	return 0; -} - -static int port_unreliable(struct tipc_port *p_ptr) -{ -	return msg_src_droppable(&p_ptr->phdr); -} - -int tipc_portunreliable(u32 ref, unsigned int *isunreliable) -{ -	struct tipc_port *p_ptr; - -	p_ptr = tipc_port_lock(ref); -	if (!p_ptr) -		return -EINVAL; -	*isunreliable = port_unreliable(p_ptr); -	tipc_port_unlock(p_ptr); -	return 0; -} - -int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) -{ -	struct tipc_port *p_ptr; - -	p_ptr = tipc_port_lock(ref); -	if (!p_ptr) -		return -EINVAL; -	msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0)); -	tipc_port_unlock(p_ptr); -	return 0; -} - -static int port_unreturnable(struct tipc_port *p_ptr) -{ -	return msg_dest_droppable(&p_ptr->phdr); -} - -int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) -{ -	struct tipc_port *p_ptr; - -	p_ptr = tipc_port_lock(ref); -	if (!p_ptr) -		return -EINVAL; -	*isunrejectable = port_unreturnable(p_ptr); -	tipc_port_unlock(p_ptr); -	return 0; -} - -int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) -{ -	struct tipc_port *p_ptr; - -	p_ptr = tipc_port_lock(ref); -	if (!p_ptr) -		return -EINVAL; -	msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0)); -	tipc_port_unlock(p_ptr); -	return 0;  }  /* @@ -350,8 +271,8 @@ static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,  	if (buf) {  		msg = buf_msg(buf);  		tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE, -			      port_peernode(p_ptr)); -		msg_set_destport(msg, port_peerport(p_ptr)); +			      tipc_port_peernode(p_ptr)); +		msg_set_destport(msg, tipc_port_peerport(p_ptr));  		msg_set_origport(msg, p_ptr->ref);  		msg_set_msgcnt(msg, ack);  	} @@ -422,17 +343,17 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)  	/* send returned message & dispose of rejected message */  	src_node = msg_prevnode(msg);  	if (in_own_node(src_node)) -		tipc_port_recv_msg(rbuf); +		tipc_port_rcv(rbuf);  	else -		tipc_link_send(rbuf, src_node, msg_link_selector(rmsg)); +		tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg));  exit:  	kfree_skb(buf);  	return data_sz;  } -int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, -			      struct iovec const *msg_sect, unsigned int len, -			      int err) +int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr, +			   struct iovec const *msg_sect, unsigned int len, +			   int err)  {  	struct sk_buff *buf;  	int res; @@ -519,7 +440,7 @@ static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 er  	return buf;  } -void tipc_port_recv_proto_msg(struct sk_buff *buf) +void tipc_port_proto_rcv(struct sk_buff *buf)  {  	struct tipc_msg *msg = buf_msg(buf);  	struct tipc_port *p_ptr; @@ -547,13 +468,12 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)  	/* Process protocol message sent by peer */  	switch (msg_type(msg)) {  	case CONN_ACK: -		wakeable = tipc_port_congested(p_ptr) && p_ptr->congested && -			p_ptr->wakeup; +		wakeable = tipc_port_congested(p_ptr) && p_ptr->congested;  		p_ptr->acked += msg_msgcnt(msg);  		if (!tipc_port_congested(p_ptr)) {  			p_ptr->congested = 0;  			if (wakeable) -				p_ptr->wakeup(p_ptr); +				tipc_port_wakeup(p_ptr);  		}  		break;  	case CONN_PROBE: @@ -584,8 +504,8 @@ static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)  		ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref);  	if (p_ptr->connected) { -		u32 dport = port_peerport(p_ptr); -		u32 destnode = port_peernode(p_ptr); +		u32 dport = tipc_port_peerport(p_ptr); +		u32 destnode = tipc_port_peernode(p_ptr);  		ret += tipc_snprintf(buf + ret, len - ret,  				     " connected to <%u.%u.%u:%u>", @@ -673,34 +593,6 @@ void tipc_acknowledge(u32 ref, u32 ack)  	tipc_net_route_msg(buf);  } -int tipc_portimportance(u32 ref, unsigned int *importance) -{ -	struct tipc_port *p_ptr; - -	p_ptr = tipc_port_lock(ref); -	if (!p_ptr) -		return -EINVAL; -	*importance = (unsigned int)msg_importance(&p_ptr->phdr); -	tipc_port_unlock(p_ptr); -	return 0; -} - -int tipc_set_portimportance(u32 ref, unsigned int imp) -{ -	struct tipc_port *p_ptr; - -	if (imp > TIPC_CRITICAL_IMPORTANCE) -		return -EINVAL; - -	p_ptr = tipc_port_lock(ref); -	if (!p_ptr) -		return -EINVAL; -	msg_set_importance(&p_ptr->phdr, (u32)imp); -	tipc_port_unlock(p_ptr); -	return 0; -} - -  int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,  		 struct tipc_name_seq const *seq)  { @@ -760,7 +652,7 @@ int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope,  	return res;  } -int tipc_connect(u32 ref, struct tipc_portid const *peer) +int tipc_port_connect(u32 ref, struct tipc_portid const *peer)  {  	struct tipc_port *p_ptr;  	int res; @@ -768,17 +660,17 @@ int tipc_connect(u32 ref, struct tipc_portid const *peer)  	p_ptr = tipc_port_lock(ref);  	if (!p_ptr)  		return -EINVAL; -	res = __tipc_connect(ref, p_ptr, peer); +	res = __tipc_port_connect(ref, p_ptr, peer);  	tipc_port_unlock(p_ptr);  	return res;  }  /* - * __tipc_connect - connect to a remote peer + * __tipc_port_connect - connect to a remote peer   *   * Port must be locked.   */ -int __tipc_connect(u32 ref, struct tipc_port *p_ptr, +int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,  			struct tipc_portid const *peer)  {  	struct tipc_msg *msg; @@ -815,7 +707,7 @@ exit:   *   * Port must be locked.   */ -int __tipc_disconnect(struct tipc_port *tp_ptr) +int __tipc_port_disconnect(struct tipc_port *tp_ptr)  {  	if (tp_ptr->connected) {  		tp_ptr->connected = 0; @@ -828,10 +720,10 @@ int __tipc_disconnect(struct tipc_port *tp_ptr)  }  /* - * tipc_disconnect(): Disconnect port form peer. + * tipc_port_disconnect(): Disconnect port form peer.   *                    This is a node local operation.   */ -int tipc_disconnect(u32 ref) +int tipc_port_disconnect(u32 ref)  {  	struct tipc_port *p_ptr;  	int res; @@ -839,15 +731,15 @@ int tipc_disconnect(u32 ref)  	p_ptr = tipc_port_lock(ref);  	if (!p_ptr)  		return -EINVAL; -	res = __tipc_disconnect(p_ptr); +	res = __tipc_port_disconnect(p_ptr);  	tipc_port_unlock(p_ptr);  	return res;  }  /* - * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect + * tipc_port_shutdown(): Send a SHUTDOWN msg to peer and disconnect   */ -int tipc_shutdown(u32 ref) +int tipc_port_shutdown(u32 ref)  {  	struct tipc_port *p_ptr;  	struct sk_buff *buf = NULL; @@ -859,13 +751,13 @@ int tipc_shutdown(u32 ref)  	buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);  	tipc_port_unlock(p_ptr);  	tipc_net_route_msg(buf); -	return tipc_disconnect(ref); +	return tipc_port_disconnect(ref);  }  /** - * tipc_port_recv_msg - receive message from lower layer and deliver to port user + * tipc_port_rcv - receive message from lower layer and deliver to port user   */ -int tipc_port_recv_msg(struct sk_buff *buf) +int tipc_port_rcv(struct sk_buff *buf)  {  	struct tipc_port *p_ptr;  	struct tipc_msg *msg = buf_msg(buf); @@ -882,7 +774,7 @@ int tipc_port_recv_msg(struct sk_buff *buf)  	/* validate destination & pass to port, otherwise reject message */  	p_ptr = tipc_port_lock(destport);  	if (likely(p_ptr)) { -		err = p_ptr->dispatcher(p_ptr, buf); +		err = tipc_sk_rcv(&tipc_port_to_sock(p_ptr)->sk, buf);  		tipc_port_unlock(p_ptr);  		if (likely(!err))  			return dsz; @@ -894,43 +786,43 @@ int tipc_port_recv_msg(struct sk_buff *buf)  }  /* - *  tipc_port_recv_sections(): Concatenate and deliver sectioned - *                        message for this node. + *  tipc_port_iovec_rcv: Concatenate and deliver sectioned + *                       message for this node.   */ -static int tipc_port_recv_sections(struct tipc_port *sender, -				   struct iovec const *msg_sect, -				   unsigned int len) +static int tipc_port_iovec_rcv(struct tipc_port *sender, +			       struct iovec const *msg_sect, +			       unsigned int len)  {  	struct sk_buff *buf;  	int res;  	res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf);  	if (likely(buf)) -		tipc_port_recv_msg(buf); +		tipc_port_rcv(buf);  	return res;  }  /**   * tipc_send - send message sections on connection   */ -int tipc_send(u32 ref, struct iovec const *msg_sect, unsigned int len) +int tipc_send(struct tipc_port *p_ptr, +	      struct iovec const *msg_sect, +	      unsigned int len)  { -	struct tipc_port *p_ptr;  	u32 destnode;  	int res; -	p_ptr = tipc_port_deref(ref); -	if (!p_ptr || !p_ptr->connected) +	if (!p_ptr->connected)  		return -EINVAL;  	p_ptr->congested = 1;  	if (!tipc_port_congested(p_ptr)) { -		destnode = port_peernode(p_ptr); +		destnode = tipc_port_peernode(p_ptr);  		if (likely(!in_own_node(destnode))) -			res = tipc_link_send_sections_fast(p_ptr, msg_sect, -							   len, destnode); +			res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, +							destnode);  		else -			res = tipc_port_recv_sections(p_ptr, msg_sect, len); +			res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);  		if (likely(res != -ELINKCONG)) {  			p_ptr->congested = 0; @@ -939,7 +831,7 @@ int tipc_send(u32 ref, struct iovec const *msg_sect, unsigned int len)  			return res;  		}  	} -	if (port_unreliable(p_ptr)) { +	if (tipc_port_unreliable(p_ptr)) {  		p_ptr->congested = 0;  		return len;  	} @@ -949,17 +841,18 @@ int tipc_send(u32 ref, struct iovec const *msg_sect, unsigned int len)  /**   * tipc_send2name - send message sections to port name   */ -int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain, -		   struct iovec const *msg_sect, unsigned int len) +int tipc_send2name(struct tipc_port *p_ptr, +		   struct tipc_name const *name, +		   unsigned int domain, +		   struct iovec const *msg_sect, +		   unsigned int len)  { -	struct tipc_port *p_ptr;  	struct tipc_msg *msg;  	u32 destnode = domain;  	u32 destport;  	int res; -	p_ptr = tipc_port_deref(ref); -	if (!p_ptr || p_ptr->connected) +	if (p_ptr->connected)  		return -EINVAL;  	msg = &p_ptr->phdr; @@ -974,39 +867,39 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,  	if (likely(destport || destnode)) {  		if (likely(in_own_node(destnode))) -			res = tipc_port_recv_sections(p_ptr, msg_sect, len); +			res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);  		else if (tipc_own_addr) -			res = tipc_link_send_sections_fast(p_ptr, msg_sect, -							   len, destnode); +			res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, +							destnode);  		else -			res = tipc_port_reject_sections(p_ptr, msg, msg_sect, -							len, TIPC_ERR_NO_NODE); +			res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, +						     len, TIPC_ERR_NO_NODE);  		if (likely(res != -ELINKCONG)) {  			if (res > 0)  				p_ptr->sent++;  			return res;  		} -		if (port_unreliable(p_ptr)) { +		if (tipc_port_unreliable(p_ptr))  			return len; -		} +  		return -ELINKCONG;  	} -	return tipc_port_reject_sections(p_ptr, msg, msg_sect, len, -					 TIPC_ERR_NO_NAME); +	return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len, +				      TIPC_ERR_NO_NAME);  }  /**   * tipc_send2port - send message sections to port identity   */ -int tipc_send2port(u32 ref, struct tipc_portid const *dest, -		   struct iovec const *msg_sect, unsigned int len) +int tipc_send2port(struct tipc_port *p_ptr, +		   struct tipc_portid const *dest, +		   struct iovec const *msg_sect, +		   unsigned int len)  { -	struct tipc_port *p_ptr;  	struct tipc_msg *msg;  	int res; -	p_ptr = tipc_port_deref(ref); -	if (!p_ptr || p_ptr->connected) +	if (p_ptr->connected)  		return -EINVAL;  	msg = &p_ptr->phdr; @@ -1017,20 +910,20 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,  	msg_set_hdr_sz(msg, BASIC_H_SIZE);  	if (in_own_node(dest->node)) -		res =  tipc_port_recv_sections(p_ptr, msg_sect, len); +		res =  tipc_port_iovec_rcv(p_ptr, msg_sect, len);  	else if (tipc_own_addr) -		res = tipc_link_send_sections_fast(p_ptr, msg_sect, len, -						   dest->node); +		res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len, +						dest->node);  	else -		res = tipc_port_reject_sections(p_ptr, msg, msg_sect, len, +		res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,  						TIPC_ERR_NO_NODE);  	if (likely(res != -ELINKCONG)) {  		if (res > 0)  			p_ptr->sent++;  		return res;  	} -	if (port_unreliable(p_ptr)) { +	if (tipc_port_unreliable(p_ptr))  		return len; -	} +  	return -ELINKCONG;  } diff --git a/net/tipc/port.h b/net/tipc/port.h index 34f12bd4074..a00397393bd 100644 --- a/net/tipc/port.h +++ b/net/tipc/port.h @@ -1,7 +1,7 @@  /*   * net/tipc/port.h: Include file for TIPC port code   * - * Copyright (c) 1994-2007, Ericsson AB + * Copyright (c) 1994-2007, 2014, Ericsson AB   * Copyright (c) 2004-2007, 2010-2013, Wind River Systems   * All rights reserved.   * @@ -48,7 +48,6 @@  /**   * struct tipc_port - TIPC port structure - * @sk: pointer to socket handle   * @lock: pointer to spinlock for controlling access to port   * @connected: non-zero if port is currently connected to a peer port   * @conn_type: TIPC type used when connection was established @@ -60,8 +59,6 @@   * @ref: unique reference to port in TIPC object registry   * @phdr: preformatted message header used when sending messages   * @port_list: adjacent ports in TIPC's global list of ports - * @dispatcher: ptr to routine which handles received messages - * @wakeup: ptr to routine to call when port is no longer congested   * @wait_list: adjacent ports in list of ports waiting on link congestion   * @waiting_pkts:   * @sent: # of non-empty messages sent by port @@ -74,7 +71,6 @@   * @subscription: "node down" subscription used to terminate failed connections   */  struct tipc_port { -	struct sock *sk;  	spinlock_t *lock;  	int connected;  	u32 conn_type; @@ -86,8 +82,6 @@ struct tipc_port {  	u32 ref;  	struct tipc_msg phdr;  	struct list_head port_list; -	u32 (*dispatcher)(struct tipc_port *, struct sk_buff *); -	void (*wakeup)(struct tipc_port *);  	struct list_head wait_list;  	u32 waiting_pkts;  	u32 sent; @@ -106,68 +100,71 @@ struct tipc_port_list;  /*   * TIPC port manipulation routines   */ -struct tipc_port *tipc_createport(struct sock *sk, -				  u32 (*dispatcher)(struct tipc_port *, -				  struct sk_buff *), -				  void (*wakeup)(struct tipc_port *), -				  const u32 importance); +u32 tipc_port_init(struct tipc_port *p_ptr, +		   const unsigned int importance);  int tipc_reject_msg(struct sk_buff *buf, u32 err);  void tipc_acknowledge(u32 port_ref, u32 ack); -int tipc_deleteport(struct tipc_port *p_ptr); - -int tipc_portimportance(u32 portref, unsigned int *importance); -int tipc_set_portimportance(u32 portref, unsigned int importance); - -int tipc_portunreliable(u32 portref, unsigned int *isunreliable); -int tipc_set_portunreliable(u32 portref, unsigned int isunreliable); - -int tipc_portunreturnable(u32 portref, unsigned int *isunreturnable); -int tipc_set_portunreturnable(u32 portref, unsigned int isunreturnable); +void tipc_port_destroy(struct tipc_port *p_ptr);  int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,  		 struct tipc_name_seq const *name_seq); +  int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope,  		  struct tipc_name_seq const *name_seq); -int tipc_connect(u32 portref, struct tipc_portid const *port); +int tipc_port_connect(u32 portref, struct tipc_portid const *port); -int tipc_disconnect(u32 portref); +int tipc_port_disconnect(u32 portref); -int tipc_shutdown(u32 ref); +int tipc_port_shutdown(u32 ref); +void tipc_port_wakeup(struct tipc_port *port);  /*   * The following routines require that the port be locked on entry   */ -int __tipc_disconnect(struct tipc_port *tp_ptr); -int __tipc_connect(u32 ref, struct tipc_port *p_ptr, +int __tipc_port_disconnect(struct tipc_port *tp_ptr); +int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,  		   struct tipc_portid const *peer);  int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);  /*   * TIPC messaging routines   */ -int tipc_port_recv_msg(struct sk_buff *buf); -int tipc_send(u32 portref, struct iovec const *msg_sect, unsigned int len); - -int tipc_send2name(u32 portref, struct tipc_name const *name, u32 domain, -		   struct iovec const *msg_sect, unsigned int len); +int tipc_port_rcv(struct sk_buff *buf); + +int tipc_send(struct tipc_port *port, +	      struct iovec const *msg_sect, +	      unsigned int len); + +int tipc_send2name(struct tipc_port *port, +		   struct tipc_name const *name, +		   u32 domain, +		   struct iovec const *msg_sect, +		   unsigned int len); + +int tipc_send2port(struct tipc_port *port, +		   struct tipc_portid const *dest, +		   struct iovec const *msg_sect, +		   unsigned int len); + +int tipc_port_mcast_xmit(struct tipc_port *port, +			 struct tipc_name_seq const *seq, +			 struct iovec const *msg, +			 unsigned int len); + +int tipc_port_iovec_reject(struct tipc_port *p_ptr, +			   struct tipc_msg *hdr, +			   struct iovec const *msg_sect, +			   unsigned int len, +			   int err); -int tipc_send2port(u32 portref, struct tipc_portid const *dest, -		   struct iovec const *msg_sect, unsigned int len); - -int tipc_multicast(u32 portref, struct tipc_name_seq const *seq, -		   struct iovec const *msg, unsigned int len); - -int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr, -			      struct iovec const *msg_sect, unsigned int len, -			      int err);  struct sk_buff *tipc_port_get_ports(void); -void tipc_port_recv_proto_msg(struct sk_buff *buf); -void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp); +void tipc_port_proto_rcv(struct sk_buff *buf); +void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);  void tipc_port_reinit(void);  /** @@ -188,14 +185,53 @@ static inline void tipc_port_unlock(struct tipc_port *p_ptr)  	spin_unlock_bh(p_ptr->lock);  } -static inline struct tipc_port *tipc_port_deref(u32 ref) +static inline int tipc_port_congested(struct tipc_port *p_ptr)  { -	return (struct tipc_port *)tipc_ref_deref(ref); +	return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2);  } -static inline int tipc_port_congested(struct tipc_port *p_ptr) + +static inline u32 tipc_port_peernode(struct tipc_port *p_ptr)  { -	return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2); +	return msg_destnode(&p_ptr->phdr); +} + +static inline u32 tipc_port_peerport(struct tipc_port *p_ptr) +{ +	return msg_destport(&p_ptr->phdr); +} + +static inline  bool tipc_port_unreliable(struct tipc_port *port) +{ +	return msg_src_droppable(&port->phdr) != 0; +} + +static inline void tipc_port_set_unreliable(struct tipc_port *port, +					    bool unreliable) +{ +	msg_set_src_droppable(&port->phdr, unreliable ? 1 : 0); +} + +static inline bool tipc_port_unreturnable(struct tipc_port *port) +{ +	return msg_dest_droppable(&port->phdr) != 0; +} + +static inline void tipc_port_set_unreturnable(struct tipc_port *port, +					     bool unreturnable) +{ +	msg_set_dest_droppable(&port->phdr, unreturnable ? 1 : 0); +} + + +static inline int tipc_port_importance(struct tipc_port *port) +{ +	return msg_importance(&port->phdr); +} + +static inline void tipc_port_set_importance(struct tipc_port *port, int imp) +{ +	msg_set_importance(&port->phdr, (u32)imp);  }  #endif diff --git a/net/tipc/ref.c b/net/tipc/ref.c index 2a2a938dc22..3d4ecd754ee 100644 --- a/net/tipc/ref.c +++ b/net/tipc/ref.c @@ -89,7 +89,7 @@ struct ref_table {  static struct ref_table tipc_ref_table; -static DEFINE_RWLOCK(ref_table_lock); +static DEFINE_SPINLOCK(ref_table_lock);  /**   * tipc_ref_table_init - create reference table for objects @@ -126,9 +126,6 @@ int tipc_ref_table_init(u32 requested_size, u32 start)   */  void tipc_ref_table_stop(void)  { -	if (!tipc_ref_table.entries) -		return; -  	vfree(tipc_ref_table.entries);  	tipc_ref_table.entries = NULL;  } @@ -162,7 +159,7 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock)  	}  	/* take a free entry, if available; otherwise initialize a new entry */ -	write_lock_bh(&ref_table_lock); +	spin_lock_bh(&ref_table_lock);  	if (tipc_ref_table.first_free) {  		index = tipc_ref_table.first_free;  		entry = &(tipc_ref_table.entries[index]); @@ -178,7 +175,7 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock)  	} else {  		ref = 0;  	} -	write_unlock_bh(&ref_table_lock); +	spin_unlock_bh(&ref_table_lock);  	/*  	 * Grab the lock so no one else can modify this entry @@ -219,7 +216,7 @@ void tipc_ref_discard(u32 ref)  	index = ref & index_mask;  	entry = &(tipc_ref_table.entries[index]); -	write_lock_bh(&ref_table_lock); +	spin_lock_bh(&ref_table_lock);  	if (!entry->object) {  		pr_err("Attempt to discard ref. to non-existent obj\n"); @@ -245,7 +242,7 @@ void tipc_ref_discard(u32 ref)  	tipc_ref_table.last_free = index;  exit: -	write_unlock_bh(&ref_table_lock); +	spin_unlock_bh(&ref_table_lock);  }  /** @@ -267,20 +264,3 @@ void *tipc_ref_lock(u32 ref)  	}  	return NULL;  } - - -/** - * tipc_ref_deref - return pointer referenced object (without locking it) - */ -void *tipc_ref_deref(u32 ref) -{ -	if (likely(tipc_ref_table.entries)) { -		struct reference *entry; - -		entry = &tipc_ref_table.entries[ref & -						tipc_ref_table.index_mask]; -		if (likely(entry->ref == ref)) -			return entry->object; -	} -	return NULL; -} diff --git a/net/tipc/ref.h b/net/tipc/ref.h index 5bc8e7ab84d..d01aa1df63b 100644 --- a/net/tipc/ref.h +++ b/net/tipc/ref.h @@ -44,6 +44,5 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock);  void tipc_ref_discard(u32 ref);  void *tipc_ref_lock(u32 ref); -void *tipc_ref_deref(u32 ref);  #endif diff --git a/net/tipc/server.c b/net/tipc/server.c index b635ca347a8..646a930eefb 100644 --- a/net/tipc/server.c +++ b/net/tipc/server.c @@ -87,7 +87,6 @@ static void tipc_clean_outqueues(struct tipc_conn *con);  static void tipc_conn_kref_release(struct kref *kref)  {  	struct tipc_conn *con = container_of(kref, struct tipc_conn, kref); -	struct tipc_server *s = con->server;  	if (con->sock) {  		tipc_sock_release_local(con->sock); @@ -95,10 +94,6 @@ static void tipc_conn_kref_release(struct kref *kref)  	}  	tipc_clean_outqueues(con); - -	if (con->conid) -		s->tipc_conn_shutdown(con->conid, con->usr_data); -  	kfree(con);  } @@ -181,6 +176,9 @@ static void tipc_close_conn(struct tipc_conn *con)  	struct tipc_server *s = con->server;  	if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { +		if (con->conid) +			s->tipc_conn_shutdown(con->conid, con->usr_data); +  		spin_lock_bh(&s->idr_lock);  		idr_remove(&s->conn_idr, con->conid);  		s->idr_in_use--; @@ -429,10 +427,12 @@ int tipc_conn_sendmsg(struct tipc_server *s, int conid,  	list_add_tail(&e->list, &con->outqueue);  	spin_unlock_bh(&con->outqueue_lock); -	if (test_bit(CF_CONNECTED, &con->flags)) +	if (test_bit(CF_CONNECTED, &con->flags)) {  		if (!queue_work(s->send_wq, &con->swork))  			conn_put(con); - +	} else { +		conn_put(con); +	}  	return 0;  } @@ -573,7 +573,6 @@ int tipc_server_start(struct tipc_server *s)  		kmem_cache_destroy(s->rcvbuf_cache);  		return ret;  	} -	s->enabled = 1;  	return ret;  } @@ -583,10 +582,6 @@ void tipc_server_stop(struct tipc_server *s)  	int total = 0;  	int id; -	if (!s->enabled) -		return; - -	s->enabled = 0;  	spin_lock_bh(&s->idr_lock);  	for (id = 0; total < s->idr_in_use; id++) {  		con = idr_find(&s->conn_idr, id); diff --git a/net/tipc/server.h b/net/tipc/server.h index 98b23f20bc0..be817b0b547 100644 --- a/net/tipc/server.h +++ b/net/tipc/server.h @@ -56,7 +56,6 @@   * @name: server name   * @imp: message importance   * @type: socket type - * @enabled: identify whether server is launched or not   */  struct tipc_server {  	struct idr conn_idr; @@ -74,7 +73,6 @@ struct tipc_server {  	const char name[TIPC_SERVER_NAME_LEN];  	int imp;  	int type; -	int enabled;  };  int tipc_conn_sendmsg(struct tipc_server *s, int conid, diff --git a/net/tipc/socket.c b/net/tipc/socket.c index aab4948f0af..29b7f26a12c 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@  /*   * net/tipc/socket.c: TIPC socket API   * - * Copyright (c) 2001-2007, 2012 Ericsson AB + * Copyright (c) 2001-2007, 2012-2014, Ericsson AB   * Copyright (c) 2004-2008, 2010-2013, Wind River Systems   * All rights reserved.   * @@ -38,30 +38,17 @@  #include "port.h"  #include <linux/export.h> -#include <net/sock.h>  #define SS_LISTENING	-1	/* socket is listening */  #define SS_READY	-2	/* socket is connectionless */  #define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */ -struct tipc_sock { -	struct sock sk; -	struct tipc_port *p; -	struct tipc_portid peer_name; -	unsigned int conn_timeout; -}; - -#define tipc_sk(sk) ((struct tipc_sock *)(sk)) -#define tipc_sk_port(sk) (tipc_sk(sk)->p) -  static int backlog_rcv(struct sock *sk, struct sk_buff *skb); -static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); -static void wakeupdispatch(struct tipc_port *tport);  static void tipc_data_ready(struct sock *sk, int len);  static void tipc_write_space(struct sock *sk); -static int release(struct socket *sock); -static int accept(struct socket *sock, struct socket *new_sock, int flags); +static int tipc_release(struct socket *sock); +static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);  static const struct proto_ops packet_ops;  static const struct proto_ops stream_ops; @@ -70,8 +57,6 @@ static const struct proto_ops msg_ops;  static struct proto tipc_proto;  static struct proto tipc_proto_kern; -static int sockets_enabled; -  /*   * Revised TIPC socket locking policy:   * @@ -117,6 +102,8 @@ static int sockets_enabled;   *   - port reference   */ +#include "socket.h" +  /**   * advance_rx_queue - discard first buffer in socket receive queue   * @@ -152,13 +139,15 @@ static void reject_rx_queue(struct sock *sk)   *   * Returns 0 on success, errno otherwise   */ -static int tipc_sk_create(struct net *net, struct socket *sock, int protocol, -			  int kern) +static int tipc_sk_create(struct net *net, struct socket *sock, +			  int protocol, int kern)  {  	const struct proto_ops *ops;  	socket_state state;  	struct sock *sk; -	struct tipc_port *tp_ptr; +	struct tipc_sock *tsk; +	struct tipc_port *port; +	u32 ref;  	/* Validate arguments */  	if (unlikely(protocol != 0)) @@ -191,10 +180,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,  	if (sk == NULL)  		return -ENOMEM; -	/* Allocate TIPC port for socket to use */ -	tp_ptr = tipc_createport(sk, &dispatch, &wakeupdispatch, -				 TIPC_LOW_IMPORTANCE); -	if (unlikely(!tp_ptr)) { +	tsk = tipc_sk(sk); +	port = &tsk->port; + +	ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE); +	if (!ref) { +		pr_warn("Socket registration failed, ref. table exhausted\n");  		sk_free(sk);  		return -ENOMEM;  	} @@ -208,17 +199,14 @@ static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,  	sk->sk_rcvbuf = sysctl_tipc_rmem[1];  	sk->sk_data_ready = tipc_data_ready;  	sk->sk_write_space = tipc_write_space; -	tipc_sk(sk)->p = tp_ptr;  	tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; - -	spin_unlock_bh(tp_ptr->lock); +	tipc_port_unlock(port);  	if (sock->state == SS_READY) { -		tipc_set_portunreturnable(tp_ptr->ref, 1); +		tipc_port_set_unreturnable(port, true);  		if (sock->type == SOCK_DGRAM) -			tipc_set_portunreliable(tp_ptr->ref, 1); +			tipc_port_set_unreliable(port, true);  	} -  	return 0;  } @@ -256,7 +244,7 @@ int tipc_sock_create_local(int type, struct socket **res)   */  void tipc_sock_release_local(struct socket *sock)  { -	release(sock); +	tipc_release(sock);  	sock->ops = NULL;  	sock_release(sock);  } @@ -282,7 +270,7 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,  	if (ret < 0)  		return ret; -	ret = accept(sock, *newsock, flags); +	ret = tipc_accept(sock, *newsock, flags);  	if (ret < 0) {  		sock_release(*newsock);  		return ret; @@ -292,7 +280,7 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,  }  /** - * release - destroy a TIPC socket + * tipc_release - destroy a TIPC socket   * @sock: socket to destroy   *   * This routine cleans up any messages that are still queued on the socket. @@ -307,10 +295,11 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,   *   * Returns 0 on success, errno otherwise   */ -static int release(struct socket *sock) +static int tipc_release(struct socket *sock)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport; +	struct tipc_sock *tsk; +	struct tipc_port *port;  	struct sk_buff *buf;  	int res; @@ -321,7 +310,8 @@ static int release(struct socket *sock)  	if (sk == NULL)  		return 0; -	tport = tipc_sk_port(sk); +	tsk = tipc_sk(sk); +	port = &tsk->port;  	lock_sock(sk);  	/* @@ -338,17 +328,16 @@ static int release(struct socket *sock)  			if ((sock->state == SS_CONNECTING) ||  			    (sock->state == SS_CONNECTED)) {  				sock->state = SS_DISCONNECTING; -				tipc_disconnect(tport->ref); +				tipc_port_disconnect(port->ref);  			}  			tipc_reject_msg(buf, TIPC_ERR_NO_PORT);  		}  	} -	/* -	 * Delete TIPC port; this ensures no more messages are queued -	 * (also disconnects an active connection & sends a 'FIN-' to peer) +	/* Destroy TIPC port; also disconnects an active connection and +	 * sends a 'FIN-' to peer.  	 */ -	res = tipc_deleteport(tport); +	tipc_port_destroy(port);  	/* Discard any remaining (connection-based) messages in receive queue */  	__skb_queue_purge(&sk->sk_receive_queue); @@ -364,7 +353,7 @@ static int release(struct socket *sock)  }  /** - * bind - associate or disassocate TIPC name(s) with a socket + * tipc_bind - associate or disassocate TIPC name(s) with a socket   * @sock: socket structure   * @uaddr: socket address describing name(s) and desired operation   * @uaddr_len: size of socket address data structure @@ -378,16 +367,17 @@ static int release(struct socket *sock)   * NOTE: This routine doesn't need to take the socket lock since it doesn't   *       access any non-constant socket information.   */ -static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) +static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, +		     int uaddr_len)  {  	struct sock *sk = sock->sk;  	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr; -	struct tipc_port *tport = tipc_sk_port(sock->sk); +	struct tipc_sock *tsk = tipc_sk(sk);  	int res = -EINVAL;  	lock_sock(sk);  	if (unlikely(!uaddr_len)) { -		res = tipc_withdraw(tport, 0, NULL); +		res = tipc_withdraw(&tsk->port, 0, NULL);  		goto exit;  	} @@ -415,15 +405,15 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)  	}  	res = (addr->scope > 0) ? -		tipc_publish(tport, addr->scope, &addr->addr.nameseq) : -		tipc_withdraw(tport, -addr->scope, &addr->addr.nameseq); +		tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) : +		tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);  exit:  	release_sock(sk);  	return res;  }  /** - * get_name - get port ID of socket or peer socket + * tipc_getname - get port ID of socket or peer socket   * @sock: socket structure   * @uaddr: area for returned socket address   * @uaddr_len: area for returned length of socket address @@ -435,21 +425,21 @@ exit:   *       accesses socket information that is unchanging (or which changes in   *       a completely predictable manner).   */ -static int get_name(struct socket *sock, struct sockaddr *uaddr, -		    int *uaddr_len, int peer) +static int tipc_getname(struct socket *sock, struct sockaddr *uaddr, +			int *uaddr_len, int peer)  {  	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr; -	struct tipc_sock *tsock = tipc_sk(sock->sk); +	struct tipc_sock *tsk = tipc_sk(sock->sk);  	memset(addr, 0, sizeof(*addr));  	if (peer) {  		if ((sock->state != SS_CONNECTED) &&  			((peer != 2) || (sock->state != SS_DISCONNECTING)))  			return -ENOTCONN; -		addr->addr.id.ref = tsock->peer_name.ref; -		addr->addr.id.node = tsock->peer_name.node; +		addr->addr.id.ref = tipc_port_peerport(&tsk->port); +		addr->addr.id.node = tipc_port_peernode(&tsk->port);  	} else { -		addr->addr.id.ref = tsock->p->ref; +		addr->addr.id.ref = tsk->port.ref;  		addr->addr.id.node = tipc_own_addr;  	} @@ -463,7 +453,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,  }  /** - * poll - read and possibly block on pollmask + * tipc_poll - read and possibly block on pollmask   * @file: file structure associated with the socket   * @sock: socket for which to calculate the poll bits   * @wait: ??? @@ -502,22 +492,23 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,   * imply that the operation will succeed, merely that it should be performed   * and will not block.   */ -static unsigned int poll(struct file *file, struct socket *sock, -			 poll_table *wait) +static unsigned int tipc_poll(struct file *file, struct socket *sock, +			      poll_table *wait)  {  	struct sock *sk = sock->sk; +	struct tipc_sock *tsk = tipc_sk(sk);  	u32 mask = 0;  	sock_poll_wait(file, sk_sleep(sk), wait);  	switch ((int)sock->state) {  	case SS_UNCONNECTED: -		if (!tipc_sk_port(sk)->congested) +		if (!tsk->port.congested)  			mask |= POLLOUT;  		break;  	case SS_READY:  	case SS_CONNECTED: -		if (!tipc_sk_port(sk)->congested) +		if (!tsk->port.congested)  			mask |= POLLOUT;  		/* fall thru' */  	case SS_CONNECTING: @@ -567,7 +558,7 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)  static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk);  	DEFINE_WAIT(wait);  	int done; @@ -583,14 +574,15 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)  			return sock_intr_errno(*timeo_p);  		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); -		done = sk_wait_event(sk, timeo_p, !tport->congested); +		done = sk_wait_event(sk, timeo_p, !tsk->port.congested);  		finish_wait(sk_sleep(sk), &wait);  	} while (!done);  	return 0;  } +  /** - * send_msg - send message in connectionless manner + * tipc_sendmsg - send message in connectionless manner   * @iocb: if NULL, indicates that socket lock is already held   * @sock: socket structure   * @m: message to send @@ -603,11 +595,12 @@ static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)   *   * Returns the number of bytes sent on success, or errno otherwise   */ -static int send_msg(struct kiocb *iocb, struct socket *sock, -		    struct msghdr *m, size_t total_len) +static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock, +			struct msghdr *m, size_t total_len)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_port *port = &tsk->port;  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);  	int needs_conn;  	long timeo; @@ -634,13 +627,13 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,  			res = -EISCONN;  			goto exit;  		} -		if (tport->published) { +		if (tsk->port.published) {  			res = -EOPNOTSUPP;  			goto exit;  		}  		if (dest->addrtype == TIPC_ADDR_NAME) { -			tport->conn_type = dest->addr.name.name.type; -			tport->conn_instance = dest->addr.name.name.instance; +			tsk->port.conn_type = dest->addr.name.name.type; +			tsk->port.conn_instance = dest->addr.name.name.instance;  		}  		/* Abort any pending connection attempts (very unlikely) */ @@ -653,13 +646,13 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,  			res = dest_name_check(dest, m);  			if (res)  				break; -			res = tipc_send2name(tport->ref, +			res = tipc_send2name(port,  					     &dest->addr.name.name,  					     dest->addr.name.domain,  					     m->msg_iov,  					     total_len);  		} else if (dest->addrtype == TIPC_ADDR_ID) { -			res = tipc_send2port(tport->ref, +			res = tipc_send2port(port,  					     &dest->addr.id,  					     m->msg_iov,  					     total_len); @@ -671,10 +664,10 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,  			res = dest_name_check(dest, m);  			if (res)  				break; -			res = tipc_multicast(tport->ref, -					     &dest->addr.nameseq, -					     m->msg_iov, -					     total_len); +			res = tipc_port_mcast_xmit(port, +						   &dest->addr.nameseq, +						   m->msg_iov, +						   total_len);  		}  		if (likely(res != -ELINKCONG)) {  			if (needs_conn && (res >= 0)) @@ -695,7 +688,8 @@ exit:  static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_port *port = &tsk->port;  	DEFINE_WAIT(wait);  	int done; @@ -714,14 +708,14 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)  		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);  		done = sk_wait_event(sk, timeo_p, -				     (!tport->congested || !tport->connected)); +				     (!port->congested || !port->connected));  		finish_wait(sk_sleep(sk), &wait);  	} while (!done);  	return 0;  }  /** - * send_packet - send a connection-oriented message + * tipc_send_packet - send a connection-oriented message   * @iocb: if NULL, indicates that socket lock is already held   * @sock: socket structure   * @m: message to send @@ -731,18 +725,18 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)   *   * Returns the number of bytes sent on success, or errno otherwise   */ -static int send_packet(struct kiocb *iocb, struct socket *sock, -		       struct msghdr *m, size_t total_len) +static int tipc_send_packet(struct kiocb *iocb, struct socket *sock, +			    struct msghdr *m, size_t total_len)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk);  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);  	int res = -EINVAL;  	long timeo;  	/* Handle implied connection establishment */  	if (unlikely(dest)) -		return send_msg(iocb, sock, m, total_len); +		return tipc_sendmsg(iocb, sock, m, total_len);  	if (total_len > TIPC_MAX_USER_MSG_SIZE)  		return -EMSGSIZE; @@ -760,7 +754,7 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,  	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);  	do { -		res = tipc_send(tport->ref, m->msg_iov, total_len); +		res = tipc_send(&tsk->port, m->msg_iov, total_len);  		if (likely(res != -ELINKCONG))  			break;  		res = tipc_wait_for_sndpkt(sock, &timeo); @@ -774,7 +768,7 @@ exit:  }  /** - * send_stream - send stream-oriented data + * tipc_send_stream - send stream-oriented data   * @iocb: (unused)   * @sock: socket structure   * @m: data to send @@ -785,11 +779,11 @@ exit:   * Returns the number of bytes sent on success (or partial success),   * or errno if no data sent   */ -static int send_stream(struct kiocb *iocb, struct socket *sock, -		       struct msghdr *m, size_t total_len) +static int tipc_send_stream(struct kiocb *iocb, struct socket *sock, +			    struct msghdr *m, size_t total_len)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk);  	struct msghdr my_msg;  	struct iovec my_iov;  	struct iovec *curr_iov; @@ -806,7 +800,7 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,  	/* Handle special cases where there is no connection */  	if (unlikely(sock->state != SS_CONNECTED)) {  		if (sock->state == SS_UNCONNECTED) -			res = send_packet(NULL, sock, m, total_len); +			res = tipc_send_packet(NULL, sock, m, total_len);  		else  			res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;  		goto exit; @@ -837,21 +831,22 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,  	my_msg.msg_name = NULL;  	bytes_sent = 0; -	hdr_size = msg_hdr_sz(&tport->phdr); +	hdr_size = msg_hdr_sz(&tsk->port.phdr);  	while (curr_iovlen--) {  		curr_start = curr_iov->iov_base;  		curr_left = curr_iov->iov_len;  		while (curr_left) { -			bytes_to_send = tport->max_pkt - hdr_size; +			bytes_to_send = tsk->port.max_pkt - hdr_size;  			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)  				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;  			if (curr_left < bytes_to_send)  				bytes_to_send = curr_left;  			my_iov.iov_base = curr_start;  			my_iov.iov_len = bytes_to_send; -			res = send_packet(NULL, sock, &my_msg, bytes_to_send); +			res = tipc_send_packet(NULL, sock, &my_msg, +					       bytes_to_send);  			if (res < 0) {  				if (bytes_sent)  					res = bytes_sent; @@ -872,27 +867,25 @@ exit:  /**   * auto_connect - complete connection setup to a remote port - * @sock: socket structure + * @tsk: tipc socket structure   * @msg: peer's response message   *   * Returns 0 on success, errno otherwise   */ -static int auto_connect(struct socket *sock, struct tipc_msg *msg) +static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg)  { -	struct tipc_sock *tsock = tipc_sk(sock->sk); -	struct tipc_port *p_ptr; +	struct tipc_port *port = &tsk->port; +	struct socket *sock = tsk->sk.sk_socket; +	struct tipc_portid peer; -	tsock->peer_name.ref = msg_origport(msg); -	tsock->peer_name.node = msg_orignode(msg); -	p_ptr = tipc_port_deref(tsock->p->ref); -	if (!p_ptr) -		return -EINVAL; +	peer.ref = msg_origport(msg); +	peer.node = msg_orignode(msg); -	__tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name); +	__tipc_port_connect(port->ref, port, &peer);  	if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)  		return -EINVAL; -	msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg)); +	msg_set_importance(&port->phdr, (u32)msg_importance(msg));  	sock->state = SS_CONNECTED;  	return 0;  } @@ -999,7 +992,7 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)  	for (;;) {  		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); -		if (skb_queue_empty(&sk->sk_receive_queue)) { +		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {  			if (sock->state == SS_DISCONNECTING) {  				err = -ENOTCONN;  				break; @@ -1023,7 +1016,7 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)  }  /** - * recv_msg - receive packet-oriented message + * tipc_recvmsg - receive packet-oriented message   * @iocb: (unused)   * @m: descriptor for message info   * @buf_len: total size of user buffer area @@ -1034,11 +1027,12 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)   *   * Returns size of returned message data, errno otherwise   */ -static int recv_msg(struct kiocb *iocb, struct socket *sock, -		    struct msghdr *m, size_t buf_len, int flags) +static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock, +			struct msghdr *m, size_t buf_len, int flags)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_port *port = &tsk->port;  	struct sk_buff *buf;  	struct tipc_msg *msg;  	long timeo; @@ -1081,7 +1075,7 @@ restart:  	set_orig_addr(m, msg);  	/* Capture ancillary data (optional) */ -	res = anc_data_recv(m, msg, tport); +	res = anc_data_recv(m, msg, port);  	if (res)  		goto exit; @@ -1107,8 +1101,8 @@ restart:  	/* Consume received message (optional) */  	if (likely(!(flags & MSG_PEEK))) {  		if ((sock->state != SS_READY) && -		    (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) -			tipc_acknowledge(tport->ref, tport->conn_unacked); +		    (++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) +			tipc_acknowledge(port->ref, port->conn_unacked);  		advance_rx_queue(sk);  	}  exit: @@ -1117,7 +1111,7 @@ exit:  }  /** - * recv_stream - receive stream-oriented data + * tipc_recv_stream - receive stream-oriented data   * @iocb: (unused)   * @m: descriptor for message info   * @buf_len: total size of user buffer area @@ -1128,11 +1122,12 @@ exit:   *   * Returns size of returned message data, errno otherwise   */ -static int recv_stream(struct kiocb *iocb, struct socket *sock, -		       struct msghdr *m, size_t buf_len, int flags) +static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock, +			    struct msghdr *m, size_t buf_len, int flags)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_port *port = &tsk->port;  	struct sk_buff *buf;  	struct tipc_msg *msg;  	long timeo; @@ -1177,7 +1172,7 @@ restart:  	/* Optionally capture sender's address & ancillary data of first msg */  	if (sz_copied == 0) {  		set_orig_addr(m, msg); -		res = anc_data_recv(m, msg, tport); +		res = anc_data_recv(m, msg, port);  		if (res)  			goto exit;  	} @@ -1215,8 +1210,8 @@ restart:  	/* Consume received message (optional) */  	if (likely(!(flags & MSG_PEEK))) { -		if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) -			tipc_acknowledge(tport->ref, tport->conn_unacked); +		if (unlikely(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) +			tipc_acknowledge(port->ref, port->conn_unacked);  		advance_rx_queue(sk);  	} @@ -1268,17 +1263,19 @@ static void tipc_data_ready(struct sock *sk, int len)  /**   * filter_connect - Handle all incoming messages for a connection-based socket - * @tsock: TIPC socket + * @tsk: TIPC socket   * @msg: message   *   * Returns TIPC error status code and socket error status code   * once it encounters some errors   */ -static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf) +static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)  { -	struct socket *sock = tsock->sk.sk_socket; +	struct sock *sk = &tsk->sk; +	struct tipc_port *port = &tsk->port; +	struct socket *sock = sk->sk_socket;  	struct tipc_msg *msg = buf_msg(*buf); -	struct sock *sk = &tsock->sk; +  	u32 retval = TIPC_ERR_NO_PORT;  	int res; @@ -1288,10 +1285,10 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)  	switch ((int)sock->state) {  	case SS_CONNECTED:  		/* Accept only connection-based messages sent by peer */ -		if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) { +		if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) {  			if (unlikely(msg_errcode(msg))) {  				sock->state = SS_DISCONNECTING; -				__tipc_disconnect(tsock->p); +				__tipc_port_disconnect(port);  			}  			retval = TIPC_OK;  		} @@ -1308,7 +1305,7 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)  		if (unlikely(!msg_connected(msg)))  			break; -		res = auto_connect(sock, msg); +		res = auto_connect(tsk, msg);  		if (res) {  			sock->state = SS_DISCONNECTING;  			sk->sk_err = -res; @@ -1387,6 +1384,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)  static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)  {  	struct socket *sock = sk->sk_socket; +	struct tipc_sock *tsk = tipc_sk(sk);  	struct tipc_msg *msg = buf_msg(buf);  	unsigned int limit = rcvbuf_limit(sk, buf);  	u32 res = TIPC_OK; @@ -1399,7 +1397,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)  		if (msg_connected(msg))  			return TIPC_ERR_NO_PORT;  	} else { -		res = filter_connect(tipc_sk(sk), &buf); +		res = filter_connect(tsk, &buf);  		if (res != TIPC_OK || buf == NULL)  			return res;  	} @@ -1437,17 +1435,16 @@ static int backlog_rcv(struct sock *sk, struct sk_buff *buf)  }  /** - * dispatch - handle incoming message - * @tport: TIPC port that received message + * tipc_sk_rcv - handle incoming message + * @sk:  socket receiving message   * @buf: message   *   * Called with port lock already taken.   *   * Returns TIPC error status code (TIPC_OK if message is not to be rejected)   */ -static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) +u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf)  { -	struct sock *sk = tport->sk;  	u32 res;  	/* @@ -1470,19 +1467,6 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)  	return res;  } -/** - * wakeupdispatch - wake up port after congestion - * @tport: port to wakeup - * - * Called with port lock already taken. - */ -static void wakeupdispatch(struct tipc_port *tport) -{ -	struct sock *sk = tport->sk; - -	sk->sk_write_space(sk); -} -  static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)  {  	struct sock *sk = sock->sk; @@ -1506,7 +1490,7 @@ static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)  }  /** - * connect - establish a connection to another TIPC port + * tipc_connect - establish a connection to another TIPC port   * @sock: socket structure   * @dest: socket address for destination port   * @destlen: size of socket address data structure @@ -1514,8 +1498,8 @@ static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)   *   * Returns 0 on success, errno otherwise   */ -static int connect(struct socket *sock, struct sockaddr *dest, int destlen, -		   int flags) +static int tipc_connect(struct socket *sock, struct sockaddr *dest, +			int destlen, int flags)  {  	struct sock *sk = sock->sk;  	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; @@ -1556,7 +1540,7 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,  		if (!timeout)  			m.msg_flags = MSG_DONTWAIT; -		res = send_msg(NULL, sock, &m, 0); +		res = tipc_sendmsg(NULL, sock, &m, 0);  		if ((res < 0) && (res != -EWOULDBLOCK))  			goto exit; @@ -1587,13 +1571,13 @@ exit:  }  /** - * listen - allow socket to listen for incoming connections + * tipc_listen - allow socket to listen for incoming connections   * @sock: socket structure   * @len: (unused)   *   * Returns 0 on success, errno otherwise   */ -static int listen(struct socket *sock, int len) +static int tipc_listen(struct socket *sock, int len)  {  	struct sock *sk = sock->sk;  	int res; @@ -1625,7 +1609,7 @@ static int tipc_wait_for_accept(struct socket *sock, long timeo)  	for (;;) {  		prepare_to_wait_exclusive(sk_sleep(sk), &wait,  					  TASK_INTERRUPTIBLE); -		if (skb_queue_empty(&sk->sk_receive_queue)) { +		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {  			release_sock(sk);  			timeo = schedule_timeout(timeo);  			lock_sock(sk); @@ -1648,20 +1632,20 @@ static int tipc_wait_for_accept(struct socket *sock, long timeo)  }  /** - * accept - wait for connection request + * tipc_accept - wait for connection request   * @sock: listening socket   * @newsock: new socket that is to be connected   * @flags: file-related flags associated with socket   *   * Returns 0 on success, errno otherwise   */ -static int accept(struct socket *sock, struct socket *new_sock, int flags) +static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)  {  	struct sock *new_sk, *sk = sock->sk;  	struct sk_buff *buf; -	struct tipc_sock *new_tsock; -	struct tipc_port *new_tport; +	struct tipc_port *new_port;  	struct tipc_msg *msg; +	struct tipc_portid peer;  	u32 new_ref;  	long timeo;  	int res; @@ -1672,7 +1656,6 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)  		res = -EINVAL;  		goto exit;  	} -  	timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);  	res = tipc_wait_for_accept(sock, timeo);  	if (res) @@ -1685,9 +1668,8 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)  		goto exit;  	new_sk = new_sock->sk; -	new_tsock = tipc_sk(new_sk); -	new_tport = new_tsock->p; -	new_ref = new_tport->ref; +	new_port = &tipc_sk(new_sk)->port; +	new_ref = new_port->ref;  	msg = buf_msg(buf);  	/* we lock on new_sk; but lockdep sees the lock on sk */ @@ -1700,15 +1682,15 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)  	reject_rx_queue(new_sk);  	/* Connect new socket to it's peer */ -	new_tsock->peer_name.ref = msg_origport(msg); -	new_tsock->peer_name.node = msg_orignode(msg); -	tipc_connect(new_ref, &new_tsock->peer_name); +	peer.ref = msg_origport(msg); +	peer.node = msg_orignode(msg); +	tipc_port_connect(new_ref, &peer);  	new_sock->state = SS_CONNECTED; -	tipc_set_portimportance(new_ref, msg_importance(msg)); +	tipc_port_set_importance(new_port, msg_importance(msg));  	if (msg_named(msg)) { -		new_tport->conn_type = msg_nametype(msg); -		new_tport->conn_instance = msg_nameinst(msg); +		new_port->conn_type = msg_nametype(msg); +		new_port->conn_instance = msg_nameinst(msg);  	}  	/* @@ -1719,21 +1701,20 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)  		struct msghdr m = {NULL,};  		advance_rx_queue(sk); -		send_packet(NULL, new_sock, &m, 0); +		tipc_send_packet(NULL, new_sock, &m, 0);  	} else {  		__skb_dequeue(&sk->sk_receive_queue);  		__skb_queue_head(&new_sk->sk_receive_queue, buf);  		skb_set_owner_r(buf, new_sk);  	}  	release_sock(new_sk); -  exit:  	release_sock(sk);  	return res;  }  /** - * shutdown - shutdown socket connection + * tipc_shutdown - shutdown socket connection   * @sock: socket structure   * @how: direction to close (must be SHUT_RDWR)   * @@ -1741,10 +1722,11 @@ exit:   *   * Returns 0 on success, errno otherwise   */ -static int shutdown(struct socket *sock, int how) +static int tipc_shutdown(struct socket *sock, int how)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_port *port = &tsk->port;  	struct sk_buff *buf;  	int res; @@ -1765,10 +1747,10 @@ restart:  				kfree_skb(buf);  				goto restart;  			} -			tipc_disconnect(tport->ref); +			tipc_port_disconnect(port->ref);  			tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);  		} else { -			tipc_shutdown(tport->ref); +			tipc_port_shutdown(port->ref);  		}  		sock->state = SS_DISCONNECTING; @@ -1794,7 +1776,7 @@ restart:  }  /** - * setsockopt - set socket option + * tipc_setsockopt - set socket option   * @sock: socket structure   * @lvl: option level   * @opt: option identifier @@ -1806,11 +1788,12 @@ restart:   *   * Returns 0 on success, errno otherwise   */ -static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov, -		      unsigned int ol) +static int tipc_setsockopt(struct socket *sock, int lvl, int opt, +			   char __user *ov, unsigned int ol)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_port *port = &tsk->port;  	u32 value;  	int res; @@ -1828,16 +1811,16 @@ static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov,  	switch (opt) {  	case TIPC_IMPORTANCE: -		res = tipc_set_portimportance(tport->ref, value); +		tipc_port_set_importance(port, value);  		break;  	case TIPC_SRC_DROPPABLE:  		if (sock->type != SOCK_STREAM) -			res = tipc_set_portunreliable(tport->ref, value); +			tipc_port_set_unreliable(port, value);  		else  			res = -ENOPROTOOPT;  		break;  	case TIPC_DEST_DROPPABLE: -		res = tipc_set_portunreturnable(tport->ref, value); +		tipc_port_set_unreturnable(port, value);  		break;  	case TIPC_CONN_TIMEOUT:  		tipc_sk(sk)->conn_timeout = value; @@ -1853,7 +1836,7 @@ static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov,  }  /** - * getsockopt - get socket option + * tipc_getsockopt - get socket option   * @sock: socket structure   * @lvl: option level   * @opt: option identifier @@ -1865,11 +1848,12 @@ static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov,   *   * Returns 0 on success, errno otherwise   */ -static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov, -		      int __user *ol) +static int tipc_getsockopt(struct socket *sock, int lvl, int opt, +			   char __user *ov, int __user *ol)  {  	struct sock *sk = sock->sk; -	struct tipc_port *tport = tipc_sk_port(sk); +	struct tipc_sock *tsk = tipc_sk(sk); +	struct tipc_port *port = &tsk->port;  	int len;  	u32 value;  	int res; @@ -1886,13 +1870,13 @@ static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov,  	switch (opt) {  	case TIPC_IMPORTANCE: -		res = tipc_portimportance(tport->ref, &value); +		value = tipc_port_importance(port);  		break;  	case TIPC_SRC_DROPPABLE: -		res = tipc_portunreliable(tport->ref, &value); +		value = tipc_port_unreliable(port);  		break;  	case TIPC_DEST_DROPPABLE: -		res = tipc_portunreturnable(tport->ref, &value); +		value = tipc_port_unreturnable(port);  		break;  	case TIPC_CONN_TIMEOUT:  		value = tipc_sk(sk)->conn_timeout; @@ -1927,20 +1911,20 @@ static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov,  static const struct proto_ops msg_ops = {  	.owner		= THIS_MODULE,  	.family		= AF_TIPC, -	.release	= release, -	.bind		= bind, -	.connect	= connect, +	.release	= tipc_release, +	.bind		= tipc_bind, +	.connect	= tipc_connect,  	.socketpair	= sock_no_socketpair,  	.accept		= sock_no_accept, -	.getname	= get_name, -	.poll		= poll, +	.getname	= tipc_getname, +	.poll		= tipc_poll,  	.ioctl		= sock_no_ioctl,  	.listen		= sock_no_listen, -	.shutdown	= shutdown, -	.setsockopt	= setsockopt, -	.getsockopt	= getsockopt, -	.sendmsg	= send_msg, -	.recvmsg	= recv_msg, +	.shutdown	= tipc_shutdown, +	.setsockopt	= tipc_setsockopt, +	.getsockopt	= tipc_getsockopt, +	.sendmsg	= tipc_sendmsg, +	.recvmsg	= tipc_recvmsg,  	.mmap		= sock_no_mmap,  	.sendpage	= sock_no_sendpage  }; @@ -1948,20 +1932,20 @@ static const struct proto_ops msg_ops = {  static const struct proto_ops packet_ops = {  	.owner		= THIS_MODULE,  	.family		= AF_TIPC, -	.release	= release, -	.bind		= bind, -	.connect	= connect, +	.release	= tipc_release, +	.bind		= tipc_bind, +	.connect	= tipc_connect,  	.socketpair	= sock_no_socketpair, -	.accept		= accept, -	.getname	= get_name, -	.poll		= poll, +	.accept		= tipc_accept, +	.getname	= tipc_getname, +	.poll		= tipc_poll,  	.ioctl		= sock_no_ioctl, -	.listen		= listen, -	.shutdown	= shutdown, -	.setsockopt	= setsockopt, -	.getsockopt	= getsockopt, -	.sendmsg	= send_packet, -	.recvmsg	= recv_msg, +	.listen		= tipc_listen, +	.shutdown	= tipc_shutdown, +	.setsockopt	= tipc_setsockopt, +	.getsockopt	= tipc_getsockopt, +	.sendmsg	= tipc_send_packet, +	.recvmsg	= tipc_recvmsg,  	.mmap		= sock_no_mmap,  	.sendpage	= sock_no_sendpage  }; @@ -1969,20 +1953,20 @@ static const struct proto_ops packet_ops = {  static const struct proto_ops stream_ops = {  	.owner		= THIS_MODULE,  	.family		= AF_TIPC, -	.release	= release, -	.bind		= bind, -	.connect	= connect, +	.release	= tipc_release, +	.bind		= tipc_bind, +	.connect	= tipc_connect,  	.socketpair	= sock_no_socketpair, -	.accept		= accept, -	.getname	= get_name, -	.poll		= poll, +	.accept		= tipc_accept, +	.getname	= tipc_getname, +	.poll		= tipc_poll,  	.ioctl		= sock_no_ioctl, -	.listen		= listen, -	.shutdown	= shutdown, -	.setsockopt	= setsockopt, -	.getsockopt	= getsockopt, -	.sendmsg	= send_stream, -	.recvmsg	= recv_stream, +	.listen		= tipc_listen, +	.shutdown	= tipc_shutdown, +	.setsockopt	= tipc_setsockopt, +	.getsockopt	= tipc_getsockopt, +	.sendmsg	= tipc_send_stream, +	.recvmsg	= tipc_recv_stream,  	.mmap		= sock_no_mmap,  	.sendpage	= sock_no_sendpage  }; @@ -2027,8 +2011,6 @@ int tipc_socket_init(void)  		proto_unregister(&tipc_proto);  		goto out;  	} - -	sockets_enabled = 1;   out:  	return res;  } @@ -2038,10 +2020,6 @@ int tipc_socket_init(void)   */  void tipc_socket_stop(void)  { -	if (!sockets_enabled) -		return; - -	sockets_enabled = 0;  	sock_unregister(tipc_family_ops.family);  	proto_unregister(&tipc_proto);  } diff --git a/net/tipc/socket.h b/net/tipc/socket.h new file mode 100644 index 00000000000..74e5c7f195a --- /dev/null +++ b/net/tipc/socket.h @@ -0,0 +1,72 @@ +/* net/tipc/socket.h: Include file for TIPC socket code + * + * Copyright (c) 2014, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + *    notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + *    notice, this list of conditions and the following disclaimer in the + *    documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + *    contributors may be used to endorse or promote products derived from + *    this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_SOCK_H +#define _TIPC_SOCK_H + +#include "port.h" +#include <net/sock.h> + +/** + * struct tipc_sock - TIPC socket structure + * @sk: socket - interacts with 'port' and with user via the socket API + * @port: port - interacts with 'sk' and with the rest of the TIPC stack + * @peer_name: the peer of the connection, if any + * @conn_timeout: the time we can wait for an unresponded setup request + */ + +struct tipc_sock { +	struct sock sk; +	struct tipc_port port; +	unsigned int conn_timeout; +}; + +static inline struct tipc_sock *tipc_sk(const struct sock *sk) +{ +	return container_of(sk, struct tipc_sock, sk); +} + +static inline struct tipc_sock *tipc_port_to_sock(const struct tipc_port *port) +{ +	return container_of(port, struct tipc_sock, port); +} + +static inline void tipc_sock_wakeup(struct tipc_sock *tsk) +{ +	tsk->sk.sk_write_space(&tsk->sk); +} + +u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf); + +#endif diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 7cb0bd5b117..642437231ad 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -96,20 +96,16 @@ static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,  {  	struct tipc_subscriber *subscriber = sub->subscriber;  	struct kvec msg_sect; -	int ret;  	msg_sect.iov_base = (void *)&sub->evt;  	msg_sect.iov_len = sizeof(struct tipc_event); -  	sub->evt.event = htohl(event, sub->swap);  	sub->evt.found_lower = htohl(found_lower, sub->swap);  	sub->evt.found_upper = htohl(found_upper, sub->swap);  	sub->evt.port.ref = htohl(port_ref, sub->swap);  	sub->evt.port.node = htohl(node, sub->swap); -	ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, -				msg_sect.iov_base, msg_sect.iov_len); -	if (ret < 0) -		pr_err("Sending subscription event failed, no memory\n"); +	tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, msg_sect.iov_base, +			  msg_sect.iov_len);  }  /** @@ -153,14 +149,6 @@ static void subscr_timeout(struct tipc_subscription *sub)  	/* The spin lock per subscriber is used to protect its members */  	spin_lock_bh(&subscriber->lock); -	/* Validate if the connection related to the subscriber is -	 * closed (in case subscriber is terminating) -	 */ -	if (subscriber->conid == 0) { -		spin_unlock_bh(&subscriber->lock); -		return; -	} -  	/* Validate timeout (in case subscription is being cancelled) */  	if (sub->timeout == TIPC_WAIT_FOREVER) {  		spin_unlock_bh(&subscriber->lock); @@ -215,9 +203,6 @@ static void subscr_release(struct tipc_subscriber *subscriber)  	spin_lock_bh(&subscriber->lock); -	/* Invalidate subscriber reference */ -	subscriber->conid = 0; -  	/* Destroy any existing subscriptions for subscriber */  	list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,  				 subscription_list) { @@ -278,9 +263,9 @@ static void subscr_cancel(struct tipc_subscr *s,   *   * Called with subscriber lock held.   */ -static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s, -					     struct tipc_subscriber *subscriber) -{ +static int subscr_subscribe(struct tipc_subscr *s, +			    struct tipc_subscriber *subscriber, +			    struct tipc_subscription **sub_p) {  	struct tipc_subscription *sub;  	int swap; @@ -291,23 +276,21 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,  	if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {  		s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);  		subscr_cancel(s, subscriber); -		return NULL; +		return 0;  	}  	/* Refuse subscription if global limit exceeded */  	if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {  		pr_warn("Subscription rejected, limit reached (%u)\n",  			TIPC_MAX_SUBSCRIPTIONS); -		subscr_terminate(subscriber); -		return NULL; +		return -EINVAL;  	}  	/* Allocate subscription object */  	sub = kmalloc(sizeof(*sub), GFP_ATOMIC);  	if (!sub) {  		pr_warn("Subscription rejected, no memory\n"); -		subscr_terminate(subscriber); -		return NULL; +		return -ENOMEM;  	}  	/* Initialize subscription object */ @@ -321,8 +304,7 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,  	    (sub->seq.lower > sub->seq.upper)) {  		pr_warn("Subscription rejected, illegal request\n");  		kfree(sub); -		subscr_terminate(subscriber); -		return NULL; +		return -EINVAL;  	}  	INIT_LIST_HEAD(&sub->nameseq_list);  	list_add(&sub->subscription_list, &subscriber->subscription_list); @@ -335,8 +317,8 @@ static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,  			     (Handler)subscr_timeout, (unsigned long)sub);  		k_start_timer(&sub->timer, sub->timeout);  	} - -	return sub; +	*sub_p = sub; +	return 0;  }  /* Handle one termination request for the subscriber */ @@ -350,10 +332,14 @@ static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,  				  void *usr_data, void *buf, size_t len)  {  	struct tipc_subscriber *subscriber = usr_data; -	struct tipc_subscription *sub; +	struct tipc_subscription *sub = NULL;  	spin_lock_bh(&subscriber->lock); -	sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber); +	if (subscr_subscribe((struct tipc_subscr *)buf, subscriber, &sub) < 0) { +		spin_unlock_bh(&subscriber->lock); +		subscr_terminate(subscriber); +		return; +	}  	if (sub)  		tipc_nametbl_subscribe(sub);  	spin_unlock_bh(&subscriber->lock); | 
