diff options
Diffstat (limited to 'net/tipc')
| -rw-r--r-- | net/tipc/bcast.c | 14 | ||||
| -rw-r--r-- | net/tipc/bearer.c | 8 | ||||
| -rw-r--r-- | net/tipc/cluster.c | 4 | ||||
| -rw-r--r-- | net/tipc/config.c | 11 | ||||
| -rw-r--r-- | net/tipc/core.c | 13 | ||||
| -rw-r--r-- | net/tipc/core.h | 126 | ||||
| -rw-r--r-- | net/tipc/dbg.c | 231 | ||||
| -rw-r--r-- | net/tipc/dbg.h | 12 | ||||
| -rw-r--r-- | net/tipc/discover.c | 14 | ||||
| -rw-r--r-- | net/tipc/discover.h | 2 | ||||
| -rw-r--r-- | net/tipc/eth_media.c | 10 | ||||
| -rw-r--r-- | net/tipc/link.c | 98 | ||||
| -rw-r--r-- | net/tipc/msg.c | 13 | ||||
| -rw-r--r-- | net/tipc/msg.h | 42 | ||||
| -rw-r--r-- | net/tipc/name_distr.c | 6 | ||||
| -rw-r--r-- | net/tipc/name_table.c | 55 | ||||
| -rw-r--r-- | net/tipc/net.c | 14 | ||||
| -rw-r--r-- | net/tipc/net.h | 2 | ||||
| -rw-r--r-- | net/tipc/netlink.c | 16 | ||||
| -rw-r--r-- | net/tipc/node.c | 55 | ||||
| -rw-r--r-- | net/tipc/port.c | 115 | ||||
| -rw-r--r-- | net/tipc/ref.c | 14 | ||||
| -rw-r--r-- | net/tipc/socket.c | 62 | ||||
| -rw-r--r-- | net/tipc/subscr.c | 249 | ||||
| -rw-r--r-- | net/tipc/subscr.h | 34 | ||||
| -rw-r--r-- | net/tipc/user_reg.c | 14 | 
26 files changed, 687 insertions, 547 deletions
| diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index e7880172ef1..b1ff16aa4bd 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -276,7 +276,7 @@ static void bclink_send_nack(struct node *n_ptr)  	if (buf) {  		msg = buf_msg(buf);  		msg_init(msg, BCAST_PROTOCOL, STATE_MSG, -			 TIPC_OK, INT_H_SIZE, n_ptr->addr); +			 INT_H_SIZE, n_ptr->addr);  		msg_set_mc_netid(msg, tipc_net_id);  		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));  		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after); @@ -571,7 +571,7 @@ static int tipc_bcbearer_send(struct sk_buff *buf,  		assert(tipc_cltr_bcast_nodes.count != 0);  		bcbuf_set_acks(buf, tipc_cltr_bcast_nodes.count);  		msg = buf_msg(buf); -		msg_set_non_seq(msg); +		msg_set_non_seq(msg, 1);  		msg_set_mc_netid(msg, tipc_net_id);  	} @@ -611,7 +611,7 @@ swap:  		bcbearer->bpairs[bp_index].secondary = p;  update:  		if (bcbearer->remains_new.count == 0) -			return TIPC_OK; +			return 0;  		bcbearer->remains = bcbearer->remains_new;  	} @@ -620,7 +620,7 @@ update:  	bcbearer->bearer.publ.blocked = 1;  	bcl->stats.bearer_congs++; -	return ~TIPC_OK; +	return 1;  }  /** @@ -756,7 +756,7 @@ int tipc_bclink_reset_stats(void)  	spin_lock_bh(&bc_lock);  	memset(&bcl->stats, 0, sizeof(bcl->stats));  	spin_unlock_bh(&bc_lock); -	return TIPC_OK; +	return 0;  }  int tipc_bclink_set_queue_limits(u32 limit) @@ -769,7 +769,7 @@ int tipc_bclink_set_queue_limits(u32 limit)  	spin_lock_bh(&bc_lock);  	tipc_link_set_queue_limits(bcl, limit);  	spin_unlock_bh(&bc_lock); -	return TIPC_OK; +	return 0;  }  int tipc_bclink_init(void) @@ -810,7 +810,7 @@ int tipc_bclink_init(void)  		tipc_printbuf_init(&bcl->print_buf, pb, BCLINK_LOG_BUF_SIZE);  	} -	return TIPC_OK; +	return 0;  }  void tipc_bclink_stop(void) diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 271a375b49b..6a9aba3edd0 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -370,7 +370,7 @@ void tipc_bearer_remove_dest(struct bearer *b_ptr, u32 dest)   */  static int bearer_push(struct bearer *b_ptr)  { -	u32 res = TIPC_OK; +	u32 res = 0;  	struct link *ln, *tln;  	if (b_ptr->publ.blocked) @@ -607,7 +607,7 @@ int tipc_block_bearer(const char *name)  	}  	spin_unlock_bh(&b_ptr->publ.lock);  	read_unlock_bh(&tipc_net_lock); -	return TIPC_OK; +	return 0;  }  /** @@ -645,7 +645,7 @@ static int bearer_disable(const char *name)  	}  	spin_unlock_bh(&b_ptr->publ.lock);  	memset(b_ptr, 0, sizeof(struct bearer)); -	return TIPC_OK; +	return 0;  }  int tipc_disable_bearer(const char *name) @@ -668,7 +668,7 @@ int tipc_bearer_init(void)  	tipc_bearers = kcalloc(MAX_BEARERS, sizeof(struct bearer), GFP_ATOMIC);  	media_list = kcalloc(MAX_MEDIA, sizeof(struct media), GFP_ATOMIC);  	if (tipc_bearers && media_list) { -		res = TIPC_OK; +		res = 0;  	} else {  		kfree(tipc_bearers);  		kfree(media_list); diff --git a/net/tipc/cluster.c b/net/tipc/cluster.c index 4bb3404f610..46ee6c58532 100644 --- a/net/tipc/cluster.c +++ b/net/tipc/cluster.c @@ -238,7 +238,7 @@ static struct sk_buff *tipc_cltr_prepare_routing_msg(u32 data_size, u32 dest)  	if (buf) {  		msg = buf_msg(buf);  		memset((char *)msg, 0, size); -		msg_init(msg, ROUTE_DISTRIBUTOR, 0, TIPC_OK, INT_H_SIZE, dest); +		msg_init(msg, ROUTE_DISTRIBUTOR, 0, INT_H_SIZE, dest);  	}  	return buf;  } @@ -571,6 +571,6 @@ exit:  int tipc_cltr_init(void)  {  	tipc_highest_allowed_slave = LOWEST_SLAVE + tipc_max_slaves; -	return tipc_cltr_create(tipc_own_addr) ? TIPC_OK : -ENOMEM; +	return tipc_cltr_create(tipc_own_addr) ? 0 : -ENOMEM;  } diff --git a/net/tipc/config.c b/net/tipc/config.c index c71337a22d3..ca3544d030c 100644 --- a/net/tipc/config.c +++ b/net/tipc/config.c @@ -2,7 +2,7 @@   * net/tipc/config.c: TIPC configuration management code   *   * Copyright (c) 2002-2006, Ericsson AB - * Copyright (c) 2004-2006, Wind River Systems + * Copyright (c) 2004-2007, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -293,7 +293,6 @@ static struct sk_buff *cfg_set_own_addr(void)  	if (tipc_mode == TIPC_NET_MODE)  		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  						   " (cannot change node address once assigned)"); -	tipc_own_addr = addr;  	/*  	 * Must release all spinlocks before calling start_net() because @@ -306,7 +305,7 @@ static struct sk_buff *cfg_set_own_addr(void)  	 */  	spin_unlock_bh(&config_lock); -	tipc_core_start_net(); +	tipc_core_start_net(addr);  	spin_lock_bh(&config_lock);  	return tipc_cfg_reply_none();  } @@ -529,7 +528,7 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area  		break;  #endif  	case TIPC_CMD_SET_LOG_SIZE: -		rep_tlv_buf = tipc_log_resize(req_tlv_area, req_tlv_space); +		rep_tlv_buf = tipc_log_resize_cmd(req_tlv_area, req_tlv_space);  		break;  	case TIPC_CMD_DUMP_LOG:  		rep_tlv_buf = tipc_log_dump(); @@ -602,6 +601,10 @@ struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd, const void *request_area  	case TIPC_CMD_GET_NETID:  		rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_net_id);  		break; +	case TIPC_CMD_NOT_NET_ADMIN: +		rep_tlv_buf = +			tipc_cfg_reply_error_string(TIPC_CFG_NOT_NET_ADMIN); +		break;  	default:  		rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  							  " (unknown command)"); diff --git a/net/tipc/core.c b/net/tipc/core.c index 740aac5cdfb..3256bd7d398 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -49,7 +49,7 @@  #include "config.h" -#define TIPC_MOD_VER "1.6.3" +#define TIPC_MOD_VER "1.6.4"  #ifndef CONFIG_TIPC_ZONES  #define CONFIG_TIPC_ZONES 3 @@ -117,11 +117,11 @@ void tipc_core_stop_net(void)   * start_net - start TIPC networking sub-systems   */ -int tipc_core_start_net(void) +int tipc_core_start_net(unsigned long addr)  {  	int res; -	if ((res = tipc_net_start()) || +	if ((res = tipc_net_start(addr)) ||  	    (res = tipc_eth_media_start())) {  		tipc_core_stop_net();  	} @@ -164,8 +164,7 @@ int tipc_core_start(void)  	tipc_mode = TIPC_NODE_MODE;  	if ((res = tipc_handler_start()) || -	    (res = tipc_ref_table_init(tipc_max_ports + tipc_max_subscriptions, -				       tipc_random)) || +	    (res = tipc_ref_table_init(tipc_max_ports, tipc_random)) ||  	    (res = tipc_reg_start()) ||  	    (res = tipc_nametbl_init()) ||  	    (res = tipc_k_signal((Handler)tipc_subscr_start, 0)) || @@ -182,7 +181,7 @@ static int __init tipc_init(void)  {  	int res; -	tipc_log_reinit(CONFIG_TIPC_LOG); +	tipc_log_resize(CONFIG_TIPC_LOG);  	info("Activated (version " TIPC_MOD_VER  	     " compiled " __DATE__ " " __TIME__ ")\n"); @@ -209,7 +208,7 @@ static void __exit tipc_exit(void)  	tipc_core_stop_net();  	tipc_core_stop();  	info("Deactivated\n"); -	tipc_log_stop(); +	tipc_log_resize(0);  }  module_init(tipc_init); diff --git a/net/tipc/core.h b/net/tipc/core.h index 5a0e4878d3b..a881f92a853 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -2,7 +2,7 @@   * net/tipc/core.h: Include file for TIPC global declarations   *   * Copyright (c) 2005-2006, Ericsson AB - * Copyright (c) 2005-2006, Wind River Systems + * Copyright (c) 2005-2007, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -59,84 +59,108 @@  #include <linux/vmalloc.h>  /* - * TIPC debugging code + * TIPC sanity test macros   */  #define assert(i)  BUG_ON(!(i)) -struct tipc_msg; -extern struct print_buf *TIPC_NULL, *TIPC_CONS, *TIPC_LOG; -extern struct print_buf *TIPC_TEE(struct print_buf *, struct print_buf *); -void tipc_msg_print(struct print_buf*,struct tipc_msg *,const char*); -void tipc_printf(struct print_buf *, const char *fmt, ...); -void tipc_dump(struct print_buf*,const char *fmt, ...); - -#ifdef CONFIG_TIPC_DEBUG -  /* - * TIPC debug support included: - * - system messages are printed to TIPC_OUTPUT print buffer - * - debug messages are printed to DBG_OUTPUT print buffer + * TIPC system monitoring code   */ -#define err(fmt, arg...)  tipc_printf(TIPC_OUTPUT, KERN_ERR "TIPC: " fmt, ## arg) -#define warn(fmt, arg...) tipc_printf(TIPC_OUTPUT, KERN_WARNING "TIPC: " fmt, ## arg) -#define info(fmt, arg...) tipc_printf(TIPC_OUTPUT, KERN_NOTICE "TIPC: " fmt, ## arg) +/* + * TIPC's print buffer subsystem supports the following print buffers: + * + * TIPC_NULL : null buffer (i.e. print nowhere) + * TIPC_CONS : system console + * TIPC_LOG  : TIPC log buffer + * &buf	     : user-defined buffer (struct print_buf *) + * + * Note: TIPC_LOG is configured to echo its output to the system console; + *       user-defined buffers can be configured to do the same thing. + */ -#define dbg(fmt, arg...)  do {if (DBG_OUTPUT != TIPC_NULL) tipc_printf(DBG_OUTPUT, fmt, ## arg);} while(0) -#define msg_dbg(msg, txt) do {if (DBG_OUTPUT != TIPC_NULL) tipc_msg_print(DBG_OUTPUT, msg, txt);} while(0) -#define dump(fmt, arg...) do {if (DBG_OUTPUT != TIPC_NULL) tipc_dump(DBG_OUTPUT, fmt, ##arg);} while(0) +extern struct print_buf *const TIPC_NULL; +extern struct print_buf *const TIPC_CONS; +extern struct print_buf *const TIPC_LOG; +void tipc_printf(struct print_buf *, const char *fmt, ...);  /* - * By default, TIPC_OUTPUT is defined to be system console and TIPC log buffer, - * while DBG_OUTPUT is the null print buffer.  These defaults can be changed - * here, or on a per .c file basis, by redefining these symbols.  The following - * print buffer options are available: - * - * TIPC_NULL		   : null buffer (i.e. print nowhere) - * TIPC_CONS		   : system console - * TIPC_LOG		   : TIPC log buffer - * &buf			   : user-defined buffer (struct print_buf *) - * TIPC_TEE(&buf_a,&buf_b) : list of buffers (eg. TIPC_TEE(TIPC_CONS,TIPC_LOG)) + * TIPC_OUTPUT is the destination print buffer for system messages.   */  #ifndef TIPC_OUTPUT -#define TIPC_OUTPUT TIPC_TEE(TIPC_CONS,TIPC_LOG) -#endif - -#ifndef DBG_OUTPUT -#define DBG_OUTPUT TIPC_NULL +#define TIPC_OUTPUT TIPC_LOG  #endif -#else -  /* - * TIPC debug support not included: - * - system messages are printed to system console - * - debug messages are not printed + * TIPC can be configured to send system messages to TIPC_OUTPUT + * or to the system console only.   */ +#ifdef CONFIG_TIPC_DEBUG + +#define err(fmt, arg...)  tipc_printf(TIPC_OUTPUT, \ +					KERN_ERR "TIPC: " fmt, ## arg) +#define warn(fmt, arg...) tipc_printf(TIPC_OUTPUT, \ +					KERN_WARNING "TIPC: " fmt, ## arg) +#define info(fmt, arg...) tipc_printf(TIPC_OUTPUT, \ +					KERN_NOTICE "TIPC: " fmt, ## arg) + +#else +  #define err(fmt, arg...)  printk(KERN_ERR "TIPC: " fmt , ## arg)  #define info(fmt, arg...) printk(KERN_INFO "TIPC: " fmt , ## arg)  #define warn(fmt, arg...) printk(KERN_WARNING "TIPC: " fmt , ## arg) -#define dbg(fmt, arg...) do {} while (0) -#define msg_dbg(msg,txt) do {} while (0) -#define dump(fmt,arg...) do {} while (0) +#endif +/* + * DBG_OUTPUT is the destination print buffer for debug messages. + * It defaults to the the null print buffer, but can be redefined + * (typically in the individual .c files being debugged) to allow + * selected debug messages to be generated where needed. + */ + +#ifndef DBG_OUTPUT +#define DBG_OUTPUT TIPC_NULL +#endif  /* - * TIPC_OUTPUT is defined to be the system console, while DBG_OUTPUT is - * the null print buffer.  Thes ensures that any system or debug messages - * that are generated without using the above macros are handled correctly. + * TIPC can be configured to send debug messages to the specified print buffer + * (typically DBG_OUTPUT) or to suppress them entirely.   */ -#undef  TIPC_OUTPUT -#define TIPC_OUTPUT TIPC_CONS +#ifdef CONFIG_TIPC_DEBUG -#undef  DBG_OUTPUT -#define DBG_OUTPUT TIPC_NULL +#define dbg(fmt, arg...)  \ +	do { \ +		if (DBG_OUTPUT != TIPC_NULL) \ +			tipc_printf(DBG_OUTPUT, fmt, ## arg); \ +	} while (0) +#define msg_dbg(msg, txt) \ +	do { \ +		if (DBG_OUTPUT != TIPC_NULL) \ +			tipc_msg_dbg(DBG_OUTPUT, msg, txt); \ +	} while (0) +#define dump(fmt, arg...) \ +	do { \ +		if (DBG_OUTPUT != TIPC_NULL) \ +			tipc_dump_dbg(DBG_OUTPUT, fmt, ##arg); \ +	} while (0) + +void tipc_msg_dbg(struct print_buf *, struct tipc_msg *, const char *); +void tipc_dump_dbg(struct print_buf *, const char *fmt, ...); + +#else + +#define dbg(fmt, arg...)	do {} while (0) +#define msg_dbg(msg, txt)	do {} while (0) +#define dump(fmt, arg...)	do {} while (0) + +#define tipc_msg_dbg(...)	do {} while (0) +#define tipc_dump_dbg(...)	do {} while (0)  #endif @@ -178,7 +202,7 @@ extern atomic_t tipc_user_count;  extern int  tipc_core_start(void);  extern void tipc_core_stop(void); -extern int  tipc_core_start_net(void); +extern int  tipc_core_start_net(unsigned long addr);  extern void tipc_core_stop_net(void);  extern int  tipc_handler_start(void);  extern void tipc_handler_stop(void); diff --git a/net/tipc/dbg.c b/net/tipc/dbg.c index e809d2a2ce0..29ecae85166 100644 --- a/net/tipc/dbg.c +++ b/net/tipc/dbg.c @@ -2,7 +2,7 @@   * net/tipc/dbg.c: TIPC print buffer routines for debugging   *   * Copyright (c) 1996-2006, Ericsson AB - * Copyright (c) 2005-2006, Wind River Systems + * Copyright (c) 2005-2007, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -38,17 +38,43 @@  #include "config.h"  #include "dbg.h" -static char print_string[TIPC_PB_MAX_STR]; -static DEFINE_SPINLOCK(print_lock); +/* + * TIPC pre-defines the following print buffers: + * + * TIPC_NULL : null buffer (i.e. print nowhere) + * TIPC_CONS : system console + * TIPC_LOG  : TIPC log buffer + * + * Additional user-defined print buffers are also permitted. + */ -static struct print_buf null_buf = { NULL, 0, NULL, NULL }; -struct print_buf *TIPC_NULL = &null_buf; +static struct print_buf null_buf = { NULL, 0, NULL, 0 }; +struct print_buf *const TIPC_NULL = &null_buf; -static struct print_buf cons_buf = { NULL, 0, NULL, NULL }; -struct print_buf *TIPC_CONS = &cons_buf; +static struct print_buf cons_buf = { NULL, 0, NULL, 1 }; +struct print_buf *const TIPC_CONS = &cons_buf; -static struct print_buf log_buf = { NULL, 0, NULL, NULL }; -struct print_buf *TIPC_LOG = &log_buf; +static struct print_buf log_buf = { NULL, 0, NULL, 1 }; +struct print_buf *const TIPC_LOG = &log_buf; + +/* + * Locking policy when using print buffers. + * + * 1) tipc_printf() uses 'print_lock' to protect against concurrent access to + * 'print_string' when writing to a print buffer. This also protects against + * concurrent writes to the print buffer being written to. + * + * 2) tipc_dump() and tipc_log_XXX() leverage the aforementioned + * use of 'print_lock' to protect against all types of concurrent operations + * on their associated print buffer (not just write operations). + * + * Note: All routines of the form tipc_printbuf_XXX() are lock-free, and rely + * on the caller to prevent simultaneous use of the print buffer(s) being + * manipulated. + */ + +static char print_string[TIPC_PB_MAX_STR]; +static DEFINE_SPINLOCK(print_lock);  #define FORMAT(PTR,LEN,FMT) \ @@ -60,27 +86,14 @@ struct print_buf *TIPC_LOG = &log_buf;         *(PTR + LEN) = '\0';\  } -/* - * Locking policy when using print buffers. - * - * The following routines use 'print_lock' for protection: - * 1) tipc_printf()  - to protect its print buffer(s) and 'print_string' - * 2) TIPC_TEE()     - to protect its print buffer(s) - * 3) tipc_dump()    - to protect its print buffer(s) and 'print_string' - * 4) tipc_log_XXX() - to protect TIPC_LOG - * - * All routines of the form tipc_printbuf_XXX() rely on the caller to prevent - * simultaneous use of the print buffer(s) being manipulated. - */ -  /**   * tipc_printbuf_init - initialize print buffer to empty   * @pb: pointer to print buffer structure   * @raw: pointer to character array used by print buffer   * @size: size of character array   * - * Makes the print buffer a null device that discards anything written to it - * if the character array is too small (or absent). + * Note: If the character array is too small (or absent), the print buffer + * becomes a null device that discards anything written to it.   */  void tipc_printbuf_init(struct print_buf *pb, char *raw, u32 size) @@ -88,13 +101,13 @@ void tipc_printbuf_init(struct print_buf *pb, char *raw, u32 size)  	pb->buf = raw;  	pb->crs = raw;  	pb->size = size; -	pb->next = NULL; +	pb->echo = 0;  	if (size < TIPC_PB_MIN_SIZE) {  		pb->buf = NULL;  	} else if (raw) {  		pb->buf[0] = 0; -		pb->buf[size-1] = ~0; +		pb->buf[size - 1] = ~0;  	}  } @@ -105,7 +118,11 @@ void tipc_printbuf_init(struct print_buf *pb, char *raw, u32 size)  void tipc_printbuf_reset(struct print_buf *pb)  { -	tipc_printbuf_init(pb, pb->buf, pb->size); +	if (pb->buf) { +		pb->crs = pb->buf; +		pb->buf[0] = 0; +		pb->buf[pb->size - 1] = ~0; +	}  }  /** @@ -141,7 +158,7 @@ int tipc_printbuf_validate(struct print_buf *pb)  	if (pb->buf[pb->size - 1] == 0) {  		cp_buf = kmalloc(pb->size, GFP_ATOMIC); -		if (cp_buf != NULL){ +		if (cp_buf) {  			tipc_printbuf_init(&cb, cp_buf, pb->size);  			tipc_printbuf_move(&cb, pb);  			tipc_printbuf_move(pb, &cb); @@ -179,15 +196,16 @@ void tipc_printbuf_move(struct print_buf *pb_to, struct print_buf *pb_from)  	}  	if (pb_to->size < pb_from->size) { -		tipc_printbuf_reset(pb_to); -		tipc_printf(pb_to, "*** PRINT BUFFER MOVE ERROR ***"); +		strcpy(pb_to->buf, "*** PRINT BUFFER MOVE ERROR ***"); +		pb_to->buf[pb_to->size - 1] = ~0; +		pb_to->crs = strchr(pb_to->buf, 0);  		return;  	}  	/* Copy data from char after cursor to end (if used) */  	len = pb_from->buf + pb_from->size - pb_from->crs - 2; -	if ((pb_from->buf[pb_from->size-1] == 0) && (len > 0)) { +	if ((pb_from->buf[pb_from->size - 1] == 0) && (len > 0)) {  		strcpy(pb_to->buf, pb_from->crs + 1);  		pb_to->crs = pb_to->buf + len;  	} else @@ -203,8 +221,8 @@ void tipc_printbuf_move(struct print_buf *pb_to, struct print_buf *pb_from)  }  /** - * tipc_printf - append formatted output to print buffer chain - * @pb: pointer to chain of print buffers (may be NULL) + * tipc_printf - append formatted output to print buffer + * @pb: pointer to print buffer   * @fmt: formatted info to be printed   */ @@ -213,68 +231,40 @@ void tipc_printf(struct print_buf *pb, const char *fmt, ...)  	int chars_to_add;  	int chars_left;  	char save_char; -	struct print_buf *pb_next;  	spin_lock_bh(&print_lock); +  	FORMAT(print_string, chars_to_add, fmt);  	if (chars_to_add >= TIPC_PB_MAX_STR)  		strcpy(print_string, "*** PRINT BUFFER STRING TOO LONG ***"); -	while (pb) { -		if (pb == TIPC_CONS) -			printk(print_string); -		else if (pb->buf) { -			chars_left = pb->buf + pb->size - pb->crs - 1; -			if (chars_to_add <= chars_left) { -				strcpy(pb->crs, print_string); -				pb->crs += chars_to_add; -			} else if (chars_to_add >= (pb->size - 1)) { -				strcpy(pb->buf, print_string + chars_to_add + 1 -				       - pb->size); -				pb->crs = pb->buf + pb->size - 1; -			} else { -				strcpy(pb->buf, print_string + chars_left); -				save_char = print_string[chars_left]; -				print_string[chars_left] = 0; -				strcpy(pb->crs, print_string); -				print_string[chars_left] = save_char; -				pb->crs = pb->buf + chars_to_add - chars_left; -			} +	if (pb->buf) { +		chars_left = pb->buf + pb->size - pb->crs - 1; +		if (chars_to_add <= chars_left) { +			strcpy(pb->crs, print_string); +			pb->crs += chars_to_add; +		} else if (chars_to_add >= (pb->size - 1)) { +			strcpy(pb->buf, print_string + chars_to_add + 1 +			       - pb->size); +			pb->crs = pb->buf + pb->size - 1; +		} else { +			strcpy(pb->buf, print_string + chars_left); +			save_char = print_string[chars_left]; +			print_string[chars_left] = 0; +			strcpy(pb->crs, print_string); +			print_string[chars_left] = save_char; +			pb->crs = pb->buf + chars_to_add - chars_left;  		} -		pb_next = pb->next; -		pb->next = NULL; -		pb = pb_next;  	} -	spin_unlock_bh(&print_lock); -} -/** - * TIPC_TEE - perform next output operation on both print buffers - * @b0: pointer to chain of print buffers (may be NULL) - * @b1: pointer to print buffer to add to chain - * - * Returns pointer to print buffer chain. - */ +	if (pb->echo) +		printk(print_string); -struct print_buf *TIPC_TEE(struct print_buf *b0, struct print_buf *b1) -{ -	struct print_buf *pb = b0; - -	if (!b0 || (b0 == b1)) -		return b1; - -	spin_lock_bh(&print_lock); -	while (pb->next) { -		if ((pb->next == b1) || (pb->next == b0)) -			pb->next = pb->next->next; -		else -			pb = pb->next; -	} -	pb->next = b1;  	spin_unlock_bh(&print_lock); -	return b0;  } +#ifdef CONFIG_TIPC_DEBUG +  /**   * print_to_console - write string of bytes to console in multiple chunks   */ @@ -321,72 +311,66 @@ static void printbuf_dump(struct print_buf *pb)  }  /** - * tipc_dump - dump non-console print buffer(s) to console - * @pb: pointer to chain of print buffers + * tipc_dump_dbg - dump (non-console) print buffer to console + * @pb: pointer to print buffer   */ -void tipc_dump(struct print_buf *pb, const char *fmt, ...) +void tipc_dump_dbg(struct print_buf *pb, const char *fmt, ...)  { -	struct print_buf *pb_next;  	int len; +	if (pb == TIPC_CONS) +		return; +  	spin_lock_bh(&print_lock); +  	FORMAT(print_string, len, fmt);  	printk(print_string); -	for (; pb; pb = pb->next) { -		if (pb != TIPC_CONS) { -			printk("\n---- Start of %s log dump ----\n\n", -			       (pb == TIPC_LOG) ? "global" : "local"); -			printbuf_dump(pb); -			tipc_printbuf_reset(pb); -			printk("\n---- End of dump ----\n"); -		} -		pb_next = pb->next; -		pb->next = NULL; -		pb = pb_next; -	} +	printk("\n---- Start of %s log dump ----\n\n", +	       (pb == TIPC_LOG) ? "global" : "local"); +	printbuf_dump(pb); +	tipc_printbuf_reset(pb); +	printk("\n---- End of dump ----\n"); +  	spin_unlock_bh(&print_lock);  } +#endif +  /** - * tipc_log_stop - free up TIPC log print buffer + * tipc_log_resize - change the size of the TIPC log buffer + * @log_size: print buffer size to use   */ -void tipc_log_stop(void) +int tipc_log_resize(int log_size)  { +	int res = 0; +  	spin_lock_bh(&print_lock);  	if (TIPC_LOG->buf) {  		kfree(TIPC_LOG->buf);  		TIPC_LOG->buf = NULL;  	} -	spin_unlock_bh(&print_lock); -} - -/** - * tipc_log_reinit - (re)initialize TIPC log print buffer - * @log_size: print buffer size to use - */ - -void tipc_log_reinit(int log_size) -{ -	tipc_log_stop(); -  	if (log_size) {  		if (log_size < TIPC_PB_MIN_SIZE)  			log_size = TIPC_PB_MIN_SIZE; -		spin_lock_bh(&print_lock); +		res = TIPC_LOG->echo;  		tipc_printbuf_init(TIPC_LOG, kmalloc(log_size, GFP_ATOMIC),  				   log_size); -		spin_unlock_bh(&print_lock); +		TIPC_LOG->echo = res; +		res = !TIPC_LOG->buf;  	} +	spin_unlock_bh(&print_lock); + +	return res;  }  /** - * tipc_log_resize - reconfigure size of TIPC log buffer + * tipc_log_resize_cmd - reconfigure size of TIPC log buffer   */ -struct sk_buff *tipc_log_resize(const void *req_tlv_area, int req_tlv_space) +struct sk_buff *tipc_log_resize_cmd(const void *req_tlv_area, int req_tlv_space)  {  	u32 value; @@ -397,7 +381,9 @@ struct sk_buff *tipc_log_resize(const void *req_tlv_area, int req_tlv_space)  	if (value != delimit(value, 0, 32768))  		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE  						   " (log size must be 0-32768)"); -	tipc_log_reinit(value); +	if (tipc_log_resize(value)) +		return tipc_cfg_reply_error_string( +			"unable to create specified log (log size is now 0)");  	return tipc_cfg_reply_none();  } @@ -410,27 +396,32 @@ struct sk_buff *tipc_log_dump(void)  	struct sk_buff *reply;  	spin_lock_bh(&print_lock); -	if (!TIPC_LOG->buf) +	if (!TIPC_LOG->buf) { +		spin_unlock_bh(&print_lock);  		reply = tipc_cfg_reply_ultra_string("log not activated\n"); -	else if (tipc_printbuf_empty(TIPC_LOG)) +	} else if (tipc_printbuf_empty(TIPC_LOG)) { +		spin_unlock_bh(&print_lock);  		reply = tipc_cfg_reply_ultra_string("log is empty\n"); +	}  	else {  		struct tlv_desc *rep_tlv;  		struct print_buf pb;  		int str_len;  		str_len = min(TIPC_LOG->size, 32768u); +		spin_unlock_bh(&print_lock);  		reply = tipc_cfg_reply_alloc(TLV_SPACE(str_len));  		if (reply) {  			rep_tlv = (struct tlv_desc *)reply->data;  			tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), str_len); +			spin_lock_bh(&print_lock);  			tipc_printbuf_move(&pb, TIPC_LOG); +			spin_unlock_bh(&print_lock);  			str_len = strlen(TLV_DATA(rep_tlv)) + 1;  			skb_put(reply, TLV_SPACE(str_len));  			TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);  		}  	} -	spin_unlock_bh(&print_lock);  	return reply;  } diff --git a/net/tipc/dbg.h b/net/tipc/dbg.h index c01b085000e..5ef1bc8f64e 100644 --- a/net/tipc/dbg.h +++ b/net/tipc/dbg.h @@ -2,7 +2,7 @@   * net/tipc/dbg.h: Include file for TIPC print buffer routines   *   * Copyright (c) 1997-2006, Ericsson AB - * Copyright (c) 2005-2006, Wind River Systems + * Copyright (c) 2005-2007, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -42,14 +42,14 @@   * @buf: pointer to character array containing print buffer contents   * @size: size of character array   * @crs: pointer to first unused space in character array (i.e. final NUL) - * @next: used to link print buffers when printing to more than one at a time + * @echo: echo output to system console if non-zero   */  struct print_buf {  	char *buf;  	u32 size;  	char *crs; -	struct print_buf *next; +	int echo;  };  #define TIPC_PB_MIN_SIZE 64	/* minimum size for a print buffer's array */ @@ -61,10 +61,10 @@ int  tipc_printbuf_empty(struct print_buf *pb);  int  tipc_printbuf_validate(struct print_buf *pb);  void tipc_printbuf_move(struct print_buf *pb_to, struct print_buf *pb_from); -void tipc_log_reinit(int log_size); -void tipc_log_stop(void); +int tipc_log_resize(int log_size); -struct sk_buff *tipc_log_resize(const void *req_tlv_area, int req_tlv_space); +struct sk_buff *tipc_log_resize_cmd(const void *req_tlv_area, +				    int req_tlv_space);  struct sk_buff *tipc_log_dump(void);  #endif diff --git a/net/tipc/discover.c b/net/tipc/discover.c index 5d643e5721e..1657f0e795f 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -120,9 +120,8 @@ static struct sk_buff *tipc_disc_init_msg(u32 type,  	if (buf) {  		msg = buf_msg(buf); -		msg_init(msg, LINK_CONFIG, type, TIPC_OK, DSC_H_SIZE, -			 dest_domain); -		msg_set_non_seq(msg); +		msg_init(msg, LINK_CONFIG, type, DSC_H_SIZE, dest_domain); +		msg_set_non_seq(msg, 1);  		msg_set_req_links(msg, req_links);  		msg_set_dest_domain(msg, dest_domain);  		msg_set_bc_netid(msg, tipc_net_id); @@ -156,11 +155,11 @@ static void disc_dupl_alert(struct bearer *b_ptr, u32 node_addr,  /**   * tipc_disc_recv_msg - handle incoming link setup message (request or response)   * @buf: buffer containing message + * @b_ptr: bearer that message arrived on   */ -void tipc_disc_recv_msg(struct sk_buff *buf) +void tipc_disc_recv_msg(struct sk_buff *buf, struct bearer *b_ptr)  { -	struct bearer *b_ptr = (struct bearer *)TIPC_SKB_CB(buf)->handle;  	struct link *link;  	struct tipc_media_addr media_addr;  	struct tipc_msg *msg = buf_msg(buf); @@ -200,9 +199,8 @@ void tipc_disc_recv_msg(struct sk_buff *buf)  		dbg(" in own cluster\n");  		if (n_ptr == NULL) {  			n_ptr = tipc_node_create(orig); -		} -		if (n_ptr == NULL) { -			return; +			if (!n_ptr) +				return;  		}  		spin_lock_bh(&n_ptr->lock);  		link = n_ptr->links[b_ptr->identity]; diff --git a/net/tipc/discover.h b/net/tipc/discover.h index 9fd7587b143..c36eaeb7d5d 100644 --- a/net/tipc/discover.h +++ b/net/tipc/discover.h @@ -48,7 +48,7 @@ struct link_req *tipc_disc_init_link_req(struct bearer *b_ptr,  void tipc_disc_update_link_req(struct link_req *req);  void tipc_disc_stop_link_req(struct link_req *req); -void tipc_disc_recv_msg(struct sk_buff *buf); +void tipc_disc_recv_msg(struct sk_buff *buf, struct bearer *b_ptr);  void tipc_disc_link_event(u32 addr, char *name, int up);  #if 0 diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c index 9cd35eec3e7..fe43ef7dd7e 100644 --- a/net/tipc/eth_media.c +++ b/net/tipc/eth_media.c @@ -82,7 +82,7 @@ static int send_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr,  				 dev->dev_addr, clone->len);  		dev_queue_xmit(clone);  	} -	return TIPC_OK; +	return 0;  }  /** @@ -101,7 +101,7 @@ static int recv_msg(struct sk_buff *buf, struct net_device *dev,  	struct eth_bearer *eb_ptr = (struct eth_bearer *)pt->af_packet_priv;  	u32 size; -	if (dev_net(dev) != &init_net) { +	if (!net_eq(dev_net(dev), &init_net)) {  		kfree_skb(buf);  		return 0;  	} @@ -113,12 +113,12 @@ static int recv_msg(struct sk_buff *buf, struct net_device *dev,  			if (likely(buf->len == size)) {  				buf->next = NULL;  				tipc_recv_msg(buf, eb_ptr->bearer); -				return TIPC_OK; +				return 0;  			}  		}  	}  	kfree_skb(buf); -	return TIPC_OK; +	return 0;  }  /** @@ -198,7 +198,7 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,  	struct eth_bearer *eb_ptr = ð_bearers[0];  	struct eth_bearer *stop = ð_bearers[MAX_ETH_BEARERS]; -	if (dev_net(dev) != &init_net) +	if (!net_eq(dev_net(dev), &init_net))  		return NOTIFY_DONE;  	while ((eb_ptr->dev != dev)) { diff --git a/net/tipc/link.c b/net/tipc/link.c index 2a26a16e269..d60113ba4b1 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -51,6 +51,12 @@  /* + * Out-of-range value for link session numbers + */ + +#define INVALID_SESSION 0x10000 + +/*   * Limit for deferred reception queue:   */ @@ -147,9 +153,21 @@ static void link_print(struct link *l_ptr, struct print_buf *buf,  #define LINK_LOG_BUF_SIZE 0 -#define dbg_link(fmt, arg...)  do {if (LINK_LOG_BUF_SIZE) tipc_printf(&l_ptr->print_buf, fmt, ## arg); } while(0) -#define dbg_link_msg(msg, txt) do {if (LINK_LOG_BUF_SIZE) tipc_msg_print(&l_ptr->print_buf, msg, txt); } while(0) -#define dbg_link_state(txt) do {if (LINK_LOG_BUF_SIZE) link_print(l_ptr, &l_ptr->print_buf, txt); } while(0) +#define dbg_link(fmt, arg...) \ +	do { \ +		if (LINK_LOG_BUF_SIZE) \ +			tipc_printf(&l_ptr->print_buf, fmt, ## arg); \ +	} while (0) +#define dbg_link_msg(msg, txt) \ +	do { \ +		if (LINK_LOG_BUF_SIZE) \ +			tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \ +	} while (0) +#define dbg_link_state(txt) \ +	do { \ +		if (LINK_LOG_BUF_SIZE) \ +			link_print(l_ptr, &l_ptr->print_buf, txt); \ +	} while (0)  #define dbg_link_dump() do { \  	if (LINK_LOG_BUF_SIZE) { \  		tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \ @@ -450,9 +468,9 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,  	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;  	msg = l_ptr->pmsg; -	msg_init(msg, LINK_PROTOCOL, RESET_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr); +	msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);  	msg_set_size(msg, sizeof(l_ptr->proto_msg)); -	msg_set_session(msg, tipc_random); +	msg_set_session(msg, (tipc_random & 0xffff));  	msg_set_bearer_id(msg, b_ptr->identity);  	strcpy((char *)msg_data(msg), if_name); @@ -693,10 +711,10 @@ void tipc_link_reset(struct link *l_ptr)  	u32 checkpoint = l_ptr->next_in_no;  	int was_active_link = tipc_link_is_active(l_ptr); -	msg_set_session(l_ptr->pmsg, msg_session(l_ptr->pmsg) + 1); +	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); -	/* Link is down, accept any session: */ -	l_ptr->peer_session = 0; +	/* Link is down, accept any session */ +	l_ptr->peer_session = INVALID_SESSION;  	/* Prepare for max packet size negotiation */  	link_init_max_pkt(l_ptr); @@ -1110,7 +1128,7 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)  			if (bundler) {  				msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG, -					 TIPC_OK, INT_H_SIZE, l_ptr->addr); +					 INT_H_SIZE, l_ptr->addr);  				skb_copy_to_linear_data(bundler, &bundler_hdr,  							INT_H_SIZE);  				skb_trim(bundler, INT_H_SIZE); @@ -1374,7 +1392,7 @@ again:  	msg_dbg(hdr, ">FRAGMENTING>");  	msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, -		 TIPC_OK, INT_H_SIZE, msg_destnode(hdr)); +		 INT_H_SIZE, msg_destnode(hdr));  	msg_set_link_selector(&fragm_hdr, sender->publ.ref);  	msg_set_size(&fragm_hdr, max_pkt);  	msg_set_fragm_no(&fragm_hdr, 1); @@ -1543,7 +1561,7 @@ u32 tipc_link_push_packet(struct link *l_ptr)  			l_ptr->retransm_queue_head = mod(++r_q_head);  			l_ptr->retransm_queue_size = --r_q_size;  			l_ptr->stats.retransmitted++; -			return TIPC_OK; +			return 0;  		} else {  			l_ptr->stats.bearer_congs++;  			msg_dbg(buf_msg(buf), "|>DEF-RETR>"); @@ -1562,7 +1580,7 @@ u32 tipc_link_push_packet(struct link *l_ptr)  			l_ptr->unacked_window = 0;  			buf_discard(buf);  			l_ptr->proto_msg_queue = NULL; -			return TIPC_OK; +			return 0;  		} else {  			msg_dbg(buf_msg(buf), "|>DEF-PROT>");  			l_ptr->stats.bearer_congs++; @@ -1586,7 +1604,7 @@ u32 tipc_link_push_packet(struct link *l_ptr)  					msg_set_type(msg, CLOSED_MSG);  				msg_dbg(msg, ">PUSH-DATA>");  				l_ptr->next_out = buf->next; -				return TIPC_OK; +				return 0;  			} else {  				msg_dbg(msg, "|PUSH-DATA|");  				l_ptr->stats.bearer_congs++; @@ -1610,8 +1628,8 @@ void tipc_link_push_queue(struct link *l_ptr)  	do {  		res = tipc_link_push_packet(l_ptr); -	} -	while (res == TIPC_OK); +	} while (!res); +  	if (res == PUSH_FAILED)  		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);  } @@ -1651,7 +1669,7 @@ static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)  	struct tipc_msg *msg = buf_msg(buf);  	warn("Retransmission failure on link <%s>\n", l_ptr->name); -	tipc_msg_print(TIPC_OUTPUT, msg, ">RETR-FAIL>"); +	tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>");  	if (l_ptr->addr) { @@ -1748,21 +1766,6 @@ void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,  	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;  } -/* - * link_recv_non_seq: Receive packets which are outside - *                    the link sequence flow - */ - -static void link_recv_non_seq(struct sk_buff *buf) -{ -	struct tipc_msg *msg = buf_msg(buf); - -	if (msg_user(msg) ==  LINK_CONFIG) -		tipc_disc_recv_msg(buf); -	else -		tipc_bclink_recv_pkt(buf); -} -  /**   * link_insert_deferred_queue - insert deferred messages back into receive chain   */ @@ -1839,7 +1842,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)  {  	read_lock_bh(&tipc_net_lock);  	while (head) { -		struct bearer *b_ptr; +		struct bearer *b_ptr = (struct bearer *)tb_ptr;  		struct node *n_ptr;  		struct link *l_ptr;  		struct sk_buff *crs; @@ -1850,9 +1853,6 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)  		u32 released = 0;  		int type; -		b_ptr = (struct bearer *)tb_ptr; -		TIPC_SKB_CB(buf)->handle = b_ptr; -  		head = head->next;  		/* Ensure message is well-formed */ @@ -1871,7 +1871,10 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)  		msg = buf_msg(buf);  		if (unlikely(msg_non_seq(msg))) { -			link_recv_non_seq(buf); +			if (msg_user(msg) ==  LINK_CONFIG) +				tipc_disc_recv_msg(buf, b_ptr); +			else +				tipc_bclink_recv_pkt(buf);  			continue;  		} @@ -1978,8 +1981,6 @@ deliver:  						if (link_recv_changeover_msg(&l_ptr, &buf)) {  							msg = buf_msg(buf);  							seq_no = msg_seqno(msg); -							TIPC_SKB_CB(buf)->handle -								= b_ptr;  							if (type == ORIGINAL_MSG)  								goto deliver;  							goto protocol_check; @@ -2263,7 +2264,8 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)  	switch (msg_type(msg)) {  	case RESET_MSG: -		if (!link_working_unknown(l_ptr) && l_ptr->peer_session) { +		if (!link_working_unknown(l_ptr) && +		    (l_ptr->peer_session != INVALID_SESSION)) {  			if (msg_session(msg) == l_ptr->peer_session) {  				dbg("Duplicate RESET: %u<->%u\n",  				    msg_session(msg), l_ptr->peer_session); @@ -2424,7 +2426,7 @@ void tipc_link_changeover(struct link *l_ptr)  	}  	msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, -		 ORIGINAL_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr); +		 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);  	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);  	msg_set_msgcnt(&tunnel_hdr, msgcount);  	dbg("Link changeover requires %u tunnel messages\n", msgcount); @@ -2479,7 +2481,7 @@ void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)  	struct tipc_msg tunnel_hdr;  	msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, -		 DUPLICATE_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr); +		 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);  	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);  	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);  	iter = l_ptr->first_out; @@ -2672,10 +2674,12 @@ int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)  	u32 pack_sz = link_max_pkt(l_ptr);  	u32 fragm_sz = pack_sz - INT_H_SIZE;  	u32 fragm_no = 1; -	u32 destaddr = msg_destnode(inmsg); +	u32 destaddr;  	if (msg_short(inmsg))  		destaddr = l_ptr->addr; +	else +		destaddr = msg_destnode(inmsg);  	if (msg_routed(inmsg))  		msg_set_prevnode(inmsg, tipc_own_addr); @@ -2683,7 +2687,7 @@ int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)  	/* Prepare reusable fragment header: */  	msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, -		 TIPC_OK, INT_H_SIZE, destaddr); +		 INT_H_SIZE, destaddr);  	msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));  	msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));  	msg_set_fragm_no(&fragm_hdr, fragm_no); @@ -2994,7 +2998,7 @@ struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space  			link_set_supervision_props(l_ptr, new_value);  			tipc_link_send_proto_msg(l_ptr, STATE_MSG,  						 0, 0, new_value, 0, 0); -			res = TIPC_OK; +			res = 0;  		}  		break;  	case TIPC_CMD_SET_LINK_PRI: @@ -3003,14 +3007,14 @@ struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space  			l_ptr->priority = new_value;  			tipc_link_send_proto_msg(l_ptr, STATE_MSG,  						 0, 0, 0, new_value, 0); -			res = TIPC_OK; +			res = 0;  		}  		break;  	case TIPC_CMD_SET_LINK_WINDOW:  		if ((new_value >= TIPC_MIN_LINK_WIN) &&  		    (new_value <= TIPC_MAX_LINK_WIN)) {  			tipc_link_set_queue_limits(l_ptr, new_value); -			res = TIPC_OK; +			res = 0;  		}  		break;  	} @@ -3226,7 +3230,7 @@ int link_control(const char *name, u32 op, u32 val)  			if (op == TIPC_CMD_UNBLOCK_LINK) {  				l_ptr->blocked = 0;  			} -			res = TIPC_OK; +			res = 0;  		}  		tipc_node_unlock(node);  	} diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 696a8633df7..73dcd00d674 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -41,7 +41,9 @@  #include "bearer.h" -void tipc_msg_print(struct print_buf *buf, struct tipc_msg *msg, const char *str) +#ifdef CONFIG_TIPC_DEBUG + +void tipc_msg_dbg(struct print_buf *buf, struct tipc_msg *msg, const char *str)  {  	u32 usr = msg_user(msg);  	tipc_printf(buf, str); @@ -228,13 +230,10 @@ void tipc_msg_print(struct print_buf *buf, struct tipc_msg *msg, const char *str  	switch (usr) {  	case CONN_MANAGER: -	case NAME_DISTRIBUTOR:  	case TIPC_LOW_IMPORTANCE:  	case TIPC_MEDIUM_IMPORTANCE:  	case TIPC_HIGH_IMPORTANCE:  	case TIPC_CRITICAL_IMPORTANCE: -		if (msg_short(msg)) -			break;	/* No error */  		switch (msg_errcode(msg)) {  		case TIPC_OK:  			break; @@ -315,9 +314,11 @@ void tipc_msg_print(struct print_buf *buf, struct tipc_msg *msg, const char *str  	}  	tipc_printf(buf, "\n");  	if ((usr == CHANGEOVER_PROTOCOL) && (msg_msgcnt(msg))) { -		tipc_msg_print(buf,msg_get_wrapped(msg),"      /"); +		tipc_msg_dbg(buf, msg_get_wrapped(msg), "      /");  	}  	if ((usr == MSG_FRAGMENTER) && (msg_type(msg) == FIRST_FRAGMENT)) { -		tipc_msg_print(buf,msg_get_wrapped(msg),"      /"); +		tipc_msg_dbg(buf, msg_get_wrapped(msg), "      /");  	}  } + +#endif diff --git a/net/tipc/msg.h b/net/tipc/msg.h index ad487e8abcc..7ee6ae23814 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -2,7 +2,7 @@   * net/tipc/msg.h: Include file for TIPC message header routines   *   * Copyright (c) 2000-2007, Ericsson AB - * Copyright (c) 2005-2007, Wind River Systems + * Copyright (c) 2005-2008, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -75,6 +75,14 @@ static inline void msg_set_bits(struct tipc_msg *m, u32 w,  	m->hdr[w] |= htonl(val);  } +static inline void msg_swap_words(struct tipc_msg *msg, u32 a, u32 b) +{ +	u32 temp = msg->hdr[a]; + +	msg->hdr[a] = msg->hdr[b]; +	msg->hdr[b] = temp; +} +  /*   * Word 0   */ @@ -119,9 +127,9 @@ static inline int msg_non_seq(struct tipc_msg *m)  	return msg_bits(m, 0, 20, 1);  } -static inline void msg_set_non_seq(struct tipc_msg *m) +static inline void msg_set_non_seq(struct tipc_msg *m, u32 n)  { -	msg_set_bits(m, 0, 20, 1, 1); +	msg_set_bits(m, 0, 20, 1, n);  }  static inline int msg_dest_droppable(struct tipc_msg *m) @@ -224,6 +232,25 @@ static inline void msg_set_seqno(struct tipc_msg *m, u32 n)  	msg_set_bits(m, 2, 0, 0xffff, n);  } +/* + * TIPC may utilize the "link ack #" and "link seq #" fields of a short + * message header to hold the destination node for the message, since the + * normal "dest node" field isn't present.  This cache is only referenced + * when required, so populating the cache of a longer message header is + * harmless (as long as the header has the two link sequence fields present). + * + * Note: Host byte order is OK here, since the info never goes off-card. + */ + +static inline u32 msg_destnode_cache(struct tipc_msg *m) +{ +	return m->hdr[2]; +} + +static inline void msg_set_destnode_cache(struct tipc_msg *m, u32 dnode) +{ +	m->hdr[2] = dnode; +}  /*   * Words 3-10 @@ -325,7 +352,7 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+     w0:|vers |msg usr|hdr sz |n|resrv|            packet size          |        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -   w1:|m typ|rsv=0|   sequence gap    |       broadcast ack no        | +   w1:|m typ|      sequence gap       |       broadcast ack no        |        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+     w2:| link level ack no/bc_gap_from |     seq no / bcast_gap_to     |        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ @@ -388,12 +415,12 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)  static inline u32 msg_seq_gap(struct tipc_msg *m)  { -	return msg_bits(m, 1, 16, 0xff); +	return msg_bits(m, 1, 16, 0x1fff);  }  static inline void msg_set_seq_gap(struct tipc_msg *m, u32 n)  { -	msg_set_bits(m, 1, 16, 0xff, n); +	msg_set_bits(m, 1, 16, 0x1fff, n);  }  static inline u32 msg_req_links(struct tipc_msg *m) @@ -696,7 +723,7 @@ static inline u32 msg_tot_importance(struct tipc_msg *m)  static inline void msg_init(struct tipc_msg *m, u32 user, u32 type, -			    u32 err, u32 hsize, u32 destnode) +			    u32 hsize, u32 destnode)  {  	memset(m, 0, hsize);  	msg_set_version(m); @@ -705,7 +732,6 @@ static inline void msg_init(struct tipc_msg *m, u32 user, u32 type,  	msg_set_size(m, hsize);  	msg_set_prevnode(m, tipc_own_addr);  	msg_set_type(m, type); -	msg_set_errcode(m, err);  	if (!msg_short(m)) {  		msg_set_orignode(m, tipc_own_addr);  		msg_set_destnode(m, destnode); diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c index 39fd1619feb..10a69894e2f 100644 --- a/net/tipc/name_distr.c +++ b/net/tipc/name_distr.c @@ -41,9 +41,6 @@  #include "msg.h"  #include "name_distr.h" -#undef  DBG_OUTPUT -#define DBG_OUTPUT NULL -  #define ITEM_SIZE sizeof(struct distr_item)  /** @@ -106,8 +103,7 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)  	if (buf != NULL) {  		msg = buf_msg(buf); -		msg_init(msg, NAME_DISTRIBUTOR, type, TIPC_OK, -			 LONG_H_SIZE, dest); +		msg_init(msg, NAME_DISTRIBUTOR, type, LONG_H_SIZE, dest);  		msg_set_size(msg, LONG_H_SIZE + size);  	}  	return buf; diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index ac7dfdda797..cd72e22b132 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -2,7 +2,7 @@   * net/tipc/name_table.c: TIPC name table code   *   * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2004-2005, Wind River Systems + * Copyright (c) 2004-2008, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -52,9 +52,16 @@ static int tipc_nametbl_size = 1024;		/* must be a power of 2 */   * struct sub_seq - container for all published instances of a name sequence   * @lower: name sequence lower bound   * @upper: name sequence upper bound - * @node_list: circular list of matching publications with >= node scope - * @cluster_list: circular list of matching publications with >= cluster scope - * @zone_list: circular list of matching publications with >= zone scope + * @node_list: circular list of publications made by own node + * @cluster_list: circular list of publications made by own cluster + * @zone_list: circular list of publications made by own zone + * @node_list_size: number of entries in "node_list" + * @cluster_list_size: number of entries in "cluster_list" + * @zone_list_size: number of entries in "zone_list" + * + * Note: The zone list always contains at least one entry, since all + *       publications of the associated name sequence belong to it. + *       (The cluster and node lists may be empty.)   */  struct sub_seq { @@ -63,6 +70,9 @@ struct sub_seq {  	struct publication *node_list;  	struct publication *cluster_list;  	struct publication *zone_list; +	u32 node_list_size; +	u32 cluster_list_size; +	u32 zone_list_size;  };  /** @@ -74,7 +84,7 @@ struct sub_seq {   * @first_free: array index of first unused sub-sequence entry   * @ns_list: links to adjacent name sequences in hash chain   * @subscriptions: list of subscriptions for this 'type' - * @lock: spinlock controlling access to name sequence structure + * @lock: spinlock controlling access to publication lists of all sub-sequences   */  struct name_seq { @@ -317,6 +327,7 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,  	dbg("inserting publ %p, node=0x%x publ->node=0x%x, subscr->node=%p\n",  	    publ, node, publ->node, publ->subscr.node); +	sseq->zone_list_size++;  	if (!sseq->zone_list)  		sseq->zone_list = publ->zone_list_next = publ;  	else { @@ -325,6 +336,7 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,  	}  	if (in_own_cluster(node)) { +		sseq->cluster_list_size++;  		if (!sseq->cluster_list)  			sseq->cluster_list = publ->cluster_list_next = publ;  		else { @@ -335,6 +347,7 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,  	}  	if (node == tipc_own_addr) { +		sseq->node_list_size++;  		if (!sseq->node_list)  			sseq->node_list = publ->node_list_next = publ;  		else { @@ -411,6 +424,7 @@ static struct publication *tipc_nameseq_remove_publ(struct name_seq *nseq, u32 i  	} else {  		sseq->zone_list = NULL;  	} +	sseq->zone_list_size--;  	/* Remove publication from cluster scope list, if present */ @@ -439,6 +453,7 @@ static struct publication *tipc_nameseq_remove_publ(struct name_seq *nseq, u32 i  		} else {  			sseq->cluster_list = NULL;  		} +		sseq->cluster_list_size--;  	}  end_cluster: @@ -469,6 +484,7 @@ end_cluster:  		} else {  			sseq->node_list = NULL;  		} +		sseq->node_list_size--;  	}  end_node: @@ -709,15 +725,18 @@ int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,  		if (sseq->lower > upper)  			break; -		publ = sseq->cluster_list; -		if (publ && (publ->scope <= limit)) + +		publ = sseq->node_list; +		if (publ) {  			do { -				if (publ->node == tipc_own_addr) +				if (publ->scope <= limit)  					tipc_port_list_add(dports, publ->ref); -				else -					res = 1; -				publ = publ->cluster_list_next; -			} while (publ != sseq->cluster_list); +				publ = publ->node_list_next; +			} while (publ != sseq->node_list); +		} + +		if (sseq->cluster_list_size != sseq->node_list_size) +			res = 1;  	}  	spin_unlock_bh(&seq->lock); @@ -905,6 +924,9 @@ static void nameseq_list(struct name_seq *seq, struct print_buf *buf, u32 depth,  	struct sub_seq *sseq;  	char typearea[11]; +	if (seq->first_free == 0) +		return; +  	sprintf(typearea, "%-10u", seq->type);  	if (depth == 1) { @@ -915,7 +937,9 @@ static void nameseq_list(struct name_seq *seq, struct print_buf *buf, u32 depth,  	for (sseq = seq->sseqs; sseq != &seq->sseqs[seq->first_free]; sseq++) {  		if ((lowbound <= sseq->upper) && (upbound >= sseq->lower)) {  			tipc_printf(buf, "%s ", typearea); +			spin_lock_bh(&seq->lock);  			subseq_list(sseq, buf, depth, index); +			spin_unlock_bh(&seq->lock);  			sprintf(typearea, "%10s", " ");  		}  	} @@ -1050,15 +1074,12 @@ void tipc_nametbl_dump(void)  int tipc_nametbl_init(void)  { -	int array_size = sizeof(struct hlist_head) * tipc_nametbl_size; - -	table.types = kzalloc(array_size, GFP_ATOMIC); +	table.types = kcalloc(tipc_nametbl_size, sizeof(struct hlist_head), +			      GFP_ATOMIC);  	if (!table.types)  		return -ENOMEM; -	write_lock_bh(&tipc_nametbl_lock);  	table.local_publ_count = 0; -	write_unlock_bh(&tipc_nametbl_lock);  	return 0;  } diff --git a/net/tipc/net.c b/net/tipc/net.c index c39c76201e8..ec7b04fbdc4 100644 --- a/net/tipc/net.c +++ b/net/tipc/net.c @@ -165,7 +165,7 @@ static int net_init(void)  	if (!tipc_net.zones) {  		return -ENOMEM;  	} -	return TIPC_OK; +	return 0;  }  static void net_stop(void) @@ -266,7 +266,7 @@ void tipc_net_route_msg(struct sk_buff *buf)  	tipc_link_send(buf, dnode, msg_link_selector(msg));  } -int tipc_net_start(void) +int tipc_net_start(u32 addr)  {  	char addr_string[16];  	int res; @@ -274,6 +274,10 @@ int tipc_net_start(void)  	if (tipc_mode != TIPC_NODE_MODE)  		return -ENOPROTOOPT; +	tipc_subscr_stop(); +	tipc_cfg_stop(); + +	tipc_own_addr = addr;  	tipc_mode = TIPC_NET_MODE;  	tipc_named_reinit();  	tipc_port_reinit(); @@ -284,14 +288,14 @@ int tipc_net_start(void)  	    (res = tipc_bclink_init())) {  		return res;  	} -	tipc_subscr_stop(); -	tipc_cfg_stop(); +  	tipc_k_signal((Handler)tipc_subscr_start, 0);  	tipc_k_signal((Handler)tipc_cfg_init, 0); +  	info("Started in network mode\n");  	info("Own node address %s, network identity %u\n",  	     addr_string_fill(addr_string, tipc_own_addr), tipc_net_id); -	return TIPC_OK; +	return 0;  }  void tipc_net_stop(void) diff --git a/net/tipc/net.h b/net/tipc/net.h index a6a0e9976ac..d154ac2bda9 100644 --- a/net/tipc/net.h +++ b/net/tipc/net.h @@ -58,7 +58,7 @@ void tipc_net_route_msg(struct sk_buff *buf);  struct node *tipc_net_select_remote_node(u32 addr, u32 ref);  u32 tipc_net_select_router(u32 addr, u32 ref); -int tipc_net_start(void); +int tipc_net_start(u32 addr);  void tipc_net_stop(void);  #endif diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c index 6a7f7b4c259..c387217bb23 100644 --- a/net/tipc/netlink.c +++ b/net/tipc/netlink.c @@ -2,7 +2,7 @@   * net/tipc/netlink.c: TIPC configuration handling   *   * Copyright (c) 2005-2006, Ericsson AB - * Copyright (c) 2005, Wind River Systems + * Copyright (c) 2005-2007, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -45,15 +45,17 @@ static int handle_cmd(struct sk_buff *skb, struct genl_info *info)  	struct nlmsghdr *req_nlh = info->nlhdr;  	struct tipc_genlmsghdr *req_userhdr = info->userhdr;  	int hdr_space = NLMSG_SPACE(GENL_HDRLEN + TIPC_GENL_HDRLEN); +	u16 cmd;  	if ((req_userhdr->cmd & 0xC000) && (!capable(CAP_NET_ADMIN))) -		rep_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_NET_ADMIN); +		cmd = TIPC_CMD_NOT_NET_ADMIN;  	else -		rep_buf = tipc_cfg_do_cmd(req_userhdr->dest, -					  req_userhdr->cmd, -					  NLMSG_DATA(req_nlh) + GENL_HDRLEN + TIPC_GENL_HDRLEN, -					  NLMSG_PAYLOAD(req_nlh, GENL_HDRLEN + TIPC_GENL_HDRLEN), -					  hdr_space); +		cmd = req_userhdr->cmd; + +	rep_buf = tipc_cfg_do_cmd(req_userhdr->dest, cmd, +			NLMSG_DATA(req_nlh) + GENL_HDRLEN + TIPC_GENL_HDRLEN, +			NLMSG_PAYLOAD(req_nlh, GENL_HDRLEN + TIPC_GENL_HDRLEN), +			hdr_space);  	if (rep_buf) {  		skb_push(rep_buf, hdr_space); diff --git a/net/tipc/node.c b/net/tipc/node.c index 598f4d3a009..ee952ad6021 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -52,16 +52,40 @@ static void node_established_contact(struct node *n_ptr);  struct node *tipc_nodes = NULL;	/* sorted list of nodes within cluster */ +static DEFINE_SPINLOCK(node_create_lock); +  u32 tipc_own_tag = 0; +/** + * tipc_node_create - create neighboring node + * + * Currently, this routine is called by neighbor discovery code, which holds + * net_lock for reading only.  We must take node_create_lock to ensure a node + * isn't created twice if two different bearers discover the node at the same + * time.  (It would be preferable to switch to holding net_lock in write mode, + * but this is a non-trivial change.) + */ +  struct node *tipc_node_create(u32 addr)  {  	struct cluster *c_ptr;  	struct node *n_ptr;  	struct node **curr_node; +	spin_lock_bh(&node_create_lock); + +	for (n_ptr = tipc_nodes; n_ptr; n_ptr = n_ptr->next) { +		if (addr < n_ptr->addr) +			break; +		if (addr == n_ptr->addr) { +			spin_unlock_bh(&node_create_lock); +			return n_ptr; +		} +	} +  	n_ptr = kzalloc(sizeof(*n_ptr),GFP_ATOMIC);  	if (!n_ptr) { +		spin_unlock_bh(&node_create_lock);  		warn("Node creation failed, no memory\n");  		return NULL;  	} @@ -71,6 +95,7 @@ struct node *tipc_node_create(u32 addr)  		c_ptr = tipc_cltr_create(addr);  	}  	if (!c_ptr) { +		spin_unlock_bh(&node_create_lock);  		kfree(n_ptr);  		return NULL;  	} @@ -91,6 +116,7 @@ struct node *tipc_node_create(u32 addr)  		}  	}  	(*curr_node) = n_ptr; +	spin_unlock_bh(&node_create_lock);  	return n_ptr;  } @@ -574,12 +600,14 @@ u32 tipc_available_nodes(const u32 domain)  	struct node *n_ptr;  	u32 cnt = 0; +	read_lock_bh(&tipc_net_lock);  	for (n_ptr = tipc_nodes; n_ptr; n_ptr = n_ptr->next) {  		if (!in_scope(domain, n_ptr->addr))  			continue;  		if (tipc_node_is_up(n_ptr))  			cnt++;  	} +	read_unlock_bh(&tipc_net_lock);  	return cnt;  } @@ -599,19 +627,26 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)  		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE  						   " (network address)"); -	if (!tipc_nodes) +	read_lock_bh(&tipc_net_lock); +	if (!tipc_nodes) { +		read_unlock_bh(&tipc_net_lock);  		return tipc_cfg_reply_none(); +	}  	/* For now, get space for all other nodes  	   (will need to modify this when slave nodes are supported */  	payload_size = TLV_SPACE(sizeof(node_info)) * (tipc_max_nodes - 1); -	if (payload_size > 32768u) +	if (payload_size > 32768u) { +		read_unlock_bh(&tipc_net_lock);  		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  						   " (too many nodes)"); +	}  	buf = tipc_cfg_reply_alloc(payload_size); -	if (!buf) +	if (!buf) { +		read_unlock_bh(&tipc_net_lock);  		return NULL; +	}  	/* Add TLVs for all nodes in scope */ @@ -624,6 +659,7 @@ struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)  				    &node_info, sizeof(node_info));  	} +	read_unlock_bh(&tipc_net_lock);  	return buf;  } @@ -646,16 +682,22 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)  	if (tipc_mode != TIPC_NET_MODE)  		return tipc_cfg_reply_none(); +	read_lock_bh(&tipc_net_lock); +  	/* Get space for all unicast links + multicast link */  	payload_size = TLV_SPACE(sizeof(link_info)) *  		(tipc_net.zones[tipc_zone(tipc_own_addr)]->links + 1); -	if (payload_size > 32768u) +	if (payload_size > 32768u) { +		read_unlock_bh(&tipc_net_lock);  		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED  						   " (too many links)"); +	}  	buf = tipc_cfg_reply_alloc(payload_size); -	if (!buf) +	if (!buf) { +		read_unlock_bh(&tipc_net_lock);  		return NULL; +	}  	/* Add TLV for broadcast link */ @@ -671,6 +713,7 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)  		if (!in_scope(domain, n_ptr->addr))  			continue; +		tipc_node_lock(n_ptr);  		for (i = 0; i < MAX_BEARERS; i++) {  			if (!n_ptr->links[i])  				continue; @@ -680,7 +723,9 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)  			tipc_cfg_append_tlv(buf, TIPC_TLV_LINK_INFO,  					    &link_info, sizeof(link_info));  		} +		tipc_node_unlock(n_ptr);  	} +	read_unlock_bh(&tipc_net_lock);  	return buf;  } diff --git a/net/tipc/port.c b/net/tipc/port.c index 2f5806410c6..e70d27ea657 100644 --- a/net/tipc/port.c +++ b/net/tipc/port.c @@ -2,7 +2,7 @@   * net/tipc/port.c: TIPC port code   *   * Copyright (c) 1992-2007, Ericsson AB - * Copyright (c) 2004-2007, Wind River Systems + * Copyright (c) 2004-2008, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -211,12 +211,12 @@ exit:  }  /** - * tipc_createport_raw - create a native TIPC port + * tipc_createport_raw - create a generic TIPC port   * - * Returns local port reference + * Returns pointer to (locked) TIPC port, or NULL if unable to create it   */ -u32 tipc_createport_raw(void *usr_handle, +struct tipc_port *tipc_createport_raw(void *usr_handle,  			u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),  			void (*wakeup)(struct tipc_port *),  			const u32 importance) @@ -228,26 +228,21 @@ u32 tipc_createport_raw(void *usr_handle,  	p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);  	if (!p_ptr) {  		warn("Port creation failed, no memory\n"); -		return 0; +		return NULL;  	}  	ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);  	if (!ref) {  		warn("Port creation failed, reference table exhausted\n");  		kfree(p_ptr); -		return 0; +		return NULL;  	} -	tipc_port_lock(ref);  	p_ptr->publ.usr_handle = usr_handle;  	p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;  	p_ptr->publ.ref = ref;  	msg = &p_ptr->publ.phdr; -	msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, -		 0); -	msg_set_orignode(msg, tipc_own_addr); -	msg_set_prevnode(msg, tipc_own_addr); +	msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0);  	msg_set_origport(msg, ref); -	msg_set_importance(msg,importance);  	p_ptr->last_in_seqno = 41;  	p_ptr->sent = 1;  	INIT_LIST_HEAD(&p_ptr->wait_list); @@ -262,8 +257,7 @@ u32 tipc_createport_raw(void *usr_handle,  	INIT_LIST_HEAD(&p_ptr->port_list);  	list_add_tail(&p_ptr->port_list, &ports);  	spin_unlock_bh(&tipc_port_list_lock); -	tipc_port_unlock(p_ptr); -	return ref; +	return &(p_ptr->publ);  }  int tipc_deleteport(u32 ref) @@ -297,7 +291,7 @@ int tipc_deleteport(u32 ref)  	kfree(p_ptr);  	dbg("Deleted port %u\n", ref);  	tipc_net_route_msg(buf); -	return TIPC_OK; +	return 0;  }  /** @@ -342,7 +336,7 @@ int tipc_portunreliable(u32 ref, unsigned int *isunreliable)  		return -EINVAL;  	*isunreliable = port_unreliable(p_ptr);  	tipc_port_unlock(p_ptr); -	return TIPC_OK; +	return 0;  }  int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) @@ -354,7 +348,7 @@ int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)  		return -EINVAL;  	msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));  	tipc_port_unlock(p_ptr); -	return TIPC_OK; +	return 0;  }  static int port_unreturnable(struct port *p_ptr) @@ -371,7 +365,7 @@ int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)  		return -EINVAL;  	*isunrejectable = port_unreturnable(p_ptr);  	tipc_port_unlock(p_ptr); -	return TIPC_OK; +	return 0;  }  int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) @@ -383,7 +377,7 @@ int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)  		return -EINVAL;  	msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));  	tipc_port_unlock(p_ptr); -	return TIPC_OK; +	return 0;  }  /* @@ -402,10 +396,10 @@ static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,  	buf = buf_acquire(LONG_H_SIZE);  	if (buf) {  		msg = buf_msg(buf); -		msg_init(msg, usr, type, err, LONG_H_SIZE, destnode); +		msg_init(msg, usr, type, LONG_H_SIZE, destnode); +		msg_set_errcode(msg, err);  		msg_set_destport(msg, destport);  		msg_set_origport(msg, origport); -		msg_set_destnode(msg, destnode);  		msg_set_orignode(msg, orignode);  		msg_set_transp_seqno(msg, seqno);  		msg_set_msgcnt(msg, ack); @@ -446,17 +440,19 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)  		return data_sz;  	}  	rmsg = buf_msg(rbuf); -	msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg)); +	msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg)); +	msg_set_errcode(rmsg, err);  	msg_set_destport(rmsg, msg_origport(msg)); -	msg_set_prevnode(rmsg, tipc_own_addr);  	msg_set_origport(rmsg, msg_destport(msg)); -	if (msg_short(msg)) +	if (msg_short(msg)) {  		msg_set_orignode(rmsg, tipc_own_addr); -	else +		/* leave name type & instance as zeroes */ +	} else {  		msg_set_orignode(rmsg, msg_destnode(msg)); +		msg_set_nametype(rmsg, msg_nametype(msg)); +		msg_set_nameinst(rmsg, msg_nameinst(msg)); +	}  	msg_set_size(rmsg, data_sz + hdr_sz); -	msg_set_nametype(rmsg, msg_nametype(msg)); -	msg_set_nameinst(rmsg, msg_nameinst(msg));  	skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz);  	/* send self-abort message when rejecting on a connected port */ @@ -778,6 +774,7 @@ void tipc_port_reinit(void)  		msg = &p_ptr->publ.phdr;  		if (msg_orignode(msg) == tipc_own_addr)  			break; +		msg_set_prevnode(msg, tipc_own_addr);  		msg_set_orignode(msg, tipc_own_addr);  	}  	spin_unlock_bh(&tipc_port_list_lock); @@ -838,16 +835,13 @@ static void port_dispatcher_sigh(void *dummy)  				u32 peer_node = port_peernode(p_ptr);  				tipc_port_unlock(p_ptr); +				if (unlikely(!cb)) +					goto reject;  				if (unlikely(!connected)) { -					if (unlikely(published)) +					if (tipc_connect2port(dref, &orig))  						goto reject; -					tipc_connect2port(dref,&orig); -				} -				if (unlikely(msg_origport(msg) != peer_port)) -					goto reject; -				if (unlikely(msg_orignode(msg) != peer_node)) -					goto reject; -				if (unlikely(!cb)) +				} else if ((msg_origport(msg) != peer_port) || +					   (msg_orignode(msg) != peer_node))  					goto reject;  				if (unlikely(++p_ptr->publ.conn_unacked >=  					     TIPC_FLOW_CONTROL_WIN)) @@ -862,9 +856,7 @@ static void port_dispatcher_sigh(void *dummy)  				tipc_msg_event cb = up_ptr->msg_cb;  				tipc_port_unlock(p_ptr); -				if (unlikely(connected)) -					goto reject; -				if (unlikely(!cb)) +				if (unlikely(!cb || connected))  					goto reject;  				skb_pull(buf, msg_hdr_sz(msg));  				cb(usr_handle, dref, &buf, msg_data(msg), @@ -877,11 +869,7 @@ static void port_dispatcher_sigh(void *dummy)  				tipc_named_msg_event cb = up_ptr->named_msg_cb;  				tipc_port_unlock(p_ptr); -				if (unlikely(connected)) -					goto reject; -				if (unlikely(!cb)) -					goto reject; -				if (unlikely(!published)) +				if (unlikely(!cb || connected || !published))  					goto reject;  				dseq.type =  msg_nametype(msg);  				dseq.lower = msg_nameinst(msg); @@ -908,11 +896,10 @@ err:  				u32 peer_node = port_peernode(p_ptr);  				tipc_port_unlock(p_ptr); -				if (!connected || !cb) -					break; -				if (msg_origport(msg) != peer_port) +				if (!cb || !connected)  					break; -				if (msg_orignode(msg) != peer_node) +				if ((msg_origport(msg) != peer_port) || +				    (msg_orignode(msg) != peer_node))  					break;  				tipc_disconnect(dref);  				skb_pull(buf, msg_hdr_sz(msg)); @@ -924,7 +911,7 @@ err:  				tipc_msg_err_event cb = up_ptr->err_cb;  				tipc_port_unlock(p_ptr); -				if (connected || !cb) +				if (!cb || connected)  					break;  				skb_pull(buf, msg_hdr_sz(msg));  				cb(usr_handle, dref, &buf, msg_data(msg), @@ -937,7 +924,7 @@ err:  					up_ptr->named_err_cb;  				tipc_port_unlock(p_ptr); -				if (connected || !cb) +				if (!cb || connected)  					break;  				dseq.type =  msg_nametype(msg);  				dseq.lower = msg_nameinst(msg); @@ -976,7 +963,7 @@ static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)  		tipc_k_signal((Handler)port_dispatcher_sigh, 0);  	}  	spin_unlock_bh(&queue_lock); -	return TIPC_OK; +	return 0;  }  /* @@ -1053,15 +1040,14 @@ int tipc_createport(u32 user_ref,  {  	struct user_port *up_ptr;  	struct port *p_ptr; -	u32 ref;  	up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);  	if (!up_ptr) {  		warn("Port creation failed, no memory\n");  		return -ENOMEM;  	} -	ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance); -	p_ptr = tipc_port_lock(ref); +	p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher, +						   port_wakeup, importance);  	if (!p_ptr) {  		kfree(up_ptr);  		return -ENOMEM; @@ -1081,16 +1067,15 @@ int tipc_createport(u32 user_ref,  	INIT_LIST_HEAD(&up_ptr->uport_list);  	tipc_reg_add_port(up_ptr);  	*portref = p_ptr->publ.ref; -	dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref);  	tipc_port_unlock(p_ptr); -	return TIPC_OK; +	return 0;  }  int tipc_ownidentity(u32 ref, struct tipc_portid *id)  {  	id->ref = ref;  	id->node = tipc_own_addr; -	return TIPC_OK; +	return 0;  }  int tipc_portimportance(u32 ref, unsigned int *importance) @@ -1102,7 +1087,7 @@ int tipc_portimportance(u32 ref, unsigned int *importance)  		return -EINVAL;  	*importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);  	tipc_port_unlock(p_ptr); -	return TIPC_OK; +	return 0;  }  int tipc_set_portimportance(u32 ref, unsigned int imp) @@ -1117,7 +1102,7 @@ int tipc_set_portimportance(u32 ref, unsigned int imp)  		return -EINVAL;  	msg_set_importance(&p_ptr->publ.phdr, (u32)imp);  	tipc_port_unlock(p_ptr); -	return TIPC_OK; +	return 0;  } @@ -1152,7 +1137,7 @@ int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)  		list_add(&publ->pport_list, &p_ptr->publications);  		p_ptr->pub_count++;  		p_ptr->publ.published = 1; -		res = TIPC_OK; +		res = 0;  	}  exit:  	tipc_port_unlock(p_ptr); @@ -1175,7 +1160,7 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)  			tipc_nametbl_withdraw(publ->type, publ->lower,  					      publ->ref, publ->key);  		} -		res = TIPC_OK; +		res = 0;  	} else {  		list_for_each_entry_safe(publ, tpubl,  					 &p_ptr->publications, pport_list) { @@ -1189,7 +1174,7 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)  				break;  			tipc_nametbl_withdraw(publ->type, publ->lower,  					      publ->ref, publ->key); -			res = TIPC_OK; +			res = 0;  			break;  		}  	} @@ -1233,7 +1218,7 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)  	tipc_nodesub_subscribe(&p_ptr->subscription,peer->node,  			  (void *)(unsigned long)ref,  			  (net_ev_handler)port_handle_node_down); -	res = TIPC_OK; +	res = 0;  exit:  	tipc_port_unlock(p_ptr);  	p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref); @@ -1255,7 +1240,7 @@ int tipc_disconnect_port(struct tipc_port *tp_ptr)  		/* let timer expire on it's own to avoid deadlock! */  		tipc_nodesub_unsubscribe(  			&((struct port *)tp_ptr)->subscription); -		res = TIPC_OK; +		res = 0;  	} else {  		res = -ENOTCONN;  	} @@ -1320,7 +1305,7 @@ int tipc_isconnected(u32 ref, int *isconnected)  		return -EINVAL;  	*isconnected = p_ptr->publ.connected;  	tipc_port_unlock(p_ptr); -	return TIPC_OK; +	return 0;  }  int tipc_peer(u32 ref, struct tipc_portid *peer) @@ -1334,7 +1319,7 @@ int tipc_peer(u32 ref, struct tipc_portid *peer)  	if (p_ptr->publ.connected) {  		peer->ref = port_peerport(p_ptr);  		peer->node = port_peernode(p_ptr); -		res = TIPC_OK; +		res = 0;  	} else  		res = -ENOTCONN;  	tipc_port_unlock(p_ptr); diff --git a/net/tipc/ref.c b/net/tipc/ref.c index 89cbab24d08..414fc34b8be 100644 --- a/net/tipc/ref.c +++ b/net/tipc/ref.c @@ -123,7 +123,7 @@ int tipc_ref_table_init(u32 requested_size, u32 start)  	tipc_ref_table.index_mask = actual_size - 1;  	tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask; -	return TIPC_OK; +	return 0;  }  /** @@ -142,9 +142,13 @@ void tipc_ref_table_stop(void)  /**   * tipc_ref_acquire - create reference to an object   * - * Return a unique reference value which can be translated back to the pointer - * 'object' at a later time.  Also, pass back a pointer to the lock protecting - * the object, but without locking it. + * Register an object pointer in reference table and lock the object. + * Returns a unique reference value that is used from then on to retrieve the + * object pointer, or to determine that the object has been deregistered. + * + * Note: The object is returned in the locked state so that the caller can + * register a partially initialized object, without running the risk that + * the object will be accessed before initialization is complete.   */  u32 tipc_ref_acquire(void *object, spinlock_t **lock) @@ -178,13 +182,13 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock)  		ref = (next_plus_upper & ~index_mask) + index;  		entry->ref = ref;  		entry->object = object; -		spin_unlock_bh(&entry->lock);  		*lock = &entry->lock;  	}  	else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {  		index = tipc_ref_table.init_point++;  		entry = &(tipc_ref_table.entries[index]);  		spin_lock_init(&entry->lock); +		spin_lock_bh(&entry->lock);  		ref = tipc_ref_table.start_mask + index;  		entry->ref = ref;  		entry->object = object; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 230f9ca2ad6..1848693ebb8 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2,7 +2,7 @@   * net/tipc/socket.c: TIPC socket API   *   * Copyright (c) 2001-2007, Ericsson AB - * Copyright (c) 2004-2007, Wind River Systems + * Copyright (c) 2004-2008, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -63,6 +63,7 @@  struct tipc_sock {  	struct sock sk;  	struct tipc_port *p; +	struct tipc_portid peer_name;  };  #define tipc_sk(sk) ((struct tipc_sock *)(sk)) @@ -188,7 +189,7 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol)  	const struct proto_ops *ops;  	socket_state state;  	struct sock *sk; -	u32 portref; +	struct tipc_port *tp_ptr;  	/* Validate arguments */ @@ -224,9 +225,9 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol)  	/* Allocate TIPC port for socket to use */ -	portref = tipc_createport_raw(sk, &dispatch, &wakeupdispatch, -				      TIPC_LOW_IMPORTANCE); -	if (unlikely(portref == 0)) { +	tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch, +				     TIPC_LOW_IMPORTANCE); +	if (unlikely(!tp_ptr)) {  		sk_free(sk);  		return -ENOMEM;  	} @@ -239,12 +240,14 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol)  	sock_init_data(sock, sk);  	sk->sk_rcvtimeo = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);  	sk->sk_backlog_rcv = backlog_rcv; -	tipc_sk(sk)->p = tipc_get_port(portref); +	tipc_sk(sk)->p = tp_ptr; + +	spin_unlock_bh(tp_ptr->lock);  	if (sock->state == SS_READY) { -		tipc_set_portunreturnable(portref, 1); +		tipc_set_portunreturnable(tp_ptr->ref, 1);  		if (sock->type == SOCK_DGRAM) -			tipc_set_portunreliable(portref, 1); +			tipc_set_portunreliable(tp_ptr->ref, 1);  	}  	atomic_inc(&tipc_user_count); @@ -375,27 +378,29 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)   * @sock: socket structure   * @uaddr: area for returned socket address   * @uaddr_len: area for returned length of socket address - * @peer: 0 to obtain socket name, 1 to obtain peer socket name + * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID   *   * Returns 0 on success, errno otherwise   * - * NOTE: This routine doesn't need to take the socket lock since it doesn't - *       access any non-constant socket information. + * NOTE: This routine doesn't need to take the socket lock since it only + *       accesses socket information that is unchanging (or which changes in + * 	 a completely predictable manner).   */  static int get_name(struct socket *sock, struct sockaddr *uaddr,  		    int *uaddr_len, int peer)  {  	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr; -	u32 portref = tipc_sk_port(sock->sk)->ref; -	u32 res; +	struct tipc_sock *tsock = tipc_sk(sock->sk);  	if (peer) { -		res = tipc_peer(portref, &addr->addr.id); -		if (res) -			return res; +		if ((sock->state != SS_CONNECTED) && +			((peer != 2) || (sock->state != SS_DISCONNECTING))) +			return -ENOTCONN; +		addr->addr.id.ref = tsock->peer_name.ref; +		addr->addr.id.node = tsock->peer_name.node;  	} else { -		tipc_ownidentity(portref, &addr->addr.id); +		tipc_ownidentity(tsock->p->ref, &addr->addr.id);  	}  	*uaddr_len = sizeof(*addr); @@ -764,18 +769,17 @@ exit:  static int auto_connect(struct socket *sock, struct tipc_msg *msg)  { -	struct tipc_port *tport = tipc_sk_port(sock->sk); -	struct tipc_portid peer; +	struct tipc_sock *tsock = tipc_sk(sock->sk);  	if (msg_errcode(msg)) {  		sock->state = SS_DISCONNECTING;  		return -ECONNREFUSED;  	} -	peer.ref = msg_origport(msg); -	peer.node = msg_orignode(msg); -	tipc_connect2port(tport->ref, &peer); -	tipc_set_portimportance(tport->ref, msg_importance(msg)); +	tsock->peer_name.ref = msg_origport(msg); +	tsock->peer_name.node = msg_orignode(msg); +	tipc_connect2port(tsock->p->ref, &tsock->peer_name); +	tipc_set_portimportance(tsock->p->ref, msg_importance(msg));  	sock->state = SS_CONNECTED;  	return 0;  } @@ -1131,7 +1135,7 @@ restart:  	/* Loop around if more data is required */  	if ((sz_copied < buf_len)    /* didn't get all requested data */ -	    && (!skb_queue_empty(&sock->sk->sk_receive_queue) || +	    && (!skb_queue_empty(&sk->sk_receive_queue) ||  		(flags & MSG_WAITALL))  				     /* ... and more is ready or required */  	    && (!(flags & MSG_PEEK)) /* ... and aren't just peeking at data */ @@ -1527,9 +1531,9 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)  	res = tipc_create(sock_net(sock->sk), new_sock, 0);  	if (!res) {  		struct sock *new_sk = new_sock->sk; -		struct tipc_port *new_tport = tipc_sk_port(new_sk); +		struct tipc_sock *new_tsock = tipc_sk(new_sk); +		struct tipc_port *new_tport = new_tsock->p;  		u32 new_ref = new_tport->ref; -		struct tipc_portid id;  		struct tipc_msg *msg = buf_msg(buf);  		lock_sock(new_sk); @@ -1543,9 +1547,9 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)  		/* Connect new socket to it's peer */ -		id.ref = msg_origport(msg); -		id.node = msg_orignode(msg); -		tipc_connect2port(new_ref, &id); +		new_tsock->peer_name.ref = msg_origport(msg); +		new_tsock->peer_name.node = msg_orignode(msg); +		tipc_connect2port(new_ref, &new_tsock->peer_name);  		new_sock->state = SS_CONNECTED;  		tipc_set_portimportance(new_ref, msg_importance(msg)); diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 8c01ccd3626..0326d3060bc 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -1,8 +1,8 @@  /* - * net/tipc/subscr.c: TIPC subscription service + * net/tipc/subscr.c: TIPC network topology service   *   * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005, Wind River Systems + * Copyright (c) 2005-2007, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -36,27 +36,24 @@  #include "core.h"  #include "dbg.h" -#include "subscr.h"  #include "name_table.h" +#include "port.h"  #include "ref.h" +#include "subscr.h"  /**   * struct subscriber - TIPC network topology subscriber - * @ref: object reference to subscriber object itself - * @lock: pointer to spinlock controlling access to subscriber object + * @port_ref: object reference to server port connecting to subscriber + * @lock: pointer to spinlock controlling access to subscriber's server port   * @subscriber_list: adjacent subscribers in top. server's list of subscribers   * @subscription_list: list of subscription objects for this subscriber - * @port_ref: object reference to port used to communicate with subscriber - * @swap: indicates if subscriber uses opposite endianness in its messages   */  struct subscriber { -	u32 ref; +	u32 port_ref;  	spinlock_t *lock;  	struct list_head subscriber_list;  	struct list_head subscription_list; -	u32 port_ref; -	int swap;  };  /** @@ -88,13 +85,14 @@ static struct top_srv topsrv = { 0 };  static u32 htohl(u32 in, int swap)  { -	char *c = (char *)∈ - -	return swap ? ((c[3] << 3) + (c[2] << 2) + (c[1] << 1) + c[0]) : in; +	return swap ? (u32)___constant_swab32(in) : in;  }  /**   * subscr_send_event - send a message containing a tipc_event to the subscriber + * + * Note: Must not hold subscriber's server port lock, since tipc_send() will + *       try to take the lock if the message is rejected and returned!   */  static void subscr_send_event(struct subscription *sub, @@ -109,12 +107,12 @@ static void subscr_send_event(struct subscription *sub,  	msg_sect.iov_base = (void *)&sub->evt;  	msg_sect.iov_len = sizeof(struct tipc_event); -	sub->evt.event = htohl(event, sub->owner->swap); -	sub->evt.found_lower = htohl(found_lower, sub->owner->swap); -	sub->evt.found_upper = htohl(found_upper, sub->owner->swap); -	sub->evt.port.ref = htohl(port_ref, sub->owner->swap); -	sub->evt.port.node = htohl(node, sub->owner->swap); -	tipc_send(sub->owner->port_ref, 1, &msg_sect); +	sub->evt.event = htohl(event, sub->swap); +	sub->evt.found_lower = htohl(found_lower, sub->swap); +	sub->evt.found_upper = htohl(found_upper, sub->swap); +	sub->evt.port.ref = htohl(port_ref, sub->swap); +	sub->evt.port.node = htohl(node, sub->swap); +	tipc_send(sub->server_ref, 1, &msg_sect);  }  /** @@ -151,13 +149,12 @@ void tipc_subscr_report_overlap(struct subscription *sub,  				u32 node,  				int must)  { -	dbg("Rep overlap %u:%u,%u<->%u,%u\n", sub->seq.type, sub->seq.lower, -	    sub->seq.upper, found_lower, found_upper);  	if (!tipc_subscr_overlap(sub, found_lower, found_upper))  		return;  	if (!must && !(sub->filter & TIPC_SUB_PORTS))  		return; -	subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); + +	sub->event_cb(sub, found_lower, found_upper, event, port_ref, node);  }  /** @@ -166,20 +163,18 @@ void tipc_subscr_report_overlap(struct subscription *sub,  static void subscr_timeout(struct subscription *sub)  { -	struct subscriber *subscriber; -	u32 subscriber_ref; +	struct port *server_port; -	/* Validate subscriber reference (in case subscriber is terminating) */ +	/* Validate server port reference (in case subscriber is terminating) */ -	subscriber_ref = sub->owner->ref; -	subscriber = (struct subscriber *)tipc_ref_lock(subscriber_ref); -	if (subscriber == NULL) +	server_port = tipc_port_lock(sub->server_ref); +	if (server_port == NULL)  		return;  	/* Validate timeout (in case subscription is being cancelled) */  	if (sub->timeout == TIPC_WAIT_FOREVER) { -		tipc_ref_unlock(subscriber_ref); +		tipc_port_unlock(server_port);  		return;  	} @@ -187,19 +182,21 @@ static void subscr_timeout(struct subscription *sub)  	tipc_nametbl_unsubscribe(sub); -	/* Notify subscriber of timeout, then unlink subscription */ +	/* Unlink subscription from subscriber */ -	subscr_send_event(sub, -			  sub->evt.s.seq.lower, -			  sub->evt.s.seq.upper, -			  TIPC_SUBSCR_TIMEOUT, -			  0, -			  0);  	list_del(&sub->subscription_list); +	/* Release subscriber's server port */ + +	tipc_port_unlock(server_port); + +	/* Notify subscriber of timeout */ + +	subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, +			  TIPC_SUBSCR_TIMEOUT, 0, 0); +  	/* Now destroy subscription */ -	tipc_ref_unlock(subscriber_ref);  	k_term_timer(&sub->timer);  	kfree(sub);  	atomic_dec(&topsrv.subscription_count); @@ -208,7 +205,7 @@ static void subscr_timeout(struct subscription *sub)  /**   * subscr_del - delete a subscription within a subscription list   * - * Called with subscriber locked. + * Called with subscriber port locked.   */  static void subscr_del(struct subscription *sub) @@ -222,7 +219,7 @@ static void subscr_del(struct subscription *sub)  /**   * subscr_terminate - terminate communication with a subscriber   * - * Called with subscriber locked.  Routine must temporarily release this lock + * Called with subscriber port locked.  Routine must temporarily release lock   * to enable subscription timeout routine(s) to finish without deadlocking;   * the lock is then reclaimed to allow caller to release it upon return.   * (This should work even in the unlikely event some other thread creates @@ -232,14 +229,21 @@ static void subscr_del(struct subscription *sub)  static void subscr_terminate(struct subscriber *subscriber)  { +	u32 port_ref;  	struct subscription *sub;  	struct subscription *sub_temp;  	/* Invalidate subscriber reference */ -	tipc_ref_discard(subscriber->ref); +	port_ref = subscriber->port_ref; +	subscriber->port_ref = 0;  	spin_unlock_bh(subscriber->lock); +	/* Sever connection to subscriber */ + +	tipc_shutdown(port_ref); +	tipc_deleteport(port_ref); +  	/* Destroy any existing subscriptions for subscriber */  	list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, @@ -253,27 +257,25 @@ static void subscr_terminate(struct subscriber *subscriber)  		subscr_del(sub);  	} -	/* Sever connection to subscriber */ - -	tipc_shutdown(subscriber->port_ref); -	tipc_deleteport(subscriber->port_ref); -  	/* Remove subscriber from topology server's subscriber list */  	spin_lock_bh(&topsrv.lock);  	list_del(&subscriber->subscriber_list);  	spin_unlock_bh(&topsrv.lock); -	/* Now destroy subscriber */ +	/* Reclaim subscriber lock */  	spin_lock_bh(subscriber->lock); + +	/* Now destroy subscriber */ +  	kfree(subscriber);  }  /**   * subscr_cancel - handle subscription cancellation request   * - * Called with subscriber locked.  Routine must temporarily release this lock + * Called with subscriber port locked.  Routine must temporarily release lock   * to enable the subscription timeout routine to finish without deadlocking;   * the lock is then reclaimed to allow caller to release it upon return.   * @@ -316,27 +318,25 @@ static void subscr_cancel(struct tipc_subscr *s,  /**   * subscr_subscribe - create subscription for subscriber   * - * Called with subscriber locked + * Called with subscriber port locked.   */ -static void subscr_subscribe(struct tipc_subscr *s, -			     struct subscriber *subscriber) +static struct subscription *subscr_subscribe(struct tipc_subscr *s, +					     struct subscriber *subscriber)  {  	struct subscription *sub; +	int swap; -	/* Determine/update subscriber's endianness */ +	/* Determine subscriber's endianness */ -	if (s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE)) -		subscriber->swap = 0; -	else -		subscriber->swap = 1; +	swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));  	/* Detect & process a subscription cancellation request */ -	if (s->filter & htohl(TIPC_SUB_CANCEL, subscriber->swap)) { -		s->filter &= ~htohl(TIPC_SUB_CANCEL, subscriber->swap); +	if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { +		s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);  		subscr_cancel(s, subscriber); -		return; +		return NULL;  	}  	/* Refuse subscription if global limit exceeded */ @@ -345,63 +345,66 @@ static void subscr_subscribe(struct tipc_subscr *s,  		warn("Subscription rejected, subscription limit reached (%u)\n",  		     tipc_max_subscriptions);  		subscr_terminate(subscriber); -		return; +		return NULL;  	}  	/* Allocate subscription object */ -	sub = kzalloc(sizeof(*sub), GFP_ATOMIC); +	sub = kmalloc(sizeof(*sub), GFP_ATOMIC);  	if (!sub) {  		warn("Subscription rejected, no memory\n");  		subscr_terminate(subscriber); -		return; +		return NULL;  	}  	/* Initialize subscription object */ -	sub->seq.type = htohl(s->seq.type, subscriber->swap); -	sub->seq.lower = htohl(s->seq.lower, subscriber->swap); -	sub->seq.upper = htohl(s->seq.upper, subscriber->swap); -	sub->timeout = htohl(s->timeout, subscriber->swap); -	sub->filter = htohl(s->filter, subscriber->swap); +	sub->seq.type = htohl(s->seq.type, swap); +	sub->seq.lower = htohl(s->seq.lower, swap); +	sub->seq.upper = htohl(s->seq.upper, swap); +	sub->timeout = htohl(s->timeout, swap); +	sub->filter = htohl(s->filter, swap);  	if ((!(sub->filter & TIPC_SUB_PORTS)  	     == !(sub->filter & TIPC_SUB_SERVICE))  	    || (sub->seq.lower > sub->seq.upper)) {  		warn("Subscription rejected, illegal request\n");  		kfree(sub);  		subscr_terminate(subscriber); -		return; +		return NULL;  	} -	memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); -	INIT_LIST_HEAD(&sub->subscription_list); +	sub->event_cb = subscr_send_event;  	INIT_LIST_HEAD(&sub->nameseq_list);  	list_add(&sub->subscription_list, &subscriber->subscription_list); +	sub->server_ref = subscriber->port_ref; +	sub->swap = swap; +	memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));  	atomic_inc(&topsrv.subscription_count);  	if (sub->timeout != TIPC_WAIT_FOREVER) {  		k_init_timer(&sub->timer,  			     (Handler)subscr_timeout, (unsigned long)sub);  		k_start_timer(&sub->timer, sub->timeout);  	} -	sub->owner = subscriber; -	tipc_nametbl_subscribe(sub); + +	return sub;  }  /**   * subscr_conn_shutdown_event - handle termination request from subscriber + * + * Called with subscriber's server port unlocked.   */  static void subscr_conn_shutdown_event(void *usr_handle, -				       u32 portref, +				       u32 port_ref,  				       struct sk_buff **buf,  				       unsigned char const *data,  				       unsigned int size,  				       int reason)  { -	struct subscriber *subscriber; +	struct subscriber *subscriber = usr_handle;  	spinlock_t *subscriber_lock; -	subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); -	if (subscriber == NULL) +	if (tipc_port_lock(port_ref) == NULL)  		return;  	subscriber_lock = subscriber->lock; @@ -411,6 +414,8 @@ static void subscr_conn_shutdown_event(void *usr_handle,  /**   * subscr_conn_msg_event - handle new subscription request from subscriber + * + * Called with subscriber's server port unlocked.   */  static void subscr_conn_msg_event(void *usr_handle, @@ -419,20 +424,46 @@ static void subscr_conn_msg_event(void *usr_handle,  				  const unchar *data,  				  u32 size)  { -	struct subscriber *subscriber; +	struct subscriber *subscriber = usr_handle;  	spinlock_t *subscriber_lock; +	struct subscription *sub; + +	/* +	 * Lock subscriber's server port (& make a local copy of lock pointer, +	 * in case subscriber is deleted while processing subscription request) +	 */ -	subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); -	if (subscriber == NULL) +	if (tipc_port_lock(port_ref) == NULL)  		return;  	subscriber_lock = subscriber->lock; -	if (size != sizeof(struct tipc_subscr)) -		subscr_terminate(subscriber); -	else -		subscr_subscribe((struct tipc_subscr *)data, subscriber); -	spin_unlock_bh(subscriber_lock); +	if (size != sizeof(struct tipc_subscr)) { +		subscr_terminate(subscriber); +		spin_unlock_bh(subscriber_lock); +	} else { +		sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); +		spin_unlock_bh(subscriber_lock); +		if (sub != NULL) { + +			/* +			 * We must release the server port lock before adding a +			 * subscription to the name table since TIPC needs to be +			 * able to (re)acquire the port lock if an event message +			 * issued by the subscription process is rejected and +			 * returned.  The subscription cannot be deleted while +			 * it is being added to the name table because: +			 * a) the single-threading of the native API port code +			 *    ensures the subscription cannot be cancelled and +			 *    the subscriber connection cannot be broken, and +			 * b) the name table lock ensures the subscription +			 *    timeout code cannot delete the subscription, +			 * so the subscription object is still protected. +			 */ + +			tipc_nametbl_subscribe(sub); +		} +	}  }  /** @@ -448,16 +479,10 @@ static void subscr_named_msg_event(void *usr_handle,  				   struct tipc_portid const *orig,  				   struct tipc_name_seq const *dest)  { -	struct subscriber *subscriber; -	struct iovec msg_sect = {NULL, 0}; -	spinlock_t *subscriber_lock; +	static struct iovec msg_sect = {NULL, 0}; -	dbg("subscr_named_msg_event: orig = %x own = %x,\n", -	    orig->node, tipc_own_addr); -	if (size && (size != sizeof(struct tipc_subscr))) { -		warn("Subscriber rejected, invalid subscription size\n"); -		return; -	} +	struct subscriber *subscriber; +	u32 server_port_ref;  	/* Create subscriber object */ @@ -468,17 +493,11 @@ static void subscr_named_msg_event(void *usr_handle,  	}  	INIT_LIST_HEAD(&subscriber->subscription_list);  	INIT_LIST_HEAD(&subscriber->subscriber_list); -	subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock); -	if (subscriber->ref == 0) { -		warn("Subscriber rejected, reference table exhausted\n"); -		kfree(subscriber); -		return; -	} -	/* Establish a connection to subscriber */ +	/* Create server port & establish connection to subscriber */  	tipc_createport(topsrv.user_ref, -			(void *)(unsigned long)subscriber->ref, +			subscriber,  			importance,  			NULL,  			NULL, @@ -490,32 +509,36 @@ static void subscr_named_msg_event(void *usr_handle,  			&subscriber->port_ref);  	if (subscriber->port_ref == 0) {  		warn("Subscriber rejected, unable to create port\n"); -		tipc_ref_discard(subscriber->ref);  		kfree(subscriber);  		return;  	}  	tipc_connect2port(subscriber->port_ref, orig); +	/* Lock server port (& save lock address for future use) */ + +	subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock;  	/* Add subscriber to topology server's subscriber list */ -	tipc_ref_lock(subscriber->ref);  	spin_lock_bh(&topsrv.lock);  	list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);  	spin_unlock_bh(&topsrv.lock); -	/* -	 * Subscribe now if message contains a subscription, -	 * otherwise send an empty response to complete connection handshaking -	 */ +	/* Unlock server port */ -	subscriber_lock = subscriber->lock; -	if (size) -		subscr_subscribe((struct tipc_subscr *)data, subscriber); -	else -		tipc_send(subscriber->port_ref, 1, &msg_sect); +	server_port_ref = subscriber->port_ref; +	spin_unlock_bh(subscriber->lock); -	spin_unlock_bh(subscriber_lock); +	/* Send an ACK- to complete connection handshaking */ + +	tipc_send(server_port_ref, 1, &msg_sect); + +	/* Handle optional subscription request */ + +	if (size != 0) { +		subscr_conn_msg_event(subscriber, server_port_ref, +				      buf, data, size); +	}  }  int tipc_subscr_start(void) @@ -574,8 +597,8 @@ void tipc_subscr_stop(void)  		list_for_each_entry_safe(subscriber, subscriber_temp,  					 &topsrv.subscriber_list,  					 subscriber_list) { -			tipc_ref_lock(subscriber->ref);  			subscriber_lock = subscriber->lock; +			spin_lock_bh(subscriber_lock);  			subscr_terminate(subscriber);  			spin_unlock_bh(subscriber_lock);  		} diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h index 93a8e674fac..45d89bf4d20 100644 --- a/net/tipc/subscr.h +++ b/net/tipc/subscr.h @@ -1,8 +1,8 @@  /* - * net/tipc/subscr.h: Include file for TIPC subscription service + * net/tipc/subscr.h: Include file for TIPC network topology service   *   * Copyright (c) 2003-2006, Ericsson AB - * Copyright (c) 2005, Wind River Systems + * Copyright (c) 2005-2007, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -37,34 +37,44 @@  #ifndef _TIPC_SUBSCR_H  #define _TIPC_SUBSCR_H +struct subscription; + +typedef void (*tipc_subscr_event) (struct subscription *sub, +				   u32 found_lower, u32 found_upper, +				   u32 event, u32 port_ref, u32 node); +  /**   * struct subscription - TIPC network topology subscription object   * @seq: name sequence associated with subscription   * @timeout: duration of subscription (in ms)   * @filter: event filtering to be done for subscription - * @evt: template for events generated by subscription - * @subscription_list: adjacent subscriptions in subscriber's subscription list + * @event_cb: routine invoked when a subscription event is detected + * @timer: timer governing subscription duration (optional)   * @nameseq_list: adjacent subscriptions in name sequence's subscription list - * @timer_ref: reference to timer governing subscription duration (may be NULL) - * @owner: pointer to subscriber object associated with this subscription + * @subscription_list: adjacent subscriptions in subscriber's subscription list + * @server_ref: object reference of server port associated with subscription + * @swap: indicates if subscriber uses opposite endianness in its messages + * @evt: template for events generated by subscription   */  struct subscription {  	struct tipc_name_seq seq;  	u32 timeout;  	u32 filter; -	struct tipc_event evt; -	struct list_head subscription_list; -	struct list_head nameseq_list; +	tipc_subscr_event event_cb;  	struct timer_list timer; -	struct subscriber *owner; +	struct list_head nameseq_list; +	struct list_head subscription_list; +	u32 server_ref; +	int swap; +	struct tipc_event evt;  }; -int tipc_subscr_overlap(struct subscription * sub, +int tipc_subscr_overlap(struct subscription *sub,  			u32 found_lower,  			u32 found_upper); -void tipc_subscr_report_overlap(struct subscription * sub, +void tipc_subscr_report_overlap(struct subscription *sub,  				u32 found_lower,  				u32 found_upper,  				u32 event, diff --git a/net/tipc/user_reg.c b/net/tipc/user_reg.c index 4146c40cd20..50692880316 100644 --- a/net/tipc/user_reg.c +++ b/net/tipc/user_reg.c @@ -91,7 +91,7 @@ static int reg_init(void)  		}  	}  	spin_unlock_bh(®_lock); -	return users ? TIPC_OK : -ENOMEM; +	return users ? 0 : -ENOMEM;  }  /** @@ -129,7 +129,7 @@ int tipc_reg_start(void)  			tipc_k_signal((Handler)reg_callback,  				      (unsigned long)&users[u]);  	} -	return TIPC_OK; +	return 0;  }  /** @@ -184,7 +184,7 @@ int tipc_attach(u32 *userid, tipc_mode_event cb, void *usr_handle)  	if (cb && (tipc_mode != TIPC_NOT_RUNNING))  		tipc_k_signal((Handler)reg_callback, (unsigned long)user_ptr); -	return TIPC_OK; +	return 0;  }  /** @@ -230,7 +230,7 @@ int tipc_reg_add_port(struct user_port *up_ptr)  	struct tipc_user *user_ptr;  	if (up_ptr->user_ref == 0) -		return TIPC_OK; +		return 0;  	if (up_ptr->user_ref > MAX_USERID)  		return -EINVAL;  	if ((tipc_mode == TIPC_NOT_RUNNING) || !users ) @@ -240,7 +240,7 @@ int tipc_reg_add_port(struct user_port *up_ptr)  	user_ptr = &users[up_ptr->user_ref];  	list_add(&up_ptr->uport_list, &user_ptr->ports);  	spin_unlock_bh(®_lock); -	return TIPC_OK; +	return 0;  }  /** @@ -250,7 +250,7 @@ int tipc_reg_add_port(struct user_port *up_ptr)  int tipc_reg_remove_port(struct user_port *up_ptr)  {  	if (up_ptr->user_ref == 0) -		return TIPC_OK; +		return 0;  	if (up_ptr->user_ref > MAX_USERID)  		return -EINVAL;  	if (!users ) @@ -259,6 +259,6 @@ int tipc_reg_remove_port(struct user_port *up_ptr)  	spin_lock_bh(®_lock);  	list_del_init(&up_ptr->uport_list);  	spin_unlock_bh(®_lock); -	return TIPC_OK; +	return 0;  } | 
