diff options
Diffstat (limited to 'net/tipc/subscr.c')
| -rw-r--r-- | net/tipc/subscr.c | 249 | 
1 files changed, 136 insertions, 113 deletions
| diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 8c01ccd3626..0326d3060bc 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -1,8 +1,8 @@  /* - * net/tipc/subscr.c: TIPC subscription service + * net/tipc/subscr.c: TIPC network topology service   *   * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005, Wind River Systems + * Copyright (c) 2005-2007, Wind River Systems   * All rights reserved.   *   * Redistribution and use in source and binary forms, with or without @@ -36,27 +36,24 @@  #include "core.h"  #include "dbg.h" -#include "subscr.h"  #include "name_table.h" +#include "port.h"  #include "ref.h" +#include "subscr.h"  /**   * struct subscriber - TIPC network topology subscriber - * @ref: object reference to subscriber object itself - * @lock: pointer to spinlock controlling access to subscriber object + * @port_ref: object reference to server port connecting to subscriber + * @lock: pointer to spinlock controlling access to subscriber's server port   * @subscriber_list: adjacent subscribers in top. server's list of subscribers   * @subscription_list: list of subscription objects for this subscriber - * @port_ref: object reference to port used to communicate with subscriber - * @swap: indicates if subscriber uses opposite endianness in its messages   */  struct subscriber { -	u32 ref; +	u32 port_ref;  	spinlock_t *lock;  	struct list_head subscriber_list;  	struct list_head subscription_list; -	u32 port_ref; -	int swap;  };  /** @@ -88,13 +85,14 @@ static struct top_srv topsrv = { 0 };  static u32 htohl(u32 in, int swap)  { -	char *c = (char *)∈ - -	return swap ? ((c[3] << 3) + (c[2] << 2) + (c[1] << 1) + c[0]) : in; +	return swap ? (u32)___constant_swab32(in) : in;  }  /**   * subscr_send_event - send a message containing a tipc_event to the subscriber + * + * Note: Must not hold subscriber's server port lock, since tipc_send() will + *       try to take the lock if the message is rejected and returned!   */  static void subscr_send_event(struct subscription *sub, @@ -109,12 +107,12 @@ static void subscr_send_event(struct subscription *sub,  	msg_sect.iov_base = (void *)&sub->evt;  	msg_sect.iov_len = sizeof(struct tipc_event); -	sub->evt.event = htohl(event, sub->owner->swap); -	sub->evt.found_lower = htohl(found_lower, sub->owner->swap); -	sub->evt.found_upper = htohl(found_upper, sub->owner->swap); -	sub->evt.port.ref = htohl(port_ref, sub->owner->swap); -	sub->evt.port.node = htohl(node, sub->owner->swap); -	tipc_send(sub->owner->port_ref, 1, &msg_sect); +	sub->evt.event = htohl(event, sub->swap); +	sub->evt.found_lower = htohl(found_lower, sub->swap); +	sub->evt.found_upper = htohl(found_upper, sub->swap); +	sub->evt.port.ref = htohl(port_ref, sub->swap); +	sub->evt.port.node = htohl(node, sub->swap); +	tipc_send(sub->server_ref, 1, &msg_sect);  }  /** @@ -151,13 +149,12 @@ void tipc_subscr_report_overlap(struct subscription *sub,  				u32 node,  				int must)  { -	dbg("Rep overlap %u:%u,%u<->%u,%u\n", sub->seq.type, sub->seq.lower, -	    sub->seq.upper, found_lower, found_upper);  	if (!tipc_subscr_overlap(sub, found_lower, found_upper))  		return;  	if (!must && !(sub->filter & TIPC_SUB_PORTS))  		return; -	subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); + +	sub->event_cb(sub, found_lower, found_upper, event, port_ref, node);  }  /** @@ -166,20 +163,18 @@ void tipc_subscr_report_overlap(struct subscription *sub,  static void subscr_timeout(struct subscription *sub)  { -	struct subscriber *subscriber; -	u32 subscriber_ref; +	struct port *server_port; -	/* Validate subscriber reference (in case subscriber is terminating) */ +	/* Validate server port reference (in case subscriber is terminating) */ -	subscriber_ref = sub->owner->ref; -	subscriber = (struct subscriber *)tipc_ref_lock(subscriber_ref); -	if (subscriber == NULL) +	server_port = tipc_port_lock(sub->server_ref); +	if (server_port == NULL)  		return;  	/* Validate timeout (in case subscription is being cancelled) */  	if (sub->timeout == TIPC_WAIT_FOREVER) { -		tipc_ref_unlock(subscriber_ref); +		tipc_port_unlock(server_port);  		return;  	} @@ -187,19 +182,21 @@ static void subscr_timeout(struct subscription *sub)  	tipc_nametbl_unsubscribe(sub); -	/* Notify subscriber of timeout, then unlink subscription */ +	/* Unlink subscription from subscriber */ -	subscr_send_event(sub, -			  sub->evt.s.seq.lower, -			  sub->evt.s.seq.upper, -			  TIPC_SUBSCR_TIMEOUT, -			  0, -			  0);  	list_del(&sub->subscription_list); +	/* Release subscriber's server port */ + +	tipc_port_unlock(server_port); + +	/* Notify subscriber of timeout */ + +	subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, +			  TIPC_SUBSCR_TIMEOUT, 0, 0); +  	/* Now destroy subscription */ -	tipc_ref_unlock(subscriber_ref);  	k_term_timer(&sub->timer);  	kfree(sub);  	atomic_dec(&topsrv.subscription_count); @@ -208,7 +205,7 @@ static void subscr_timeout(struct subscription *sub)  /**   * subscr_del - delete a subscription within a subscription list   * - * Called with subscriber locked. + * Called with subscriber port locked.   */  static void subscr_del(struct subscription *sub) @@ -222,7 +219,7 @@ static void subscr_del(struct subscription *sub)  /**   * subscr_terminate - terminate communication with a subscriber   * - * Called with subscriber locked.  Routine must temporarily release this lock + * Called with subscriber port locked.  Routine must temporarily release lock   * to enable subscription timeout routine(s) to finish without deadlocking;   * the lock is then reclaimed to allow caller to release it upon return.   * (This should work even in the unlikely event some other thread creates @@ -232,14 +229,21 @@ static void subscr_del(struct subscription *sub)  static void subscr_terminate(struct subscriber *subscriber)  { +	u32 port_ref;  	struct subscription *sub;  	struct subscription *sub_temp;  	/* Invalidate subscriber reference */ -	tipc_ref_discard(subscriber->ref); +	port_ref = subscriber->port_ref; +	subscriber->port_ref = 0;  	spin_unlock_bh(subscriber->lock); +	/* Sever connection to subscriber */ + +	tipc_shutdown(port_ref); +	tipc_deleteport(port_ref); +  	/* Destroy any existing subscriptions for subscriber */  	list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, @@ -253,27 +257,25 @@ static void subscr_terminate(struct subscriber *subscriber)  		subscr_del(sub);  	} -	/* Sever connection to subscriber */ - -	tipc_shutdown(subscriber->port_ref); -	tipc_deleteport(subscriber->port_ref); -  	/* Remove subscriber from topology server's subscriber list */  	spin_lock_bh(&topsrv.lock);  	list_del(&subscriber->subscriber_list);  	spin_unlock_bh(&topsrv.lock); -	/* Now destroy subscriber */ +	/* Reclaim subscriber lock */  	spin_lock_bh(subscriber->lock); + +	/* Now destroy subscriber */ +  	kfree(subscriber);  }  /**   * subscr_cancel - handle subscription cancellation request   * - * Called with subscriber locked.  Routine must temporarily release this lock + * Called with subscriber port locked.  Routine must temporarily release lock   * to enable the subscription timeout routine to finish without deadlocking;   * the lock is then reclaimed to allow caller to release it upon return.   * @@ -316,27 +318,25 @@ static void subscr_cancel(struct tipc_subscr *s,  /**   * subscr_subscribe - create subscription for subscriber   * - * Called with subscriber locked + * Called with subscriber port locked.   */ -static void subscr_subscribe(struct tipc_subscr *s, -			     struct subscriber *subscriber) +static struct subscription *subscr_subscribe(struct tipc_subscr *s, +					     struct subscriber *subscriber)  {  	struct subscription *sub; +	int swap; -	/* Determine/update subscriber's endianness */ +	/* Determine subscriber's endianness */ -	if (s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE)) -		subscriber->swap = 0; -	else -		subscriber->swap = 1; +	swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));  	/* Detect & process a subscription cancellation request */ -	if (s->filter & htohl(TIPC_SUB_CANCEL, subscriber->swap)) { -		s->filter &= ~htohl(TIPC_SUB_CANCEL, subscriber->swap); +	if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { +		s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);  		subscr_cancel(s, subscriber); -		return; +		return NULL;  	}  	/* Refuse subscription if global limit exceeded */ @@ -345,63 +345,66 @@ static void subscr_subscribe(struct tipc_subscr *s,  		warn("Subscription rejected, subscription limit reached (%u)\n",  		     tipc_max_subscriptions);  		subscr_terminate(subscriber); -		return; +		return NULL;  	}  	/* Allocate subscription object */ -	sub = kzalloc(sizeof(*sub), GFP_ATOMIC); +	sub = kmalloc(sizeof(*sub), GFP_ATOMIC);  	if (!sub) {  		warn("Subscription rejected, no memory\n");  		subscr_terminate(subscriber); -		return; +		return NULL;  	}  	/* Initialize subscription object */ -	sub->seq.type = htohl(s->seq.type, subscriber->swap); -	sub->seq.lower = htohl(s->seq.lower, subscriber->swap); -	sub->seq.upper = htohl(s->seq.upper, subscriber->swap); -	sub->timeout = htohl(s->timeout, subscriber->swap); -	sub->filter = htohl(s->filter, subscriber->swap); +	sub->seq.type = htohl(s->seq.type, swap); +	sub->seq.lower = htohl(s->seq.lower, swap); +	sub->seq.upper = htohl(s->seq.upper, swap); +	sub->timeout = htohl(s->timeout, swap); +	sub->filter = htohl(s->filter, swap);  	if ((!(sub->filter & TIPC_SUB_PORTS)  	     == !(sub->filter & TIPC_SUB_SERVICE))  	    || (sub->seq.lower > sub->seq.upper)) {  		warn("Subscription rejected, illegal request\n");  		kfree(sub);  		subscr_terminate(subscriber); -		return; +		return NULL;  	} -	memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); -	INIT_LIST_HEAD(&sub->subscription_list); +	sub->event_cb = subscr_send_event;  	INIT_LIST_HEAD(&sub->nameseq_list);  	list_add(&sub->subscription_list, &subscriber->subscription_list); +	sub->server_ref = subscriber->port_ref; +	sub->swap = swap; +	memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));  	atomic_inc(&topsrv.subscription_count);  	if (sub->timeout != TIPC_WAIT_FOREVER) {  		k_init_timer(&sub->timer,  			     (Handler)subscr_timeout, (unsigned long)sub);  		k_start_timer(&sub->timer, sub->timeout);  	} -	sub->owner = subscriber; -	tipc_nametbl_subscribe(sub); + +	return sub;  }  /**   * subscr_conn_shutdown_event - handle termination request from subscriber + * + * Called with subscriber's server port unlocked.   */  static void subscr_conn_shutdown_event(void *usr_handle, -				       u32 portref, +				       u32 port_ref,  				       struct sk_buff **buf,  				       unsigned char const *data,  				       unsigned int size,  				       int reason)  { -	struct subscriber *subscriber; +	struct subscriber *subscriber = usr_handle;  	spinlock_t *subscriber_lock; -	subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); -	if (subscriber == NULL) +	if (tipc_port_lock(port_ref) == NULL)  		return;  	subscriber_lock = subscriber->lock; @@ -411,6 +414,8 @@ static void subscr_conn_shutdown_event(void *usr_handle,  /**   * subscr_conn_msg_event - handle new subscription request from subscriber + * + * Called with subscriber's server port unlocked.   */  static void subscr_conn_msg_event(void *usr_handle, @@ -419,20 +424,46 @@ static void subscr_conn_msg_event(void *usr_handle,  				  const unchar *data,  				  u32 size)  { -	struct subscriber *subscriber; +	struct subscriber *subscriber = usr_handle;  	spinlock_t *subscriber_lock; +	struct subscription *sub; + +	/* +	 * Lock subscriber's server port (& make a local copy of lock pointer, +	 * in case subscriber is deleted while processing subscription request) +	 */ -	subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); -	if (subscriber == NULL) +	if (tipc_port_lock(port_ref) == NULL)  		return;  	subscriber_lock = subscriber->lock; -	if (size != sizeof(struct tipc_subscr)) -		subscr_terminate(subscriber); -	else -		subscr_subscribe((struct tipc_subscr *)data, subscriber); -	spin_unlock_bh(subscriber_lock); +	if (size != sizeof(struct tipc_subscr)) { +		subscr_terminate(subscriber); +		spin_unlock_bh(subscriber_lock); +	} else { +		sub = subscr_subscribe((struct tipc_subscr *)data, subscriber); +		spin_unlock_bh(subscriber_lock); +		if (sub != NULL) { + +			/* +			 * We must release the server port lock before adding a +			 * subscription to the name table since TIPC needs to be +			 * able to (re)acquire the port lock if an event message +			 * issued by the subscription process is rejected and +			 * returned.  The subscription cannot be deleted while +			 * it is being added to the name table because: +			 * a) the single-threading of the native API port code +			 *    ensures the subscription cannot be cancelled and +			 *    the subscriber connection cannot be broken, and +			 * b) the name table lock ensures the subscription +			 *    timeout code cannot delete the subscription, +			 * so the subscription object is still protected. +			 */ + +			tipc_nametbl_subscribe(sub); +		} +	}  }  /** @@ -448,16 +479,10 @@ static void subscr_named_msg_event(void *usr_handle,  				   struct tipc_portid const *orig,  				   struct tipc_name_seq const *dest)  { -	struct subscriber *subscriber; -	struct iovec msg_sect = {NULL, 0}; -	spinlock_t *subscriber_lock; +	static struct iovec msg_sect = {NULL, 0}; -	dbg("subscr_named_msg_event: orig = %x own = %x,\n", -	    orig->node, tipc_own_addr); -	if (size && (size != sizeof(struct tipc_subscr))) { -		warn("Subscriber rejected, invalid subscription size\n"); -		return; -	} +	struct subscriber *subscriber; +	u32 server_port_ref;  	/* Create subscriber object */ @@ -468,17 +493,11 @@ static void subscr_named_msg_event(void *usr_handle,  	}  	INIT_LIST_HEAD(&subscriber->subscription_list);  	INIT_LIST_HEAD(&subscriber->subscriber_list); -	subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock); -	if (subscriber->ref == 0) { -		warn("Subscriber rejected, reference table exhausted\n"); -		kfree(subscriber); -		return; -	} -	/* Establish a connection to subscriber */ +	/* Create server port & establish connection to subscriber */  	tipc_createport(topsrv.user_ref, -			(void *)(unsigned long)subscriber->ref, +			subscriber,  			importance,  			NULL,  			NULL, @@ -490,32 +509,36 @@ static void subscr_named_msg_event(void *usr_handle,  			&subscriber->port_ref);  	if (subscriber->port_ref == 0) {  		warn("Subscriber rejected, unable to create port\n"); -		tipc_ref_discard(subscriber->ref);  		kfree(subscriber);  		return;  	}  	tipc_connect2port(subscriber->port_ref, orig); +	/* Lock server port (& save lock address for future use) */ + +	subscriber->lock = tipc_port_lock(subscriber->port_ref)->publ.lock;  	/* Add subscriber to topology server's subscriber list */ -	tipc_ref_lock(subscriber->ref);  	spin_lock_bh(&topsrv.lock);  	list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);  	spin_unlock_bh(&topsrv.lock); -	/* -	 * Subscribe now if message contains a subscription, -	 * otherwise send an empty response to complete connection handshaking -	 */ +	/* Unlock server port */ -	subscriber_lock = subscriber->lock; -	if (size) -		subscr_subscribe((struct tipc_subscr *)data, subscriber); -	else -		tipc_send(subscriber->port_ref, 1, &msg_sect); +	server_port_ref = subscriber->port_ref; +	spin_unlock_bh(subscriber->lock); -	spin_unlock_bh(subscriber_lock); +	/* Send an ACK- to complete connection handshaking */ + +	tipc_send(server_port_ref, 1, &msg_sect); + +	/* Handle optional subscription request */ + +	if (size != 0) { +		subscr_conn_msg_event(subscriber, server_port_ref, +				      buf, data, size); +	}  }  int tipc_subscr_start(void) @@ -574,8 +597,8 @@ void tipc_subscr_stop(void)  		list_for_each_entry_safe(subscriber, subscriber_temp,  					 &topsrv.subscriber_list,  					 subscriber_list) { -			tipc_ref_lock(subscriber->ref);  			subscriber_lock = subscriber->lock; +			spin_lock_bh(subscriber_lock);  			subscr_terminate(subscriber);  			spin_unlock_bh(subscriber_lock);  		} | 
