diff options
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- | net/tipc/link.c | 3164 |
1 files changed, 3164 insertions, 0 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c new file mode 100644 index 00000000000..92acb80bb24 --- /dev/null +++ b/net/tipc/link.c @@ -0,0 +1,3164 @@ +/* + * net/tipc/link.c: TIPC link code + * + * Copyright (c) 2003-2005, Ericsson Research Canada + * Copyright (c) 2004-2005, Wind River Systems + * Copyright (c) 2005-2006, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "core.h" +#include "dbg.h" +#include "link.h" +#include "net.h" +#include "node.h" +#include "port.h" +#include "addr.h" +#include "node_subscr.h" +#include "name_distr.h" +#include "bearer.h" +#include "name_table.h" +#include "discover.h" +#include "config.h" +#include "bcast.h" + + +/* + * Limit for deferred reception queue: + */ + +#define DEF_QUEUE_LIMIT 256u + +/* + * Link state events: + */ + +#define STARTING_EVT 856384768 /* link processing trigger */ +#define TRAFFIC_MSG_EVT 560815u /* rx'd ??? */ +#define TIMEOUT_EVT 560817u /* link timer expired */ + +/* + * The following two 'message types' is really just implementation + * data conveniently stored in the message header. + * They must not be considered part of the protocol + */ +#define OPEN_MSG 0 +#define CLOSED_MSG 1 + +/* + * State value stored in 'exp_msg_count' + */ + +#define START_CHANGEOVER 100000u + +/** + * struct link_name - deconstructed link name + * @addr_local: network address of node at this end + * @if_local: name of interface at this end + * @addr_peer: network address of node at far end + * @if_peer: name of interface at far end + */ + +struct link_name { + u32 addr_local; + char if_local[TIPC_MAX_IF_NAME]; + u32 addr_peer; + char if_peer[TIPC_MAX_IF_NAME]; +}; + +#if 0 + +/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */ + +/** + * struct link_event - link up/down event notification + */ + +struct link_event { + u32 addr; + int up; + void (*fcn)(u32, char *, int); + char name[TIPC_MAX_LINK_NAME]; +}; + +#endif + +static void link_handle_out_of_seq_msg(struct link *l_ptr, + struct sk_buff *buf); +static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf); +static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf); +static void link_set_supervision_props(struct link *l_ptr, u32 tolerance); +static int link_send_sections_long(struct port *sender, + struct iovec const *msg_sect, + u32 num_sect, u32 destnode); +static void link_check_defragm_bufs(struct link *l_ptr); +static void link_state_event(struct link *l_ptr, u32 event); +static void link_reset_statistics(struct link *l_ptr); +static void link_print(struct link *l_ptr, struct print_buf *buf, + const char *str); + +/* + * Debugging code used by link routines only + * + * When debugging link problems on a system that has multiple links, + * the standard TIPC debugging routines may not be useful since they + * allow the output from multiple links to be intermixed. For this reason + * routines of the form "dbg_link_XXX()" have been created that will capture + * debug info into a link's personal print buffer, which can then be dumped + * into the TIPC system log (LOG) upon request. + * + * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size + * of the print buffer used by each link. If LINK_LOG_BUF_SIZE is set to 0, + * the dbg_link_XXX() routines simply send their output to the standard + * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful + * when there is only a single link in the system being debugged. + * + * Notes: + * - When enabled, LINK_LOG_BUF_SIZE should be set to at least 1000 (bytes) + * - "l_ptr" must be valid when using dbg_link_XXX() macros + */ + +#define LINK_LOG_BUF_SIZE 0 + +#define dbg_link(fmt, arg...) do {if (LINK_LOG_BUF_SIZE) tipc_printf(&l_ptr->print_buf, fmt, ## arg); } while(0) +#define dbg_link_msg(msg, txt) do {if (LINK_LOG_BUF_SIZE) msg_print(&l_ptr->print_buf, msg, txt); } while(0) +#define dbg_link_state(txt) do {if (LINK_LOG_BUF_SIZE) link_print(l_ptr, &l_ptr->print_buf, txt); } while(0) +#define dbg_link_dump() do { \ + if (LINK_LOG_BUF_SIZE) { \ + tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \ + printbuf_move(LOG, &l_ptr->print_buf); \ + } \ +} while (0) + +static inline void dbg_print_link(struct link *l_ptr, const char *str) +{ + if (DBG_OUTPUT) + link_print(l_ptr, DBG_OUTPUT, str); +} + +static inline void dbg_print_buf_chain(struct sk_buff *root_buf) +{ + if (DBG_OUTPUT) { + struct sk_buff *buf = root_buf; + + while (buf) { + msg_dbg(buf_msg(buf), "In chain: "); + buf = buf->next; + } + } +} + +/* + * Simple inlined link routines + */ + +static inline unsigned int align(unsigned int i) +{ + return (i + 3) & ~3u; +} + +static inline int link_working_working(struct link *l_ptr) +{ + return (l_ptr->state == WORKING_WORKING); +} + +static inline int link_working_unknown(struct link *l_ptr) +{ + return (l_ptr->state == WORKING_UNKNOWN); +} + +static inline int link_reset_unknown(struct link *l_ptr) +{ + return (l_ptr->state == RESET_UNKNOWN); +} + +static inline int link_reset_reset(struct link *l_ptr) +{ + return (l_ptr->state == RESET_RESET); +} + +static inline int link_blocked(struct link *l_ptr) +{ + return (l_ptr->exp_msg_count || l_ptr->blocked); +} + +static inline int link_congested(struct link *l_ptr) +{ + return (l_ptr->out_queue_size >= l_ptr->queue_limit[0]); +} + +static inline u32 link_max_pkt(struct link *l_ptr) +{ + return l_ptr->max_pkt; +} + +static inline void link_init_max_pkt(struct link *l_ptr) +{ + u32 max_pkt; + + max_pkt = (l_ptr->b_ptr->publ.mtu & ~3); + if (max_pkt > MAX_MSG_SIZE) + max_pkt = MAX_MSG_SIZE; + + l_ptr->max_pkt_target = max_pkt; + if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT) + l_ptr->max_pkt = l_ptr->max_pkt_target; + else + l_ptr->max_pkt = MAX_PKT_DEFAULT; + + l_ptr->max_pkt_probes = 0; +} + +static inline u32 link_next_sent(struct link *l_ptr) +{ + if (l_ptr->next_out) + return msg_seqno(buf_msg(l_ptr->next_out)); + return mod(l_ptr->next_out_no); +} + +static inline u32 link_last_sent(struct link *l_ptr) +{ + return mod(link_next_sent(l_ptr) - 1); +} + +/* + * Simple non-inlined link routines (i.e. referenced outside this file) + */ + +int link_is_up(struct link *l_ptr) +{ + if (!l_ptr) + return 0; + return (link_working_working(l_ptr) || link_working_unknown(l_ptr)); +} + +int link_is_active(struct link *l_ptr) +{ + return ((l_ptr->owner->active_links[0] == l_ptr) || + (l_ptr->owner->active_links[1] == l_ptr)); +} + +/** + * link_name_validate - validate & (optionally) deconstruct link name + * @name - ptr to link name string + * @name_parts - ptr to area for link name components (or NULL if not needed) + * + * Returns 1 if link name is valid, otherwise 0. + */ + +static int link_name_validate(const char *name, struct link_name *name_parts) +{ + char name_copy[TIPC_MAX_LINK_NAME]; + char *addr_local; + char *if_local; + char *addr_peer; + char *if_peer; + char dummy; + u32 z_local, c_local, n_local; + u32 z_peer, c_peer, n_peer; + u32 if_local_len; + u32 if_peer_len; + + /* copy link name & ensure length is OK */ + + name_copy[TIPC_MAX_LINK_NAME - 1] = 0; + /* need above in case non-Posix strncpy() doesn't pad with nulls */ + strncpy(name_copy, name, TIPC_MAX_LINK_NAME); + if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0) + return 0; + + /* ensure all component parts of link name are present */ + + addr_local = name_copy; + if ((if_local = strchr(addr_local, ':')) == NULL) + return 0; + *(if_local++) = 0; + if ((addr_peer = strchr(if_local, '-')) == NULL) + return 0; + *(addr_peer++) = 0; + if_local_len = addr_peer - if_local; + if ((if_peer = strchr(addr_peer, ':')) == NULL) + return 0; + *(if_peer++) = 0; + if_peer_len = strlen(if_peer) + 1; + + /* validate component parts of link name */ + + if ((sscanf(addr_local, "%u.%u.%u%c", + &z_local, &c_local, &n_local, &dummy) != 3) || + (sscanf(addr_peer, "%u.%u.%u%c", + &z_peer, &c_peer, &n_peer, &dummy) != 3) || + (z_local > 255) || (c_local > 4095) || (n_local > 4095) || + (z_peer > 255) || (c_peer > 4095) || (n_peer > 4095) || + (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) || + (if_peer_len <= 1) || (if_peer_len > TIPC_MAX_IF_NAME) || + (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) || + (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1))) + return 0; + + /* return link name components, if necessary */ + + if (name_parts) { + name_parts->addr_local = tipc_addr(z_local, c_local, n_local); + strcpy(name_parts->if_local, if_local); + name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer); + strcpy(name_parts->if_peer, if_peer); + } + return 1; +} + +/** + * link_timeout - handle expiration of link timer + * @l_ptr: pointer to link + * + * This routine must not grab "net_lock" to avoid a potential deadlock conflict + * with link_delete(). (There is no risk that the node will be deleted by + * another thread because link_delete() always cancels the link timer before + * node_delete() is called.) + */ + +static void link_timeout(struct link *l_ptr) +{ + node_lock(l_ptr->owner); + + /* update counters used in statistical profiling of send traffic */ + + l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; + l_ptr->stats.queue_sz_counts++; + + if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz) + l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; + + if (l_ptr->first_out) { + struct tipc_msg *msg = buf_msg(l_ptr->first_out); + u32 length = msg_size(msg); + + if ((msg_user(msg) == MSG_FRAGMENTER) + && (msg_type(msg) == FIRST_FRAGMENT)) { + length = msg_size(msg_get_wrapped(msg)); + } + if (length) { + l_ptr->stats.msg_lengths_total += length; + l_ptr->stats.msg_length_counts++; + if (length <= 64) + l_ptr->stats.msg_length_profile[0]++; + else if (length <= 256) + l_ptr->stats.msg_length_profile[1]++; + else if (length <= 1024) + l_ptr->stats.msg_length_profile[2]++; + else if (length <= 4096) + l_ptr->stats.msg_length_profile[3]++; + else if (length <= 16384) + l_ptr->stats.msg_length_profile[4]++; + else if (length <= 32768) + l_ptr->stats.msg_length_profile[5]++; + else + l_ptr->stats.msg_length_profile[6]++; + } + } + + /* do all other link processing performed on a periodic basis */ + + link_check_defragm_bufs(l_ptr); + + link_state_event(l_ptr, TIMEOUT_EVT); + + if (l_ptr->next_out) + link_push_queue(l_ptr); + + node_unlock(l_ptr->owner); +} + +static inline void link_set_timer(struct link *l_ptr, u32 time) +{ + k_start_timer(&l_ptr->timer, time); +} + +/** + * link_create - create a new link + * @b_ptr: pointer to associated bearer + * @peer: network address of node at other end of link + * @media_addr: media address to use when sending messages over link + * + * Returns pointer to link. + */ + +struct link *link_create(struct bearer *b_ptr, const u32 peer, + const struct tipc_media_addr *media_addr) +{ + struct link *l_ptr; + struct tipc_msg *msg; + char *if_name; + + l_ptr = (struct link *)kmalloc(sizeof(*l_ptr), GFP_ATOMIC); + if (!l_ptr) { + warn("Memory squeeze; Failed to create link\n"); + return NULL; + } + memset(l_ptr, 0, sizeof(*l_ptr)); + + l_ptr->addr = peer; + if_name = strchr(b_ptr->publ.name, ':') + 1; + sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:", + tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), + tipc_node(tipc_own_addr), + if_name, + tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); + /* note: peer i/f is appended to link name by reset/activate */ + memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); + k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); + list_add_tail(&l_ptr->link_list, &b_ptr->links); + l_ptr->checkpoint = 1; + l_ptr->b_ptr = b_ptr; + link_set_supervision_props(l_ptr, b_ptr->media->tolerance); + l_ptr->state = RESET_UNKNOWN; + + l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg; + msg = l_ptr->pmsg; + msg_init(msg, LINK_PROTOCOL, RESET_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr); + msg_set_size(msg, sizeof(l_ptr->proto_msg)); + msg_set_session(msg, tipc_random); + msg_set_bearer_id(msg, b_ptr->identity); + strcpy((char *)msg_data(msg), if_name); + + l_ptr->priority = b_ptr->priority; + link_set_queue_limits(l_ptr, b_ptr->media->window); + + link_init_max_pkt(l_ptr); + + l_ptr->next_out_no = 1; + INIT_LIST_HEAD(&l_ptr->waiting_ports); + + link_reset_statistics(l_ptr); + + l_ptr->owner = node_attach_link(l_ptr); + if (!l_ptr->owner) { + kfree(l_ptr); + return NULL; + } + + if (LINK_LOG_BUF_SIZE) { + char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC); + + if (!pb) { + kfree(l_ptr); + warn("Memory squeeze; Failed to create link\n"); + return NULL; + } + printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE); + } + + k_signal((Handler)link_start, (unsigned long)l_ptr); + + dbg("link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n", + l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit); + + return l_ptr; +} + +/** + * link_delete - delete a link + * @l_ptr: pointer to link + * + * Note: 'net_lock' is write_locked, bearer is locked. + * This routine must not grab the node lock until after link timer cancellation + * to avoid a potential deadlock situation. + */ + +void link_delete(struct link *l_ptr) +{ + if (!l_ptr) { + err("Attempt to delete non-existent link\n"); + return; + } + + dbg("link_delete()\n"); + + k_cancel_timer(&l_ptr->timer); + + node_lock(l_ptr->owner); + link_reset(l_ptr); + node_detach_link(l_ptr->owner, l_ptr); + link_stop(l_ptr); + list_del_init(&l_ptr->link_list); + if (LINK_LOG_BUF_SIZE) + kfree(l_ptr->print_buf.buf); + node_unlock(l_ptr->owner); + k_term_timer(&l_ptr->timer); + kfree(l_ptr); +} + +void link_start(struct link *l_ptr) +{ + dbg("link_start %x\n", l_ptr); + link_state_event(l_ptr, STARTING_EVT); +} + +/** + * link_schedule_port - schedule port for deferred sending + * @l_ptr: pointer to link + * @origport: reference to sending port + * @sz: amount of data to be sent + * + * Schedules port for renewed sending of messages after link congestion + * has abated. + */ + +static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz) +{ + struct port *p_ptr; + + spin_lock_bh(&port_list_lock); + p_ptr = port_lock(origport); + if (p_ptr) { + if (!p_ptr->wakeup) + goto exit; + if (!list_empty(&p_ptr->wait_list)) + goto exit; + p_ptr->congested_link = l_ptr; + p_ptr->publ.congested = 1; + p_ptr->waiting_pkts = 1 + ((sz - 1) / link_max_pkt(l_ptr)); + list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); + l_ptr->stats.link_congs++; +exit: + port_unlock(p_ptr); + } + spin_unlock_bh(&port_list_lock); + return -ELINKCONG; +} + +void link_wakeup_ports(struct link *l_ptr, int all) +{ + struct port *p_ptr; + struct port *temp_p_ptr; + int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; + + if (all) + win = 100000; + if (win <= 0) + return; + if (!spin_trylock_bh(&port_list_lock)) + return; + if (link_congested(l_ptr)) + goto exit; + list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports, + wait_list) { + if (win <= 0) + break; + list_del_init(&p_ptr->wait_list); + p_ptr->congested_link = 0; + assert(p_ptr->wakeup); + spin_lock_bh(p_ptr->publ.lock); + p_ptr->publ.congested = 0; + p_ptr->wakeup(&p_ptr->publ); + win -= p_ptr->waiting_pkts; + spin_unlock_bh(p_ptr->publ.lock); + } + +exit: + spin_unlock_bh(&port_list_lock); +} + +/** + * link_release_outqueue - purge link's outbound message queue + * @l_ptr: pointer to link + */ + +static void link_release_outqueue(struct link *l_ptr) +{ + struct sk_buff *buf = l_ptr->first_out; + struct sk_buff *next; + + while (buf) { + next = buf->next; + buf_discard(buf); + buf = next; + } + l_ptr->first_out = NULL; + l_ptr->out_queue_size = 0; +} + +/** + * link_reset_fragments - purge link's inbound message fragments queue + * @l_ptr: pointer to link + */ + +void link_reset_fragments(struct link *l_ptr) +{ + struct sk_buff *buf = l_ptr->defragm_buf; + struct sk_buff *next; + + while (buf) { + next = buf->next; + buf_discard(buf); + buf = next; + } + l_ptr->defragm_buf = NULL; +} + +/** + * link_stop - purge all inbound and outbound messages associated with link + * @l_ptr: pointer to link + */ + +void link_stop(struct link *l_ptr) +{ + struct sk_buff *buf; + struct sk_buff *next; + + buf = l_ptr->oldest_deferred_in; + while (buf) { + next = buf->next; + buf_discard(buf); + buf = next; + } + + buf = l_ptr->first_out; + while (buf) { + next = buf->next; + buf_discard(buf); + buf = next; + } + + link_reset_fragments(l_ptr); + + buf_discard(l_ptr->proto_msg_queue); + l_ptr->proto_msg_queue = NULL; +} + +#if 0 + +/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */ + +static void link_recv_event(struct link_event *ev) +{ + ev->fcn(ev->addr, ev->name, ev->up); + kfree(ev); +} + +static void link_send_event(void (*fcn)(u32 a, char *n, int up), + struct link *l_ptr, int up) +{ + struct link_event *ev; + + ev = kmalloc(sizeof(*ev), GFP_ATOMIC); + if (!ev) { + warn("Link event allocation failure\n"); + return; + } + ev->addr = l_ptr->addr; + ev->up = up; + ev->fcn = fcn; + memcpy(ev->name, l_ptr->name, TIPC_MAX_LINK_NAME); + k_signal((Handler)link_recv_event, (unsigned long)ev); +} + +#else + +#define link_send_event(fcn, l_ptr, up) do { } while (0) + +#endif + +void link_reset(struct link *l_ptr) +{ + struct sk_buff *buf; + u32 prev_state = l_ptr->state; + u32 checkpoint = l_ptr->next_in_no; + + msg_set_session(l_ptr->pmsg, msg_session(l_ptr->pmsg) + 1); + + /* Link is down, accept any session: */ + l_ptr->peer_session = 0; + + /* Prepare for max packet size negotiation */ + link_init_max_pkt(l_ptr); + + l_ptr->state = RESET_UNKNOWN; + dbg_link_state("Resetting Link\n"); + + if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET)) + return; + + node_link_down(l_ptr->owner, l_ptr); + bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); +#if 0 + tipc_printf(CONS, "\nReset link <%s>\n", l_ptr->name); + dbg_link_dump(); +#endif + if (node_has_active_links(l_ptr->owner) && + l_ptr->owner->permit_changeover) { + l_ptr->reset_checkpoint = checkpoint; + l_ptr->exp_msg_count = START_CHANGEOVER; + } + + /* Clean up all queues: */ + + link_release_outqueue(l_ptr); + buf_discard(l_ptr->proto_msg_queue); + l_ptr->proto_msg_queue = NULL; + buf = l_ptr->oldest_deferred_in; + while (buf) { + struct sk_buff *next = buf->next; + buf_discard(buf); + buf = next; + } + if (!list_empty(&l_ptr->waiting_ports)) + link_wakeup_ports(l_ptr, 1); + + l_ptr->retransm_queue_head = 0; + l_ptr->retransm_queue_size = 0; + l_ptr->last_out = NULL; + l_ptr->first_out = NULL; + l_ptr->next_out = NULL; + l_ptr->unacked_window = 0; + l_ptr->checkpoint = 1; + l_ptr->next_out_no = 1; + l_ptr->deferred_inqueue_sz = 0; + l_ptr->oldest_deferred_in = NULL; + l_ptr->newest_deferred_in = NULL; + l_ptr->fsm_msg_cnt = 0; + l_ptr->stale_count = 0; + link_reset_statistics(l_ptr); + + link_send_event(cfg_link_event, l_ptr, 0); + if (!in_own_cluster(l_ptr->addr)) + link_send_event(disc_link_event, l_ptr, 0); +} + + +static void link_activate(struct link *l_ptr) +{ + l_ptr->next_in_no = 1; + node_link_up(l_ptr->owner, l_ptr); + bearer_add_dest(l_ptr->b_ptr, l_ptr->addr); + link_send_event(cfg_link_event, l_ptr, 1); + if (!in_own_cluster(l_ptr->addr)) + link_send_event(disc_link_event, l_ptr, 1); +} + +/** + * link_state_event - link finite state machine + * @l_ptr: pointer to link + * @event: state machine event to process + */ + +static void link_state_event(struct link *l_ptr, unsigned event) +{ + struct link *other; + u32 cont_intv = l_ptr->continuity_interval; + + if (!l_ptr->started && (event != STARTING_EVT)) + return; /* Not yet. */ + + if (link_blocked(l_ptr)) { + if (event == TIMEOUT_EVT) { + link_set_timer(l_ptr, cont_intv); + } + return; /* Changeover going on */ + } + dbg_link("STATE_EV: <%s> ", l_ptr->name); + + switch (l_ptr->state) { + case WORKING_WORKING: + dbg_link("WW/"); + switch (event) { + case TRAFFIC_MSG_EVT: + dbg_link("TRF-"); + /* fall through */ + case ACTIVATE_MSG: + dbg_link("ACT\n"); + break; + case TIMEOUT_EVT: + dbg_link("TIM "); + if (l_ptr->next_in_no != l_ptr->checkpoint) { + l_ptr->checkpoint = l_ptr->next_in_no; + if (bclink_acks_missing(l_ptr->owner)) { + link_send_proto_msg(l_ptr, STATE_MSG, + 0, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { + link_send_proto_msg(l_ptr, STATE_MSG, + 1, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + } + link_set_timer(l_ptr, cont_intv); + break; + } + dbg_link(" -> WU\n"); + l_ptr->state = WORKING_UNKNOWN; + l_ptr->fsm_msg_cnt = 0; + link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv / 4); + break; + case RESET_MSG: + dbg_link("RES -> RR\n"); + link_reset(l_ptr); + l_ptr->state = RESET_RESET; + l_ptr->fsm_msg_cnt = 0; + link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv); + break; + default: + err("Unknown link event %u in WW state\n", event); + } + break; + case WORKING_UNKNOWN: + dbg_link("WU/"); + switch (event) { + case TRAFFIC_MSG_EVT: + dbg_link("TRF-"); + case ACTIVATE_MSG: + dbg_link("ACT -> WW\n"); + l_ptr->state = WORKING_WORKING; + l_ptr->fsm_msg_cnt = 0; + link_set_timer(l_ptr, cont_intv); + break; + case RESET_MSG: + dbg_link("RES -> RR\n"); + link_reset(l_ptr); + l_ptr->state = RESET_RESET; + l_ptr->fsm_msg_cnt = 0; + link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv); + break; + case TIMEOUT_EVT: + dbg_link("TIM "); + if (l_ptr->next_in_no != l_ptr->checkpoint) { + dbg_link("-> WW \n"); + l_ptr->state = WORKING_WORKING; + l_ptr->fsm_msg_cnt = 0; + l_ptr->checkpoint = l_ptr->next_in_no; + if (bclink_acks_missing(l_ptr->owner)) { + link_send_proto_msg(l_ptr, STATE_MSG, + 0, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + } + link_set_timer(l_ptr, cont_intv); + } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) { + dbg_link("Probing %u/%u,timer = %u ms)\n", + l_ptr->fsm_msg_cnt, l_ptr->abort_limit, + cont_intv / 4); + link_send_proto_msg(l_ptr, STATE_MSG, + 1, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv / 4); + } else { /* Link has failed */ + dbg_link("-> RU (%u probes unanswered)\n", + l_ptr->fsm_msg_cnt); + link_reset(l_ptr); + l_ptr->state = RESET_UNKNOWN; + l_ptr->fsm_msg_cnt = 0; + link_send_proto_msg(l_ptr, RESET_MSG, + 0, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv); + } + break; + default: + err("Unknown link event %u in WU state\n", event); + } + break; + case RESET_UNKNOWN: + dbg_link("RU/"); + switch (event) { + case TRAFFIC_MSG_EVT: + dbg_link("TRF-\n"); + break; + case ACTIVATE_MSG: + other = l_ptr->owner->active_links[0]; + if (other && link_working_unknown(other)) { + dbg_link("ACT\n"); + break; + } + dbg_link("ACT -> WW\n"); + l_ptr->state = WORKING_WORKING; + l_ptr->fsm_msg_cnt = 0; + link_activate(l_ptr); + link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv); + break; + case RESET_MSG: + dbg_link("RES \n"); + dbg_link(" -> RR\n"); + l_ptr->state = RESET_RESET; + l_ptr->fsm_msg_cnt = 0; + link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv); + break; + case STARTING_EVT: + dbg_link("START-"); + l_ptr->started = 1; + /* fall through */ + case TIMEOUT_EVT: + dbg_link("TIM \n"); + link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv); + break; + default: + err("Unknown link event %u in RU state\n", event); + } + break; + case RESET_RESET: + dbg_link("RR/ "); + switch (event) { + case TRAFFIC_MSG_EVT: + dbg_link("TRF-"); + /* fall through */ + case ACTIVATE_MSG: + other = l_ptr->owner->active_links[0]; + if (other && link_working_unknown(other)) { + dbg_link("ACT\n"); + break; + } + dbg_link("ACT -> WW\n"); + l_ptr->state = WORKING_WORKING; + l_ptr->fsm_msg_cnt = 0; + link_activate(l_ptr); + link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv); + break; + case RESET_MSG: + dbg_link("RES\n"); + break; + case TIMEOUT_EVT: + dbg_link("TIM\n"); + link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); + l_ptr->fsm_msg_cnt++; + link_set_timer(l_ptr, cont_intv); + dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt); + break; + default: + err("Unknown link event %u in RR state\n", event); + } + break; + default: + err("Unknown link state %u/%u\n", l_ptr->state, event); + } +} + +/* + * link_bundle_buf(): Append contents of a buffer to + * the tail of an existing one. + */ + +static int link_bundle_buf(struct link *l_ptr, + struct sk_buff *bundler, + struct sk_buff *buf) +{ + struct tipc_msg *bundler_msg = buf_msg(bundler); + struct tipc_msg *msg = buf_msg(buf); + u32 size = msg_size(msg); + u32 to_pos = align(msg_size(bundler_msg)); + u32 rest = link_max_pkt(l_ptr) - to_pos; + + if (msg_user(bundler_msg) != MSG_BUNDLER) + return 0; + if (msg_type(bundler_msg) != OPEN_MSG) + return 0; + if (rest < align(size)) + return 0; + + skb_put(bundler, (to_pos - msg_size(bundler_msg)) + size); + memcpy(bundler->data + to_pos, buf->data, size); + msg_set_size(bundler_msg, to_pos + size); + msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1); + dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n", + msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg)); + msg_dbg(msg, "PACKD:"); + buf_discard(buf); + l_ptr->stats.sent_bundled++; + return 1; +} + +static inline void link_add_to_outqueue(struct link *l_ptr, + struct sk_buff *buf, + struct tipc_msg *msg) +{ + u32 ack = mod(l_ptr->next_in_no - 1); + u32 seqno = mod(l_ptr->next_out_no++); + + msg_set_word(msg, 2, ((ack << 16) | seqno)); + msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); + buf->next = NULL; + if (l_ptr->first_out) { + l_ptr->last_out->next = buf; + l_ptr->last_out = buf; + } else + l_ptr->first_out = l_ptr->last_out = buf; + l_ptr->out_queue_size++; +} + +/* + * link_send_buf() is the 'full path' for messages, called from + * inside TIPC when the 'fast path' in tipc_send_buf + * has failed, and from link_send() + */ + +int link_send_buf(struct link *l_ptr, struct sk_buff *buf) +{ + struct tipc_msg *msg = buf_msg(buf); + u32 size = msg_size(msg); + u32 dsz = msg_data_sz(msg); + u32 queue_size = l_ptr->out_queue_size; + u32 imp = msg_tot_importance(msg); + u32 queue_limit = l_ptr->queue_limit[imp]; + u32 max_packet = link_max_pkt(l_ptr); + + msg_set_prevnode(msg, tipc_own_addr); /* If routed message */ + + /* Match msg importance against queue limits: */ + + if (unlikely(queue_size >= queue_limit)) { + if (imp <= TIPC_CRITICAL_IMPORTANCE) { + return link_schedule_port(l_ptr, msg_origport(msg), + size); + } + msg_dbg(msg, "TIPC: Congestion, throwing away\n"); + buf_discard(buf); + if (imp > CONN_MANAGER) { + warn("Resetting <%s>, send queue full", l_ptr->name); + link_reset(l_ptr); + } + return dsz; + } + + /* Fragmentation needed ? */ + + if (size > max_packet) + return link_send_long_buf(l_ptr, buf); + + /* Packet can be queued or sent: */ + + if (queue_size > l_ptr->stats.max_queue_sz) + l_ptr->stats.max_queue_sz = queue_size; + + if (likely(!bearer_congested(l_ptr->b_ptr, l_ptr) && + !link_congested(l_ptr))) { + link_add_to_outqueue(l_ptr, buf, msg); + + if (likely(bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { + l_ptr->unacked_window = 0; + } else { + bearer_schedule(l_ptr->b_ptr, l_ptr); + l_ptr->stats.bearer_congs++; + l_ptr->next_out = buf; + } + return dsz; + } + /* Congestion: can message be bundled ?: */ + + if ((msg_user(msg) != CHANGEOVER_PROTOCOL) && + (msg_user(msg) != MSG_FRAGMENTER)) { + + /* Try adding message to an existing bundle */ + + if (l_ptr->next_out && + link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { + bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); + return dsz; + } + + /* Try creating a new bundle */ + + if (size <= max_packet * 2 / 3) { + struct sk_buff *bundler = buf_acquire(max_packet); + struct tipc_msg bundler_hdr; + + if (bundler) { + msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG, + TIPC_OK, INT_H_SIZE, l_ptr->addr); + memcpy(bundler->data, (unchar *)&bundler_hdr, + INT_H_SIZE); + skb_trim(bundler, INT_H_SIZE); + link_bundle_buf(l_ptr, bundler, buf); + buf = bundler; + msg = buf_msg(buf); + l_ptr->stats.sent_bundles++; + } + } + } + if (!l_ptr->next_out) + l_ptr->next_out = buf; + link_add_to_outqueue(l_ptr, buf, msg); + bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); + return dsz; +} + +/* + * link_send(): same as link_send_buf(), but the link to use has + * not been selected yet, and the the owner node is not locked + * Called by TIPC internal users, e.g. the name distributor + */ + +int link_send(struct sk_buff *buf, u32 dest, u32 selector) +{ + struct link *l_ptr; + struct node *n_ptr; + int res = -ELINKCONG; + + read_lock_bh(&net_lock); + n_ptr = node_select(dest, selector); + if (n_ptr) { + node_lock(n_ptr); + l_ptr = n_ptr->active_links[selector & 1]; + dbg("link_send: found link %x for dest %x\n", l_ptr, dest); + if (l_ptr) { + res = link_send_buf(l_ptr, buf); + } + node_unlock(n_ptr); + } else { + dbg("Attempt to send msg to unknown node:\n"); + msg_dbg(buf_msg(buf),">>>"); + buf_discard(buf); + } + read_unlock_bh(&net_lock); + return res; +} + +/* + * link_send_buf_fast: Entry for data messages where the + * destination link is known and the header is complete, + * inclusive total message length. Very time critical. + * Link is locked. Returns user data length. + */ + +static inline int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf, + u32 *used_max_pkt) +{ + struct tipc_msg *msg = buf_msg(buf); + int res = msg_data_sz(msg); + + if (likely(!link_congested(l_ptr))) { + if (likely(msg_size(msg) <= link_max_pkt(l_ptr))) { + if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { + link_add_to_outqueue(l_ptr, buf, msg); + if (likely(bearer_send(l_ptr->b_ptr, buf, + &l_ptr->media_addr))) { + l_ptr->unacked_window = 0; + msg_dbg(msg,"SENT_FAST:"); + return res; + } + dbg("failed sent fast...\n"); + bearer_schedule(l_ptr->b_ptr, l_ptr); + l_ptr->stats.bearer_congs++; + l_ptr->next_out = buf; + return res; + } + } + else + *used_max_pkt = link_max_pkt(l_ptr); + } + return link_send_buf(l_ptr, buf); /* All other cases */ +} + +/* + * tipc_send_buf_fast: Entry for data messages where the + * destination node is known and the header is complete, + * inclusive total message length. + * Returns user data length. + */ +int tipc_send_buf_fast( |