diff options
Diffstat (limited to 'net/tipc/subscr.c')
| -rw-r--r-- | net/tipc/subscr.c | 554 |
1 files changed, 201 insertions, 353 deletions
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c index 5ff38b9f319..642437231ad 100644 --- a/net/tipc/subscr.c +++ b/net/tipc/subscr.c @@ -1,8 +1,8 @@ /* - * net/tipc/subscr.c: TIPC subscription service - * + * net/tipc/subscr.c: TIPC network topology service + * * Copyright (c) 2000-2006, Ericsson AB - * Copyright (c) 2005, Wind River Systems + * Copyright (c) 2005-2007, 2010-2013, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,86 +35,77 @@ */ #include "core.h" -#include "dbg.h" -#include "subscr.h" #include "name_table.h" -#include "ref.h" +#include "port.h" +#include "subscr.h" /** - * struct subscriber - TIPC network topology subscriber - * @ref: object reference to subscriber object itself - * @lock: pointer to spinlock controlling access to subscriber object - * @subscriber_list: adjacent subscribers in top. server's list of subscribers + * struct tipc_subscriber - TIPC network topology subscriber + * @conid: connection identifier to server connecting to subscriber + * @lock: control access to subscriber * @subscription_list: list of subscription objects for this subscriber - * @port_ref: object reference to port used to communicate with subscriber - * @swap: indicates if subscriber uses opposite endianness in its messages */ - -struct subscriber { - u32 ref; - spinlock_t *lock; - struct list_head subscriber_list; +struct tipc_subscriber { + int conid; + spinlock_t lock; struct list_head subscription_list; - u32 port_ref; - int swap; }; -/** - * struct top_srv - TIPC network topology subscription service - * @user_ref: TIPC userid of subscription service - * @setup_port: reference to TIPC port that handles subscription requests - * @subscription_count: number of active subscriptions (not subscribers!) - * @subscriber_list: list of ports subscribing to service - * @lock: spinlock govering access to subscriber list - */ +static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len); +static void *subscr_named_msg_event(int conid); +static void subscr_conn_shutdown_event(int conid, void *usr_data); -struct top_srv { - u32 user_ref; - u32 setup_port; - atomic_t subscription_count; - struct list_head subscriber_list; - spinlock_t lock; +static atomic_t subscription_count = ATOMIC_INIT(0); + +static struct sockaddr_tipc topsrv_addr __read_mostly = { + .family = AF_TIPC, + .addrtype = TIPC_ADDR_NAMESEQ, + .addr.nameseq.type = TIPC_TOP_SRV, + .addr.nameseq.lower = TIPC_TOP_SRV, + .addr.nameseq.upper = TIPC_TOP_SRV, + .scope = TIPC_NODE_SCOPE }; -static struct top_srv topsrv = { 0 }; +static struct tipc_server topsrv __read_mostly = { + .saddr = &topsrv_addr, + .imp = TIPC_CRITICAL_IMPORTANCE, + .type = SOCK_SEQPACKET, + .max_rcvbuf_size = sizeof(struct tipc_subscr), + .name = "topology_server", + .tipc_conn_recvmsg = subscr_conn_msg_event, + .tipc_conn_new = subscr_named_msg_event, + .tipc_conn_shutdown = subscr_conn_shutdown_event, +}; /** * htohl - convert value to endianness used by destination * @in: value to convert * @swap: non-zero if endianness must be reversed - * + * * Returns converted value */ - -static inline u32 htohl(u32 in, int swap) +static u32 htohl(u32 in, int swap) { - char *c = (char *)∈ - - return swap ? ((c[3] << 3) + (c[2] << 2) + (c[1] << 1) + c[0]) : in; + return swap ? swab32(in) : in; } -/** - * subscr_send_event - send a message containing a tipc_event to the subscriber - */ - -static void subscr_send_event(struct subscription *sub, - u32 found_lower, - u32 found_upper, - u32 event, - u32 port_ref, +static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower, + u32 found_upper, u32 event, u32 port_ref, u32 node) { - struct iovec msg_sect; + struct tipc_subscriber *subscriber = sub->subscriber; + struct kvec msg_sect; msg_sect.iov_base = (void *)&sub->evt; msg_sect.iov_len = sizeof(struct tipc_event); - - sub->evt.event = htohl(event, sub->owner->swap); - sub->evt.found_lower = htohl(found_lower, sub->owner->swap); - sub->evt.found_upper = htohl(found_upper, sub->owner->swap); - sub->evt.port.ref = htohl(port_ref, sub->owner->swap); - sub->evt.port.node = htohl(node, sub->owner->swap); - tipc_send(sub->owner->port_ref, 1, &msg_sect); + sub->evt.event = htohl(event, sub->swap); + sub->evt.found_lower = htohl(found_lower, sub->swap); + sub->evt.found_upper = htohl(found_upper, sub->swap); + sub->evt.port.ref = htohl(port_ref, sub->swap); + sub->evt.port.node = htohl(node, sub->swap); + tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, msg_sect.iov_base, + msg_sect.iov_len); } /** @@ -122,11 +113,8 @@ static void subscr_send_event(struct subscription *sub, * * Returns 1 if there is overlap, otherwise 0. */ - -int tipc_subscr_overlap(struct subscription *sub, - u32 found_lower, +int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower, u32 found_upper) - { if (found_lower < sub->seq.lower) found_lower = sub->seq.lower; @@ -139,389 +127,249 @@ int tipc_subscr_overlap(struct subscription *sub, /** * tipc_subscr_report_overlap - issue event if there is subscription overlap - * + * * Protected by nameseq.lock in name_table.c */ - -void tipc_subscr_report_overlap(struct subscription *sub, - u32 found_lower, - u32 found_upper, - u32 event, - u32 port_ref, - u32 node, - int must) +void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower, + u32 found_upper, u32 event, u32 port_ref, + u32 node, int must) { - dbg("Rep overlap %u:%u,%u<->%u,%u\n", sub->seq.type, sub->seq.lower, - sub->seq.upper, found_lower, found_upper); if (!tipc_subscr_overlap(sub, found_lower, found_upper)) return; - if (!must && (sub->filter != TIPC_SUB_PORTS)) + if (!must && !(sub->filter & TIPC_SUB_PORTS)) return; + subscr_send_event(sub, found_lower, found_upper, event, port_ref, node); } -/** - * subscr_timeout - subscription timeout has occurred - */ - -static void subscr_timeout(struct subscription *sub) +static void subscr_timeout(struct tipc_subscription *sub) { - struct subscriber *subscriber; - u32 subscriber_ref; + struct tipc_subscriber *subscriber = sub->subscriber; - /* Validate subscriber reference (in case subscriber is terminating) */ + /* The spin lock per subscriber is used to protect its members */ + spin_lock_bh(&subscriber->lock); - subscriber_ref = sub->owner->ref; - subscriber = (struct subscriber *)tipc_ref_lock(subscriber_ref); - if (subscriber == NULL) + /* Validate timeout (in case subscription is being cancelled) */ + if (sub->timeout == TIPC_WAIT_FOREVER) { + spin_unlock_bh(&subscriber->lock); return; + } /* Unlink subscription from name table */ - tipc_nametbl_unsubscribe(sub); - /* Notify subscriber of timeout, then unlink subscription */ - - subscr_send_event(sub, - sub->evt.s.seq.lower, - sub->evt.s.seq.upper, - TIPC_SUBSCR_TIMEOUT, - 0, - 0); + /* Unlink subscription from subscriber */ list_del(&sub->subscription_list); - /* Now destroy subscription */ + spin_unlock_bh(&subscriber->lock); + + /* Notify subscriber of timeout */ + subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper, + TIPC_SUBSCR_TIMEOUT, 0, 0); - tipc_ref_unlock(subscriber_ref); + /* Now destroy subscription */ k_term_timer(&sub->timer); kfree(sub); - atomic_dec(&topsrv.subscription_count); + atomic_dec(&subscription_count); } /** - * subscr_terminate - terminate communication with a subscriber - * - * Called with subscriber locked. Routine must temporarily release this lock - * to enable subscription timeout routine(s) to finish without deadlocking; - * the lock is then reclaimed to allow caller to release it upon return. - * (This should work even in the unlikely event some other thread creates - * a new object reference in the interim that uses this lock; this routine will - * simply wait for it to be released, then claim it.) + * subscr_del - delete a subscription within a subscription list + * + * Called with subscriber lock held. */ +static void subscr_del(struct tipc_subscription *sub) +{ + tipc_nametbl_unsubscribe(sub); + list_del(&sub->subscription_list); + kfree(sub); + atomic_dec(&subscription_count); +} -static void subscr_terminate(struct subscriber *subscriber) +/** + * subscr_terminate - terminate communication with a subscriber + * + * Note: Must call it in process context since it might sleep. + */ +static void subscr_terminate(struct tipc_subscriber *subscriber) { - struct subscription *sub; - struct subscription *sub_temp; + tipc_conn_terminate(&topsrv, subscriber->conid); +} - /* Invalidate subscriber reference */ +static void subscr_release(struct tipc_subscriber *subscriber) +{ + struct tipc_subscription *sub; + struct tipc_subscription *sub_temp; - tipc_ref_discard(subscriber->ref); - spin_unlock_bh(subscriber->lock); + spin_lock_bh(&subscriber->lock); /* Destroy any existing subscriptions for subscriber */ - list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, subscription_list) { if (sub->timeout != TIPC_WAIT_FOREVER) { + spin_unlock_bh(&subscriber->lock); k_cancel_timer(&sub->timer); k_term_timer(&sub->timer); + spin_lock_bh(&subscriber->lock); } - tipc_nametbl_unsubscribe(sub); - list_del(&sub->subscription_list); - dbg("Term: Removed sub %u,%u,%u from subscriber %x list\n", - sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber); - kfree(sub); - atomic_dec(&topsrv.subscription_count); + subscr_del(sub); } + spin_unlock_bh(&subscriber->lock); - /* Sever connection to subscriber */ - - tipc_shutdown(subscriber->port_ref); - tipc_deleteport(subscriber->port_ref); - - /* Remove subscriber from topology server's subscriber list */ + /* Now destroy subscriber */ + kfree(subscriber); +} - spin_lock_bh(&topsrv.lock); - list_del(&subscriber->subscriber_list); - spin_unlock_bh(&topsrv.lock); +/** + * subscr_cancel - handle subscription cancellation request + * + * Called with subscriber lock held. Routine must temporarily release lock + * to enable the subscription timeout routine to finish without deadlocking; + * the lock is then reclaimed to allow caller to release it upon return. + * + * Note that fields of 's' use subscriber's endianness! + */ +static void subscr_cancel(struct tipc_subscr *s, + struct tipc_subscriber *subscriber) +{ + struct tipc_subscription *sub; + struct tipc_subscription *sub_temp; + int found = 0; - /* Now destroy subscriber */ + /* Find first matching subscription, exit if not found */ + list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list, + subscription_list) { + if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) { + found = 1; + break; + } + } + if (!found) + return; - spin_lock_bh(subscriber->lock); - kfree(subscriber); + /* Cancel subscription timer (if used), then delete subscription */ + if (sub->timeout != TIPC_WAIT_FOREVER) { + sub->timeout = TIPC_WAIT_FOREVER; + spin_unlock_bh(&subscriber->lock); + k_cancel_timer(&sub->timer); + k_term_timer(&sub->timer); + spin_lock_bh(&subscriber->lock); + } + subscr_del(sub); } /** * subscr_subscribe - create subscription for subscriber - * - * Called with subscriber locked + * + * Called with subscriber lock held. */ +static int subscr_subscribe(struct tipc_subscr *s, + struct tipc_subscriber *subscriber, + struct tipc_subscription **sub_p) { + struct tipc_subscription *sub; + int swap; -static void subscr_subscribe(struct tipc_subscr *s, - struct subscriber *subscriber) -{ - struct subscription *sub; + /* Determine subscriber's endianness */ + swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE)); - /* Refuse subscription if global limit exceeded */ + /* Detect & process a subscription cancellation request */ + if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { + s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); + subscr_cancel(s, subscriber); + return 0; + } - if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) { - warn("Failed: max %u subscriptions\n", tipc_max_subscriptions); - subscr_terminate(subscriber); - return; + /* Refuse subscription if global limit exceeded */ + if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) { + pr_warn("Subscription rejected, limit reached (%u)\n", + TIPC_MAX_SUBSCRIPTIONS); + return -EINVAL; } /* Allocate subscription object */ - sub = kmalloc(sizeof(*sub), GFP_ATOMIC); - if (sub == NULL) { - warn("Memory squeeze; ignoring subscription\n"); - subscr_terminate(subscriber); - return; + if (!sub) { + pr_warn("Subscription rejected, no memory\n"); + return -ENOMEM; } - /* Determine/update subscriber's endianness */ - - if ((s->filter == TIPC_SUB_PORTS) || (s->filter == TIPC_SUB_SERVICE)) - subscriber->swap = 0; - else - subscriber->swap = 1; - /* Initialize subscription object */ - - memset(sub, 0, sizeof(*sub)); - sub->seq.type = htohl(s->seq.type, subscriber->swap); - sub->seq.lower = htohl(s->seq.lower, subscriber->swap); - sub->seq.upper = htohl(s->seq.upper, subscriber->swap); - sub->timeout = htohl(s->timeout, subscriber->swap); - sub->filter = htohl(s->filter, subscriber->swap); - if ((((sub->filter != TIPC_SUB_PORTS) - && (sub->filter != TIPC_SUB_SERVICE))) - || (sub->seq.lower > sub->seq.upper)) { - warn("Rejecting illegal subscription %u,%u,%u\n", - sub->seq.type, sub->seq.lower, sub->seq.upper); + sub->seq.type = htohl(s->seq.type, swap); + sub->seq.lower = htohl(s->seq.lower, swap); + sub->seq.upper = htohl(s->seq.upper, swap); + sub->timeout = htohl(s->timeout, swap); + sub->filter = htohl(s->filter, swap); + if ((!(sub->filter & TIPC_SUB_PORTS) == + !(sub->filter & TIPC_SUB_SERVICE)) || + (sub->seq.lower > sub->seq.upper)) { + pr_warn("Subscription rejected, illegal request\n"); kfree(sub); - subscr_terminate(subscriber); - return; + return -EINVAL; } - memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); - INIT_LIST_HEAD(&sub->subscription_list); INIT_LIST_HEAD(&sub->nameseq_list); list_add(&sub->subscription_list, &subscriber->subscription_list); - atomic_inc(&topsrv.subscription_count); + sub->subscriber = subscriber; + sub->swap = swap; + memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr)); + atomic_inc(&subscription_count); if (sub->timeout != TIPC_WAIT_FOREVER) { k_init_timer(&sub->timer, (Handler)subscr_timeout, (unsigned long)sub); k_start_timer(&sub->timer, sub->timeout); } - sub->owner = subscriber; - tipc_nametbl_subscribe(sub); + *sub_p = sub; + return 0; } -/** - * subscr_conn_shutdown_event - handle termination request from subscriber - */ - -static void subscr_conn_shutdown_event(void *usr_handle, - u32 portref, - struct sk_buff **buf, - unsigned char const *data, - unsigned int size, - int reason) +/* Handle one termination request for the subscriber */ +static void subscr_conn_shutdown_event(int conid, void *usr_data) { - struct subscriber *subscriber; - spinlock_t *subscriber_lock; - - subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); - if (subscriber == NULL) - return; - - subscriber_lock = subscriber->lock; - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); + subscr_release((struct tipc_subscriber *)usr_data); } -/** - * subscr_conn_msg_event - handle new subscription request from subscriber - */ - -static void subscr_conn_msg_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - const unchar *data, - u32 size) +/* Handle one request to create a new subscription for the subscriber */ +static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr, + void *usr_data, void *buf, size_t len) { - struct subscriber *subscriber; - spinlock_t *subscriber_lock; + struct tipc_subscriber *subscriber = usr_data; + struct tipc_subscription *sub = NULL; - subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle); - if (subscriber == NULL) - return; - - subscriber_lock = subscriber->lock; - if (size != sizeof(struct tipc_subscr)) + spin_lock_bh(&subscriber->lock); + if (subscr_subscribe((struct tipc_subscr *)buf, subscriber, &sub) < 0) { + spin_unlock_bh(&subscriber->lock); subscr_terminate(subscriber); - else - subscr_subscribe((struct tipc_subscr *)data, subscriber); - - spin_unlock_bh(subscriber_lock); + return; + } + if (sub) + tipc_nametbl_subscribe(sub); + spin_unlock_bh(&subscriber->lock); } -/** - * subscr_named_msg_event - handle request to establish a new subscriber - */ -static void subscr_named_msg_event(void *usr_handle, - u32 port_ref, - struct sk_buff **buf, - const unchar *data, - u32 size, - u32 importance, - struct tipc_portid const *orig, - struct tipc_name_seq const *dest) +/* Handle one request to establish a new subscriber */ +static void *subscr_named_msg_event(int conid) { - struct subscriber *subscriber; - struct iovec msg_sect = {0, 0}; - spinlock_t *subscriber_lock; - - dbg("subscr_named_msg_event: orig = %x own = %x,\n", - orig->node, tipc_own_addr); - if (size && (size != sizeof(struct tipc_subscr))) { - warn("Received tipc_subscr of invalid size\n"); - return; - } + struct tipc_subscriber *subscriber; /* Create subscriber object */ - - subscriber = kmalloc(sizeof(struct subscriber), GFP_ATOMIC); + subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC); if (subscriber == NULL) { - warn("Memory squeeze; ignoring subscriber setup\n"); - return; + pr_warn("Subscriber rejected, no memory\n"); + return NULL; } - memset(subscriber, 0, sizeof(struct subscriber)); INIT_LIST_HEAD(&subscriber->subscription_list); - INIT_LIST_HEAD(&subscriber->subscriber_list); - subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock); - if (subscriber->ref == 0) { - warn("Failed to acquire subscriber reference\n"); - kfree(subscriber); - return; - } - - /* Establish a connection to subscriber */ - - tipc_createport(topsrv.user_ref, - (void *)(unsigned long)subscriber->ref, - importance, - 0, - 0, - subscr_conn_shutdown_event, - 0, - 0, - subscr_conn_msg_event, - 0, - &subscriber->port_ref); - if (subscriber->port_ref == 0) { - warn("Memory squeeze; failed to create subscription port\n"); - tipc_ref_discard(subscriber->ref); - kfree(subscriber); - return; - } - tipc_connect2port(subscriber->port_ref, orig); - - - /* Add subscriber to topology server's subscriber list */ + subscriber->conid = conid; + spin_lock_init(&subscriber->lock); - tipc_ref_lock(subscriber->ref); - spin_lock_bh(&topsrv.lock); - list_add(&subscriber->subscriber_list, &topsrv.subscriber_list); - spin_unlock_bh(&topsrv.lock); - - /* - * Subscribe now if message contains a subscription, - * otherwise send an empty response to complete connection handshaking - */ - - subscriber_lock = subscriber->lock; - if (size) - subscr_subscribe((struct tipc_subscr *)data, subscriber); - else - tipc_send(subscriber->port_ref, 1, &msg_sect); - - spin_unlock_bh(subscriber_lock); + return (void *)subscriber; } int tipc_subscr_start(void) { - struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV}; - int res = -1; - - memset(&topsrv, 0, sizeof (topsrv)); - topsrv.lock = SPIN_LOCK_UNLOCKED; - INIT_LIST_HEAD(&topsrv.subscriber_list); - - spin_lock_bh(&topsrv.lock); - res = tipc_attach(&topsrv.user_ref, 0, 0); - if (res) { - spin_unlock_bh(&topsrv.lock); - return res; - } - - res = tipc_createport(topsrv.user_ref, - 0, - TIPC_CRITICAL_IMPORTANCE, - 0, - 0, - 0, - 0, - subscr_named_msg_event, - 0, - 0, - &topsrv.setup_port); - if (res) - goto failed; - - res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq); - if (res) - goto failed; - - spin_unlock_bh(&topsrv.lock); - return 0; - -failed: - err("Failed to create subscription service\n"); - tipc_detach(topsrv.user_ref); - topsrv.user_ref = 0; - spin_unlock_bh(&topsrv.lock); - return res; + return tipc_server_start(&topsrv); } void tipc_subscr_stop(void) { - struct subscriber *subscriber; - struct subscriber *subscriber_temp; - spinlock_t *subscriber_lock; - - if (topsrv.user_ref) { - tipc_deleteport(topsrv.setup_port); - list_for_each_entry_safe(subscriber, subscriber_temp, - &topsrv.subscriber_list, - subscriber_list) { - tipc_ref_lock(subscriber->ref); - subscriber_lock = subscriber->lock; - subscr_terminate(subscriber); - spin_unlock_bh(subscriber_lock); - } - tipc_detach(topsrv.user_ref); - topsrv.user_ref = 0; - } + tipc_server_stop(&topsrv); } - - -int tipc_ispublished(struct tipc_name const *name) -{ - u32 domain = 0; - - return(tipc_nametbl_translate(name->type, name->instance,&domain) != 0); -} - |
