diff options
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- | net/tipc/link.c | 232 |
1 files changed, 129 insertions, 103 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c index 18702f58d11..5ed4b4f7452 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2,7 +2,7 @@ * net/tipc/link.c: TIPC link code * * Copyright (c) 1996-2007, Ericsson AB - * Copyright (c) 2004-2007, Wind River Systems + * Copyright (c) 2004-2007, 2010-2011, Wind River Systems * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -90,9 +90,10 @@ static void link_handle_out_of_seq_msg(struct link *l_ptr, static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf); static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf); static void link_set_supervision_props(struct link *l_ptr, u32 tolerance); -static int link_send_sections_long(struct port *sender, +static int link_send_sections_long(struct tipc_port *sender, struct iovec const *msg_sect, - u32 num_sect, u32 destnode); + u32 num_sect, unsigned int total_len, + u32 destnode); static void link_check_defragm_bufs(struct link *l_ptr); static void link_state_event(struct link *l_ptr, u32 event); static void link_reset_statistics(struct link *l_ptr); @@ -113,7 +114,7 @@ static void link_init_max_pkt(struct link *l_ptr) { u32 max_pkt; - max_pkt = (l_ptr->b_ptr->publ.mtu & ~3); + max_pkt = (l_ptr->b_ptr->mtu & ~3); if (max_pkt > MAX_MSG_SIZE) max_pkt = MAX_MSG_SIZE; @@ -246,9 +247,6 @@ static void link_timeout(struct link *l_ptr) l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; l_ptr->stats.queue_sz_counts++; - if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz) - l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; - if (l_ptr->first_out) { struct tipc_msg *msg = buf_msg(l_ptr->first_out); u32 length = msg_size(msg); @@ -296,19 +294,35 @@ static void link_set_timer(struct link *l_ptr, u32 time) /** * tipc_link_create - create a new link + * @n_ptr: pointer to associated node * @b_ptr: pointer to associated bearer - * @peer: network address of node at other end of link * @media_addr: media address to use when sending messages over link * * Returns pointer to link. */ -struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer, +struct link *tipc_link_create(struct tipc_node *n_ptr, + struct tipc_bearer *b_ptr, const struct tipc_media_addr *media_addr) { struct link *l_ptr; struct tipc_msg *msg; char *if_name; + char addr_string[16]; + u32 peer = n_ptr->addr; + + if (n_ptr->link_cnt >= 2) { + tipc_addr_string_fill(addr_string, n_ptr->addr); + err("Attempt to establish third link to %s\n", addr_string); + return NULL; + } + + if (n_ptr->links[b_ptr->identity]) { + tipc_addr_string_fill(addr_string, n_ptr->addr); + err("Attempt to establish second link on <%s> to %s\n", + b_ptr->name, addr_string); + return NULL; + } l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC); if (!l_ptr) { @@ -317,7 +331,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer, } l_ptr->addr = peer; - if_name = strchr(b_ptr->publ.name, ':') + 1; + if_name = strchr(b_ptr->name, ':') + 1; sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:", tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), tipc_node(tipc_own_addr), @@ -325,6 +339,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); /* note: peer i/f is appended to link name by reset/activate */ memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); + l_ptr->owner = n_ptr; l_ptr->checkpoint = 1; l_ptr->b_ptr = b_ptr; link_set_supervision_props(l_ptr, b_ptr->media->tolerance); @@ -348,11 +363,7 @@ struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer, link_reset_statistics(l_ptr); - l_ptr->owner = tipc_node_attach_link(l_ptr); - if (!l_ptr->owner) { - kfree(l_ptr); - return NULL; - } + tipc_node_attach_link(n_ptr, l_ptr); k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); list_add_tail(&l_ptr->link_list, &b_ptr->links); @@ -391,7 +402,9 @@ void tipc_link_delete(struct link *l_ptr) static void link_start(struct link *l_ptr) { + tipc_node_lock(l_ptr->owner); link_state_event(l_ptr, STARTING_EVT); + tipc_node_unlock(l_ptr->owner); } /** @@ -406,7 +419,7 @@ static void link_start(struct link *l_ptr) static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz) { - struct port *p_ptr; + struct tipc_port *p_ptr; spin_lock_bh(&tipc_port_list_lock); p_ptr = tipc_port_lock(origport); @@ -415,7 +428,7 @@ static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz) goto exit; if (!list_empty(&p_ptr->wait_list)) goto exit; - p_ptr->publ.congested = 1; + p_ptr->congested = 1; p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); l_ptr->stats.link_congs++; @@ -428,8 +441,8 @@ exit: void tipc_link_wakeup_ports(struct link *l_ptr, int all) { - struct port *p_ptr; - struct port *temp_p_ptr; + struct tipc_port *p_ptr; + struct tipc_port *temp_p_ptr; int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; if (all) @@ -445,11 +458,11 @@ void tipc_link_wakeup_ports(struct link *l_ptr, int all) if (win <= 0) break; list_del_init(&p_ptr->wait_list); - spin_lock_bh(p_ptr->publ.lock); - p_ptr->publ.congested = 0; - p_ptr->wakeup(&p_ptr->publ); + spin_lock_bh(p_ptr->lock); + p_ptr->congested = 0; + p_ptr->wakeup(p_ptr); win -= p_ptr->waiting_pkts; - spin_unlock_bh(p_ptr->publ.lock); + spin_unlock_bh(p_ptr->lock); } exit: @@ -549,7 +562,7 @@ void tipc_link_reset(struct link *l_ptr) tipc_node_link_down(l_ptr->owner, l_ptr); tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); - if (was_active_link && tipc_node_has_active_links(l_ptr->owner) && + if (was_active_link && tipc_node_active_links(l_ptr->owner) && l_ptr->owner->permit_changeover) { l_ptr->reset_checkpoint = checkpoint; l_ptr->exp_msg_count = START_CHANGEOVER; @@ -824,7 +837,29 @@ static void link_add_to_outqueue(struct link *l_ptr, l_ptr->last_out = buf; } else l_ptr->first_out = l_ptr->last_out = buf; + l_ptr->out_queue_size++; + if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz) + l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; +} + +static void link_add_chain_to_outqueue(struct link *l_ptr, + struct sk_buff *buf_chain, + u32 long_msgno) +{ + struct sk_buff *buf; + struct tipc_msg *msg; + + if (!l_ptr->next_out) + l_ptr->next_out = buf_chain; + while (buf_chain) { + buf = buf_chain; + buf_chain = buf_chain->next; + + msg = buf_msg(buf); + msg_set_long_msgno(msg, long_msgno); + link_add_to_outqueue(l_ptr, buf, msg); + } } /* @@ -849,8 +884,9 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf) if (unlikely(queue_size >= queue_limit)) { if (imp <= TIPC_CRITICAL_IMPORTANCE) { - return link_schedule_port(l_ptr, msg_origport(msg), - size); + link_schedule_port(l_ptr, msg_origport(msg), size); + buf_discard(buf); + return -ELINKCONG; } buf_discard(buf); if (imp > CONN_MANAGER) { @@ -867,9 +903,6 @@ int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf) /* Packet can be queued or sent: */ - if (queue_size > l_ptr->stats.max_queue_sz) - l_ptr->stats.max_queue_sz = queue_size; - if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && !link_congested(l_ptr))) { link_add_to_outqueue(l_ptr, buf, msg); @@ -1027,12 +1060,13 @@ int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode) * except for total message length. * Returns user data length or errno. */ -int tipc_link_send_sections_fast(struct port *sender, +int tipc_link_send_sections_fast(struct tipc_port *sender, struct iovec const *msg_sect, const u32 num_sect, + unsigned int total_len, u32 destaddr) { - struct tipc_msg *hdr = &sender->publ.phdr; + struct tipc_msg *hdr = &sender->phdr; struct link *l_ptr; struct sk_buff *buf; struct tipc_node *node; @@ -1045,8 +1079,8 @@ again: * (Must not hold any locks while building message.) */ - res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt, - !sender->user_port, &buf); + res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, + sender->max_pkt, !sender->user_port, &buf); read_lock_bh(&tipc_net_lock); node = tipc_node_find(destaddr); @@ -1056,9 +1090,7 @@ again: if (likely(l_ptr)) { if (likely(buf)) { res = link_send_buf_fast(l_ptr, buf, - &sender->publ.max_pkt); - if (unlikely(res < 0)) - buf_discard(buf); + &sender->max_pkt); exit: tipc_node_unlock(node); read_unlock_bh(&tipc_net_lock); @@ -1075,7 +1107,7 @@ exit: if (link_congested(l_ptr) || !list_empty(&l_ptr->b_ptr->cong_links)) { res = link_schedule_port(l_ptr, - sender->publ.ref, res); + sender->ref, res); goto exit; } @@ -1084,16 +1116,17 @@ exit: * then re-try fast path or fragment the message */ - sender->publ.max_pkt = l_ptr->max_pkt; + sender->max_pkt = l_ptr->max_pkt; tipc_node_unlock(node); read_unlock_bh(&tipc_net_lock); - if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt) + if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt) goto again; return link_send_sections_long(sender, msg_sect, - num_sect, destaddr); + num_sect, total_len, + destaddr); } tipc_node_unlock(node); } @@ -1105,7 +1138,7 @@ exit: return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); if (res >= 0) return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, - TIPC_ERR_NO_NODE); + total_len, TIPC_ERR_NO_NODE); return res; } @@ -1123,15 +1156,16 @@ exit: * * Returns user data length or errno. */ -static int link_send_sections_long(struct port *sender, +static int link_send_sections_long(struct tipc_port *sender, struct iovec const *msg_sect, u32 num_sect, + unsigned int total_len, u32 destaddr) { struct link *l_ptr; struct tipc_node *node; - struct tipc_msg *hdr = &sender->publ.phdr; - u32 dsz = msg_data_sz(hdr); + struct tipc_msg *hdr = &sender->phdr; + u32 dsz = total_len; u32 max_pkt, fragm_sz, rest; struct tipc_msg fragm_hdr; struct sk_buff *buf, *buf_chain, *prev; @@ -1142,7 +1176,7 @@ static int link_send_sections_long(struct port *sender, again: fragm_no = 1; - max_pkt = sender->publ.max_pkt - INT_H_SIZE; + max_pkt = sender->max_pkt - INT_H_SIZE; /* leave room for tunnel header in case of link changeover */ fragm_sz = max_pkt - INT_H_SIZE; /* leave room for fragmentation header in each fragment */ @@ -1157,7 +1191,6 @@ again: tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(hdr)); - msg_set_link_selector(&fragm_hdr, sender->publ.ref); msg_set_size(&fragm_hdr, max_pkt); msg_set_fragm_no(&fragm_hdr, 1); @@ -1238,13 +1271,13 @@ error: node = tipc_node_find(destaddr); if (likely(node)) { tipc_node_lock(node); - l_ptr = node->active_links[sender->publ.ref & 1]; + l_ptr = node->active_links[sender->ref & 1]; if (!l_ptr) { tipc_node_unlock(node); goto reject; } if (l_ptr->max_pkt < max_pkt) { - sender->publ.max_pkt = l_ptr->max_pkt; + sender->max_pkt = l_ptr->max_pkt; tipc_node_unlock(node); for (; buf_chain; buf_chain = buf) { buf = buf_chain->next; @@ -1259,28 +1292,15 @@ reject: buf_discard(buf_chain); } return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, - TIPC_ERR_NO_NODE); + total_len, TIPC_ERR_NO_NODE); } - /* Append whole chain to send queue: */ + /* Append chain of fragments to send queue & send them */ - buf = buf_chain; - l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1); - if (!l_ptr->next_out) - l_ptr->next_out = buf_chain; + l_ptr->long_msg_seq_no++; + link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); + l_ptr->stats.sent_fragments += fragm_no; l_ptr->stats.sent_fragmented++; - while (buf) { - struct sk_buff *next = buf->next; - struct tipc_msg *msg = buf_msg(buf); - - l_ptr->stats.sent_fragments++; - msg_set_long_msgno(msg, l_ptr->long_msg_seq_no); - link_add_to_outqueue(l_ptr, buf, msg); - buf = next; - } - - /* Send it, if possible: */ - tipc_link_push_queue(l_ptr); tipc_node_unlock(node); return dsz; @@ -1441,7 +1461,7 @@ static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf) info("Outstanding acks: %lu\n", (unsigned long) TIPC_SKB_CB(buf)->handle); - n_ptr = l_ptr->owner->next; + n_ptr = tipc_bclink_retransmit_to(); tipc_node_lock(n_ptr); tipc_addr_string_fill(addr_string, n_ptr->addr); @@ -1595,11 +1615,10 @@ static int link_recv_buf_validate(struct sk_buff *buf) * structure (i.e. cannot be NULL), but bearer can be inactive. */ -void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr) +void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) { read_lock_bh(&tipc_net_lock); while (head) { - struct bearer *b_ptr = (struct bearer *)tb_ptr; struct tipc_node *n_ptr; struct link *l_ptr; struct sk_buff *crs; @@ -1735,10 +1754,6 @@ deliver: tipc_node_unlock(n_ptr); tipc_link_recv_bundle(buf); continue; - case ROUTE_DISTRIBUTOR: - tipc_node_unlock(n_ptr); - buf_discard(buf); - continue; case NAME_DISTRIBUTOR: tipc_node_unlock(n_ptr); tipc_named_recv(buf); @@ -1765,6 +1780,10 @@ deliver: goto protocol_check; } break; + default: + buf_discard(buf); + buf = NULL; + break; } } tipc_node_unlock(n_ptr); @@ -1900,6 +1919,7 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg, struct sk_buff *buf = NULL; struct tipc_msg *msg = l_ptr->pmsg; u32 msg_size = sizeof(l_ptr->proto_msg); + int r_flag; if (link_blocked(l_ptr)) return; @@ -1950,15 +1970,14 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg, msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1)); msg_set_seq_gap(msg, 0); msg_set_next_sent(msg, 1); + msg_set_probe(msg, 0); msg_set_link_tolerance(msg, l_ptr->tolerance); msg_set_linkprio(msg, l_ptr->priority); msg_set_max_pkt(msg, l_ptr->max_pkt_target); } - if (tipc_node_has_redundant_links(l_ptr->owner)) - msg_set_redundant_link(msg); - else - msg_clear_redundant_link(msg); + r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr)); + msg_set_redundant_link(msg, r_flag); msg_set_linkprio(msg, l_ptr->priority); /* Ensure sequence number will not fit : */ @@ -1978,7 +1997,6 @@ void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg, skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); return; } - msg_set_timestamp(msg, jiffies_to_msecs(jiffies)); /* Message can be sent */ @@ -2066,7 +2084,7 @@ static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf) l_ptr->peer_bearer_id = msg_bearer_id(msg); /* Synchronize broadcast sequence numbers */ - if (!tipc_node_has_redundant_links(l_ptr->owner)) + if (!tipc_node_redundant_links(l_ptr->owner)) l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg)); break; case STATE_MSG: @@ -2397,6 +2415,8 @@ void tipc_link_recv_bundle(struct sk_buff *buf) */ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) { + struct sk_buff *buf_chain = NULL; + struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain; struct tipc_msg *inmsg = buf_msg(buf); struct tipc_msg fragm_hdr; u32 insize = msg_size(inmsg); @@ -2405,7 +2425,7 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) u32 rest = insize; u32 pack_sz = l_ptr->max_pkt; u32 fragm_sz = pack_sz - INT_H_SIZE; - u32 fragm_no = 1; + u32 fragm_no = 0; u32 destaddr; if (msg_short(inmsg)) @@ -2413,17 +2433,10 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) else destaddr = msg_destnode(inmsg); - if (msg_routed(inmsg)) - msg_set_prevnode(inmsg, tipc_own_addr); - /* Prepare reusable fragment header: */ tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, INT_H_SIZE, destaddr); - msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg)); - msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++)); - msg_set_fragm_no(&fragm_hdr, fragm_no); - l_ptr->stats.sent_fragmented++; /* Chop up message: */ @@ -2436,27 +2449,37 @@ static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) } fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE); if (fragm == NULL) { - warn("Link unable to fragment message\n"); - dsz = -ENOMEM; - goto exit; + buf_discard(buf); + while (buf_chain) { + buf = buf_chain; + buf_chain = buf_chain->next; + buf_discard(buf); + } + return -ENOMEM; } msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); + fragm_no++; + msg_set_fragm_no(&fragm_hdr, fragm_no); skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, fragm_sz); - /* Send queued messages first, if any: */ + buf_chain_tail->next = fragm; + buf_chain_tail = fragm; - l_ptr->stats.sent_fragments++; - tipc_link_send_buf(l_ptr, fragm); - if (!tipc_link_is_up(l_ptr)) - return dsz; - msg_set_fragm_no(&fragm_hdr, ++fragm_no); rest -= fragm_sz; crs += fragm_sz; msg_set_type(&fragm_hdr, FRAGMENT); } -exit: buf_discard(buf); + + /* Append chain of fragments to send queue & send them */ + + l_ptr->long_msg_seq_no++; + link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no); + l_ptr->stats.sent_fragments += fragm_no; + l_ptr->stats.sent_fragmented++; + tipc_link_push_queue(l_ptr); + return dsz; } @@ -2464,7 +2487,7 @@ exit: * A pending message being re-assembled must store certain values * to handle subsequent fragments correctly. The following functions * help storing these values in unused, available fields in the - * pending message. This makes dynamic memory allocation unecessary. + * pending message. This makes dynamic memory allocation unnecessary. */ static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno) @@ -2618,6 +2641,9 @@ static void link_check_defragm_bufs(struct link *l_ptr) static void link_set_supervision_props(struct link *l_ptr, u32 tolerance) { + if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) + return; + l_ptr->tolerance = tolerance; l_ptr->continuity_interval = ((tolerance / 4) > 500) ? 500 : tolerance / 4; @@ -2658,7 +2684,7 @@ void tipc_link_set_queue_limits(struct link *l_ptr, u32 window) static struct link *link_find_link(const char *name, struct tipc_node **node) { struct link_name link_name_parts; - struct bearer *b_ptr; + struct tipc_bearer *b_ptr; struct link *l_ptr; if (!link_name_validate(name, &link_name_parts)) @@ -2961,7 +2987,7 @@ static void link_print(struct link *l_ptr, const char *str) tipc_printf(buf, str); tipc_printf(buf, "Link %x<%s>:", - l_ptr->addr, l_ptr->b_ptr->publ.name); + l_ptr->addr, l_ptr->b_ptr->name); #ifdef CONFIG_TIPC_DEBUG if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr)) @@ -2981,9 +3007,9 @@ static void link_print(struct link *l_ptr, const char *str) != (l_ptr->out_queue_size - 1)) || (l_ptr->last_out->next != NULL)) { tipc_printf(buf, "\nSend queue inconsistency\n"); - tipc_printf(buf, "first_out= %x ", l_ptr->first_out); - tipc_printf(buf, "next_out= %x ", l_ptr->next_out); - tipc_printf(buf, "last_out= %x ", l_ptr->last_out); + tipc_printf(buf, "first_out= %p ", l_ptr->first_out); + tipc_printf(buf, "next_out= %p ", l_ptr->next_out); + tipc_printf(buf, "last_out= %p ", l_ptr->last_out); } } else tipc_printf(buf, "[]"); |