aboutsummaryrefslogtreecommitdiff
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/core.c11
-rw-r--r--net/tipc/core.h23
-rw-r--r--net/tipc/eth_media.c4
-rw-r--r--net/tipc/link.c111
-rw-r--r--net/tipc/msg.c16
-rw-r--r--net/tipc/msg.h47
-rw-r--r--net/tipc/port.c43
-rw-r--r--net/tipc/ref.c211
-rw-r--r--net/tipc/ref.h89
-rw-r--r--net/tipc/socket.c1192
10 files changed, 993 insertions, 754 deletions
diff --git a/net/tipc/core.c b/net/tipc/core.c
index d2d7d32c02c..740aac5cdfb 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -48,16 +48,8 @@
#include "subscr.h"
#include "config.h"
-int tipc_eth_media_start(void);
-void tipc_eth_media_stop(void);
-int tipc_handler_start(void);
-void tipc_handler_stop(void);
-int tipc_socket_init(void);
-void tipc_socket_stop(void);
-int tipc_netlink_start(void);
-void tipc_netlink_stop(void);
-#define TIPC_MOD_VER "1.6.2"
+#define TIPC_MOD_VER "1.6.3"
#ifndef CONFIG_TIPC_ZONES
#define CONFIG_TIPC_ZONES 3
@@ -277,7 +269,6 @@ EXPORT_SYMBOL(tipc_register_media);
/* TIPC API for external APIs (see tipc_port.h) */
EXPORT_SYMBOL(tipc_createport_raw);
-EXPORT_SYMBOL(tipc_set_msg_option);
EXPORT_SYMBOL(tipc_reject_msg);
EXPORT_SYMBOL(tipc_send_buf_fast);
EXPORT_SYMBOL(tipc_acknowledge);
diff --git a/net/tipc/core.h b/net/tipc/core.h
index feabca58082..325404fd4eb 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -180,6 +180,12 @@ extern int tipc_core_start(void);
extern void tipc_core_stop(void);
extern int tipc_core_start_net(void);
extern void tipc_core_stop_net(void);
+extern int tipc_handler_start(void);
+extern void tipc_handler_stop(void);
+extern int tipc_netlink_start(void);
+extern void tipc_netlink_stop(void);
+extern int tipc_socket_init(void);
+extern void tipc_socket_stop(void);
static inline int delimit(int val, int min, int max)
{
@@ -310,7 +316,7 @@ static inline struct sk_buff *buf_acquire(u32 size)
struct sk_buff *skb;
unsigned int buf_size = (BUF_HEADROOM + size + 3) & ~3u;
- skb = alloc_skb(buf_size, GFP_ATOMIC);
+ skb = alloc_skb_fclone(buf_size, GFP_ATOMIC);
if (skb) {
skb_reserve(skb, BUF_HEADROOM);
skb_put(skb, size);
@@ -328,8 +334,19 @@ static inline struct sk_buff *buf_acquire(u32 size)
static inline void buf_discard(struct sk_buff *skb)
{
- if (likely(skb != NULL))
- kfree_skb(skb);
+ kfree_skb(skb);
+}
+
+/**
+ * buf_linearize - convert a TIPC message buffer into a single contiguous piece
+ * @skb: message buffer
+ *
+ * Thin wrapper that passes @skb to skb_linearize() and returns its result
+ * unchanged.
+ *
+ * Returns 0 on success.
+ */
+
+static inline int buf_linearize(struct sk_buff *skb)
+{
+ return skb_linearize(skb);
}
#endif
diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c
index 3bbef2ab22a..9cd35eec3e7 100644
--- a/net/tipc/eth_media.c
+++ b/net/tipc/eth_media.c
@@ -101,7 +101,7 @@ static int recv_msg(struct sk_buff *buf, struct net_device *dev,
struct eth_bearer *eb_ptr = (struct eth_bearer *)pt->af_packet_priv;
u32 size;
- if (dev->nd_net != &init_net) {
+ if (dev_net(dev) != &init_net) {
kfree_skb(buf);
return 0;
}
@@ -198,7 +198,7 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
struct eth_bearer *eb_ptr = &eth_bearers[0];
struct eth_bearer *stop = &eth_bearers[MAX_ETH_BEARERS];
- if (dev->nd_net != &init_net)
+ if (dev_net(dev) != &init_net)
return NOTIFY_DONE;
while ((eb_ptr->dev != dev)) {
diff --git a/net/tipc/link.c b/net/tipc/link.c
index cefa99824c5..2a26a16e269 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1785,6 +1785,56 @@ static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
return buf;
}
+/**
+ * link_recv_buf_validate - validate basic format of received message
+ * @buf: received message buffer
+ *
+ * This routine ensures a TIPC message has an acceptable header, and at least
+ * as much data as the header indicates it should. The routine also ensures
+ * that the entire message header is stored in the main fragment of the message
+ * buffer, to simplify future access to message header fields.
+ *
+ * A message is rejected if the buffer is shorter than MIN_H_SIZE, if its
+ * version is not TIPC_VERSION, if its header size is below the minimum for
+ * its kind (per-type minimum for data messages, INT_H_SIZE for all others),
+ * if its stated size is smaller than its header size or larger than the
+ * buffer length, or if its data portion exceeds TIPC_MAX_USER_MSG_SIZE.
+ *
+ * Note: Having extra info present in the message header or data areas is OK.
+ * TIPC will ignore the excess, under the assumption that it is optional info
+ * introduced by a later release of the protocol.
+ *
+ * Returns 0 if the message is rejected; otherwise returns the result of
+ * pskb_may_pull() for the message's header size.
+ */
+
+static int link_recv_buf_validate(struct sk_buff *buf)
+{
+ static u32 min_data_hdr_size[8] = {
+ SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
+ MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
+ };
+
+ struct tipc_msg *msg;
+ u32 tipc_hdr[2];
+ u32 size;
+ u32 hdr_size;
+ u32 min_hdr_size;
+
+ if (unlikely(buf->len < MIN_H_SIZE))
+ return 0;
+
+ msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
+ if (msg == NULL)
+ return 0;
+
+ if (unlikely(msg_version(msg) != TIPC_VERSION))
+ return 0;
+
+ size = msg_size(msg);
+ hdr_size = msg_hdr_sz(msg);
+ min_hdr_size = msg_isdata(msg) ?
+ min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
+
+ if (unlikely((hdr_size < min_hdr_size) ||
+ (size < hdr_size) ||
+ (buf->len < size) ||
+ (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
+ return 0;
+
+ return pskb_may_pull(buf, hdr_size);
+}
+
void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
{
read_lock_bh(&tipc_net_lock);
@@ -1794,9 +1844,9 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
struct link *l_ptr;
struct sk_buff *crs;
struct sk_buff *buf = head;
- struct tipc_msg *msg = buf_msg(buf);
- u32 seq_no = msg_seqno(msg);
- u32 ackd = msg_ack(msg);
+ struct tipc_msg *msg;
+ u32 seq_no;
+ u32 ackd;
u32 released = 0;
int type;
@@ -1804,12 +1854,21 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
TIPC_SKB_CB(buf)->handle = b_ptr;
head = head->next;
- if (unlikely(msg_version(msg) != TIPC_VERSION))
+
+ /* Ensure message is well-formed */
+
+ if (unlikely(!link_recv_buf_validate(buf)))
goto cont;
-#if 0
- if (msg_user(msg) != LINK_PROTOCOL)
-#endif
- msg_dbg(msg,"<REC<");
+
+ /* Ensure message data is a single contiguous unit */
+
+ if (unlikely(buf_linearize(buf))) {
+ goto cont;
+ }
+
+ /* Handle arrival of a non-unicast link message */
+
+ msg = buf_msg(buf);
if (unlikely(msg_non_seq(msg))) {
link_recv_non_seq(buf);
@@ -1820,19 +1879,26 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
(msg_destnode(msg) != tipc_own_addr)))
goto cont;
+ /* Locate unicast link endpoint that should handle message */
+
n_ptr = tipc_node_find(msg_prevnode(msg));
if (unlikely(!n_ptr))
goto cont;
-
tipc_node_lock(n_ptr);
+
l_ptr = n_ptr->links[b_ptr->identity];
if (unlikely(!l_ptr)) {
tipc_node_unlock(n_ptr);
goto cont;
}
- /*
- * Release acked messages
- */
+
+ /* Validate message sequence number info */
+
+ seq_no = msg_seqno(msg);
+ ackd = msg_ack(msg);
+
+ /* Release acked messages */
+
if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
@@ -1851,6 +1917,9 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
l_ptr->first_out = crs;
l_ptr->out_queue_size -= released;
}
+
+ /* Try sending any messages link endpoint has pending */
+
if (unlikely(l_ptr->next_out))
tipc_link_push_queue(l_ptr);
if (unlikely(!list_empty(&l_ptr->waiting_ports)))
@@ -1860,6 +1929,8 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}
+ /* Now (finally!) process the incoming message */
+
protocol_check:
if (likely(link_working_working(l_ptr))) {
if (likely(seq_no == mod(l_ptr->next_in_no))) {
@@ -2832,15 +2903,15 @@ static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
{
/* Data messages from this node, inclusive FIRST_FRAGM */
- l_ptr->queue_limit[DATA_LOW] = window;
- l_ptr->queue_limit[DATA_MEDIUM] = (window / 3) * 4;
- l_ptr->queue_limit[DATA_HIGH] = (window / 3) * 5;
- l_ptr->queue_limit[DATA_CRITICAL] = (window / 3) * 6;
+ l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
+ l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
+ l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
+ l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
/* Transiting data messages,inclusive FIRST_FRAGM */
- l_ptr->queue_limit[DATA_LOW + 4] = 300;
- l_ptr->queue_limit[DATA_MEDIUM + 4] = 600;
- l_ptr->queue_limit[DATA_HIGH + 4] = 900;
- l_ptr->queue_limit[DATA_CRITICAL + 4] = 1200;
+ l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
+ l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
+ l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
+ l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
l_ptr->queue_limit[CONN_MANAGER] = 1200;
l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 782485468fb..696a8633df7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -73,10 +73,10 @@ void tipc_msg_print(struct print_buf *buf, struct tipc_msg *msg, const char *str
tipc_printf(buf, "NO(%u/%u):",msg_long_msgno(msg),
msg_fragm_no(msg));
break;
- case DATA_LOW:
- case DATA_MEDIUM:
- case DATA_HIGH:
- case DATA_CRITICAL:
+ case TIPC_LOW_IMPORTANCE:
+ case TIPC_MEDIUM_IMPORTANCE:
+ case TIPC_HIGH_IMPORTANCE:
+ case TIPC_CRITICAL_IMPORTANCE:
tipc_printf(buf, "DAT%u:", msg_user(msg));
if (msg_short(msg)) {
tipc_printf(buf, "CON:");
@@ -229,10 +229,10 @@ void tipc_msg_print(struct print_buf *buf, struct tipc_msg *msg, const char *str
switch (usr) {
case CONN_MANAGER:
case NAME_DISTRIBUTOR:
- case DATA_LOW:
- case DATA_MEDIUM:
- case DATA_HIGH:
- case DATA_CRITICAL:
+ case TIPC_LOW_IMPORTANCE:
+ case TIPC_MEDIUM_IMPORTANCE:
+ case TIPC_HIGH_IMPORTANCE:
+ case TIPC_CRITICAL_IMPORTANCE:
if (msg_short(msg))
break; /* No error */
switch (msg_errcode(msg)) {
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index e9ef6df2656..ad487e8abcc 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -40,18 +40,16 @@
#include "core.h"
#define TIPC_VERSION 2
-#define DATA_LOW TIPC_LOW_IMPORTANCE
-#define DATA_MEDIUM TIPC_MEDIUM_IMPORTANCE
-#define DATA_HIGH TIPC_HIGH_IMPORTANCE
-#define DATA_CRITICAL TIPC_CRITICAL_IMPORTANCE
-#define SHORT_H_SIZE 24 /* Connected,in cluster */
+
+#define SHORT_H_SIZE 24 /* Connected, in-cluster messages */
#define DIR_MSG_H_SIZE 32 /* Directly addressed messages */
-#define CONN_MSG_H_SIZE 36 /* Routed connected msgs*/
-#define LONG_H_SIZE 40 /* Named Messages */
+#define LONG_H_SIZE 40 /* Named messages */
#define MCAST_H_SIZE 44 /* Multicast messages */
-#define MAX_H_SIZE 60 /* Inclusive full options */
+#define INT_H_SIZE 40 /* Internal messages */
+#define MIN_H_SIZE 24 /* Smallest legal TIPC header size */
+#define MAX_H_SIZE 60 /* Largest possible TIPC header size */
+
#define MAX_MSG_SIZE (MAX_H_SIZE + TIPC_MAX_USER_MSG_SIZE)
-#define LINK_CONFIG 13
/*
@@ -72,7 +70,8 @@ static inline void msg_set_bits(struct tipc_msg *m, u32 w,
u32 pos, u32 mask, u32 val)
{
val = (val & mask) << pos;
- m->hdr[w] &= ~htonl(mask << pos);
+ mask = mask << pos;
+ m->hdr[w] &= ~htonl(mask);
m->hdr[w] |= htonl(val);
}
@@ -87,7 +86,7 @@ static inline u32 msg_version(struct tipc_msg *m)
static inline void msg_set_version(struct tipc_msg *m)
{
- msg_set_bits(m, 0, 29, 0xf, TIPC_VERSION);
+ msg_set_bits(m, 0, 29, 7, TIPC_VERSION);
}
static inline u32 msg_user(struct tipc_msg *m)
@@ -97,7 +96,7 @@ static inline u32 msg_user(struct tipc_msg *m)
static inline u32 msg_isdata(struct tipc_msg *m)
{
- return (msg_user(m) <= DATA_CRITICAL);
+ return (msg_user(m) <= TIPC_CRITICAL_IMPORTANCE);
}
static inline void msg_set_user(struct tipc_msg *m, u32 n)
@@ -190,18 +189,6 @@ static inline void msg_set_lookup_scope(struct tipc_msg *m, u32 n)
msg_set_bits(m, 1, 19, 0x3, n);
}
-static inline void msg_set_options(struct tipc_msg *m, const char *opt, u32 sz)
-{
- u32 hsz = msg_hdr_sz(m);
- char *to = (char *)&m->hdr[hsz/4];
-
- if ((hsz < DIR_MSG_H_SIZE) || ((hsz + sz) > MAX_H_SIZE))
- return;
- msg_set_bits(m, 1, 16, 0x7, (hsz - 28)/4);
- msg_set_hdr_sz(m, hsz + sz);
- memcpy(to, opt, sz);
-}
-
static inline u32 msg_bcast_ack(struct tipc_msg *m)
{
return msg_bits(m, 1, 0, 0xffff);
@@ -330,17 +317,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
return (struct tipc_msg *)msg_data(m);
}
-static inline void msg_expand(struct tipc_msg *m, u32 destnode)
-{
- if (!msg_short(m))
- return;
- msg_set_hdr_sz(m, LONG_H_SIZE);
- msg_set_orignode(m, msg_prevnode(m));
- msg_set_destnode(m, destnode);
- memset(&m->hdr[8], 0, 12);
-}
-
-
/*
TIPC internal message header format, version 2
@@ -388,7 +364,6 @@ static inline void msg_expand(struct tipc_msg *m, u32 destnode)
#define NAME_DISTRIBUTOR 11
#define MSG_FRAGMENTER 12
#define LINK_CONFIG 13
-#define INT_H_SIZE 40
#define DSC_H_SIZE 40
/*
diff --git a/net/tipc/port.c b/net/tipc/port.c
index f508614ca59..2f5806410c6 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -242,7 +242,8 @@ u32 tipc_createport_raw(void *usr_handle,
p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;
p_ptr->publ.ref = ref;
msg = &p_ptr->publ.phdr;
- msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0);
+ msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE,
+ 0);
msg_set_orignode(msg, tipc_own_addr);
msg_set_prevnode(msg, tipc_own_addr);
msg_set_origport(msg, ref);
@@ -413,13 +414,6 @@ static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
return buf;
}
-int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz)
-{
- msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr));
- msg_set_options(&tp_ptr->phdr, opt, sz);
- return TIPC_OK;
-}
-
int tipc_reject_msg(struct sk_buff *buf, u32 err)
{
struct tipc_msg *msg = buf_msg(buf);
@@ -632,7 +626,7 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
msg_orignode(msg),
msg_destport(msg),
tipc_own_addr,
- DATA_HIGH,
+ TIPC_HIGH_IMPORTANCE,
TIPC_CONN_MSG,
err,
0,
@@ -1246,6 +1240,28 @@ exit:
return res;
}
+/**
+ * tipc_disconnect_port - disconnect port from peer
+ * @tp_ptr: port to disconnect
+ *
+ * Port must be locked.
+ *
+ * Clears the port's connected flag and cancels its node subscription.
+ *
+ * Returns TIPC_OK if the port was connected, -ENOTCONN otherwise.
+ */
+
+int tipc_disconnect_port(struct tipc_port *tp_ptr)
+{
+ int res;
+
+ if (tp_ptr->connected) {
+ tp_ptr->connected = 0;
+ /* let timer expire on its own to avoid deadlock! */
+ tipc_nodesub_unsubscribe(
+ &((struct port *)tp_ptr)->subscription);
+ res = TIPC_OK;
+ } else {
+ res = -ENOTCONN;
+ }
+ return res;
+}
+
/*
* tipc_disconnect(): Disconnect port form peer.
* This is a node local operation.
@@ -1254,17 +1270,12 @@ exit:
int tipc_disconnect(u32 ref)
{
struct port *p_ptr;
- int res = -ENOTCONN;
+ int res;
p_ptr = tipc_port_lock(ref);
if (!p_ptr)
return -EINVAL;
- if (p_ptr->publ.connected) {
- p_ptr->publ.connected = 0;
- /* let timer expire on it's own to avoid deadlock! */
- tipc_nodesub_unsubscribe(&p_ptr->subscription);
- res = TIPC_OK;
- }
+ res = tipc_disconnect_port((struct tipc_port *)p_ptr);
tipc_port_unlock(p_ptr);
return res;
}
diff --git a/net/tipc/ref.c b/net/tipc/ref.c
index c38744c96ed..89cbab24d08 100644
--- a/net/tipc/ref.c
+++ b/net/tipc/ref.c
@@ -2,7 +2,7 @@
* net/tipc/ref.c: TIPC object registry code
*
* Copyright (c) 1991-2006, Ericsson AB
- * Copyright (c) 2004-2005, Wind River Systems
+ * Copyright (c) 2004-2007, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -36,32 +36,60 @@
#include "core.h"
#include "ref.h"
-#include "port.h"
-#include "subscr.h"
-#include "name_distr.h"
-#include "name_table.h"
-#include "config.h"
-#include "discover.h"
-#include "bearer.h"
-#include "node.h"
-#include "bcast.h"
+
+/**
+ * struct reference - TIPC object reference entry
+ * @object: pointer to object associated with reference entry
+ * @lock: spinlock controlling access to object
+ * @ref: reference value for object (combines instance & array index info)
+ */
+
+struct reference {
+ void *object;
+ spinlock_t lock;
+ u32 ref;
+};
+
+/**
+ * struct ref_table - table of TIPC object reference entries
+ * @entries: pointer to array of reference entries
+ * @capacity: array index of first unusable entry
+ * @init_point: array index of first uninitialized entry
+ * @first_free: array index of first unused object reference entry
+ * @last_free: array index of last unused object reference entry
+ * @index_mask: bitmask for array index portion of reference values
+ * @start_mask: initial value for instance value portion of reference values
+ */
+
+struct ref_table {
+ struct reference *entries;
+ u32 capacity;
+ u32 init_point;
+ u32 first_free;
+ u32 last_free;
+ u32 index_mask;
+ u32 start_mask;
+};
/*
* Object reference table consists of 2**N entries.
*
- * A used entry has object ptr != 0, reference == XXXX|own index
- * (XXXX changes each time entry is acquired)
- * A free entry has object ptr == 0, reference == YYYY|next free index
- * (YYYY is one more than last used XXXX)
+ * State Object ptr Reference
+ * ----- ---------- ---------
+ * In use non-NULL XXXX|own index
+ * (XXXX changes each time entry is acquired)
+ * Free NULL YYYY|next free index
+ * (YYYY is one more than last used XXXX)
+ * Uninitialized NULL 0
*
- * Free list is initially chained from entry (2**N)-1 to entry 1.
- * Entry 0 is not used to allow index 0 to indicate the end of the free list.
+ * Entry 0 is not used; this allows index 0 to denote the end of the free list.
*
- * Note: Any accidental reference of the form XXXX|0--0 won't match entry 0
- * because entry 0's reference field has the form XXXX|1--1.
+ * Note that a reference value of 0 does not necessarily indicate that an
+ * entry is uninitialized, since the last entry in the free list could also
+ * have a reference value of 0 (although this is unlikely).
*/
-struct ref_table tipc_ref_table = { NULL };
+static struct ref_table tipc_ref_table = { NULL };
static DEFINE_RWLOCK(ref_table_lock);
@@ -72,29 +100,29 @@ static DEFINE_RWLOCK(ref_table_lock);
int tipc_ref_table_init(u32 requested_size, u32 start)
{
struct reference *table;
- u32 sz = 1 << 4;
- u32 index_mask;
- int i;
+ u32 actual_size;
- while (sz < requested_size) {
- sz <<= 1;
- }
- table = vmalloc(sz * sizeof(*table));
+ /* account for unused entry, then round up size to a power of 2 */
+
+ requested_size++;
+ for (actual_size = 16; actual_size < requested_size; actual_size <<= 1)
+ /* do nothing */ ;
+
+ /* allocate table & mark all entries as uninitialized */
+
+ table = __vmalloc(actual_size * sizeof(struct reference),
+ GFP_KERNEL | __GFP_HIGHMEM | __GFP_ZERO, PAGE_KERNEL);
if (table == NULL)
return -ENOMEM;
- write_lock_bh(&ref_table_lock);
- index_mask = sz - 1;
- for (i = sz - 1; i >= 0; i--) {
- table[i].object = NULL;
- spin_lock_init(&table[i].lock);
- table[i].data.next_plus_upper = (start & ~index_mask) + i - 1;
- }
tipc_ref_table.entries = table;
- tipc_ref_table.index_mask = index_mask;
- tipc_ref_table.first_free = sz - 1;
- tipc_ref_table.last_free = 1;
- write_unlock_bh(&ref_table_lock);
+ tipc_ref_table.capacity = requested_size;
+ tipc_ref_table.init_point = 1;
+ tipc_ref_table.first_free = 0;
+ tipc_ref_table.last_free = 0;
+ tipc_ref_table.index_mask = actual_size - 1;
+ tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask;
+
return TIPC_OK;
}
@@ -125,7 +153,7 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock)
u32 index;
u32 index_mask;
u32 next_plus_upper;
- u32 reference = 0;
+ u32 ref;
if (!object) {
err("Attempt to acquire reference to non-existent object\n");
@@ -136,6 +164,8 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock)
return 0;
}
+ /* take a free entry, if available; otherwise initialize a new entry */
+
write_lock_bh(&ref_table_lock);
if (tipc_ref_table.first_free) {
index = tipc_ref_table.first_free;
@@ -143,17 +173,29 @@ u32 tipc_ref_acquire(void *object, spinlock_t **lock)
index_mask = tipc_ref_table.index_mask;
/* take lock in case a previous user of entry still holds it */
spin_lock_bh(&entry->lock);
- next_plus_upper = entry->data.next_plus_upper;
+ next_plus_upper = entry->ref;
tipc_ref_table.first_free = next_plus_upper & index_mask;
- reference = (next_plus_upper & ~index_mask) + index;
- entry->data.reference = reference;
+ ref = (next_plus_upper & ~index_mask) + index;
+ entry->ref = ref;
entry->object = object;
- if (lock != NULL)
- *lock = &entry->lock;
spin_unlock_bh(&entry->lock);
+ *lock = &entry->lock;
+ }
+ else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {
+ index = tipc_ref_table.init_point++;
+ entry = &(tipc_ref_table.entries[index]);
+ spin_lock_init(&entry->lock);
+ ref = tipc_ref_table.start_mask + index;
+ entry->ref = ref;
+ entry->object = object;
+ *lock = &entry->lock;
+ }
+ else {
+ ref = 0;
}
write_unlock_bh(&ref_table_lock);
- return reference;
+
+ return ref;
}
/**
@@ -169,42 +211,99 @@ void tipc_ref_discard(u32 ref)
u32 index;
u32 index_mask;
- if (!ref) {
- err("Attempt to discard reference 0\n");
- return;
- }
if (!tipc_ref_table.entries) {
err("Reference table not found during discard attempt\n");
return;
}
- write_lock_bh(&ref_table_lock);
index_mask = tipc_ref_table.index_mask;
index = ref & index_mask;
entry = &(tipc_ref_table.entries[index]);
+ write_lock_bh(&ref_table_lock);
+
if (!entry->object) {
err("Attempt to discard reference to non-existent object\n");
goto exit;
}
- if (entry->data.reference != ref) {
+ if (entry->ref != ref) {
err("Attempt to discard non-existent reference\n");
goto exit;
}
- /* mark entry as unused */
+ /*
+ * mark entry as unused; increment instance part of entry's reference
+ * to invalidate any subsequent references
+ */
+
entry->object = NULL;
+ entry->ref = (ref & ~index_mask) + (index_mask + 1);
+
+ /* append entry to free entry list */
+
if (tipc_ref_table.first_free == 0)
tipc_ref_table.first_free = index;
else
- /* next_plus_upper is always XXXX|0--0 for last free entry */
- tipc_ref_table.entries[tipc_ref_table.last_free].data.next_plus_upper
- |= index;
+ tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index;
tipc_ref_table.last_free = index;
- /* increment upper bits of entry to invalidate subsequent references */
- entry->data.next_plus_upper = (ref & ~index_mask) + (index_mask + 1);
exit:
write_unlock_bh(&ref_table_lock);
}
+/**
+ * tipc_ref_lock - lock referenced object and return pointer to it
+ * @ref: reference value of object to lock
+ *
+ * Returns the object pointer with the entry's spinlock held, or NULL (with
+ * no lock held) if the reference table does not exist, the entry's reference
+ * value is 0, or the entry does not hold a non-NULL object matching @ref.
+ */
+
+void *tipc_ref_lock(u32 ref)
+{
+ if (likely(tipc_ref_table.entries)) {
+ struct reference *entry;
+
+ entry = &tipc_ref_table.entries[ref &
+ tipc_ref_table.index_mask];
+ if (likely(entry->ref != 0)) {
+ spin_lock_bh(&entry->lock);
+ if (likely((entry->ref == ref) && (entry->object)))
+ return entry->object;
+ spin_unlock_bh(&entry->lock);
+ }
+ }
+ return NULL;
+}
+
+/**
+ * tipc_ref_unlock - unlock referenced object
+ * @ref: reference value of object to unlock
+ *
+ * Releases the entry's spinlock only if the entry's reference value equals
+ * @ref and its object pointer is non-NULL; otherwise an error is reported
+ * via err() and the lock is left untouched. Does nothing if the reference
+ * table does not exist.
+ */
+
+void tipc_ref_unlock(u32 ref)
+{
+ if (likely(tipc_ref_table.entries)) {
+ struct reference *entry;
+
+ entry = &tipc_ref_table.entries[ref &
+ tipc_ref_table.index_mask];
+ if (likely((entry->ref == ref) && (entry->object)))
+ spin_unlock_bh(&entry->lock);
+ else
+ err("Attempt to unlock non-existent reference\n");
+ }
+}
+
+/**
+ * tipc_ref_deref - return pointer to referenced object (without locking it)
+ * @ref: reference value of object
+ *
+ * Returns the entry's object pointer if the entry's reference value equals
+ * @ref; returns NULL if it does not, or if the reference table does not
+ * exist.
+ */
+
+void *tipc_ref_deref(u32 ref)
+{
+ if (likely(tipc_ref_table.entries)) {
+ struct reference *entry;
+
+ entry = &tipc_ref_table.entries[ref &
+ tipc_ref_table.index_mask];
+ if (likely(entry->ref == ref))
+ return entry->object;
+ }
+ return NULL;
+}
+
diff --git a/net/tipc/ref.h b/net/tipc/ref.h
index 38f3a7f4a78..7e3798ea93b 100644
--- a/net/tipc/ref.h
+++ b/net/tipc/ref.h
@@ -2,7 +2,7 @@
* net/tipc/ref.h: Include file for TIPC object registry code
*
* Copyright (c) 1991-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005-2006, Wind River Systems
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,95 +37,14 @@
#ifndef _TIPC_REF_H
#define _TIPC_REF_H
-/**
- * struct reference - TIPC object reference entry
- * @object: pointer to object associated with reference entry
- * @lock: spinlock controlling access to object
- * @data: reference value associated with object (or link to next unused entry)
- */
-
-struct reference {
- void *object;
- spinlock_t lock;
- union {
- u32 next_plus_upper;
- u32 reference;
- } data;
-};
-
-/**
- * struct tipc_ref_table - table of TIPC object reference entries
- * @entries: pointer to array of reference entries
- * @index_mask: bitmask for array index portion of reference values
- * @first_free: array index of first unused object reference entry
- * @last_free: array index of last unused object reference entry
- */
-
-struct ref_table {
- struct reference *entries;
- u32 index_mask;
- u32 first_free;
- u32 last_free;
-};
-
-extern struct ref_table tipc_ref_table;
-
int tipc_ref_table_init(u32 requested_size, u32 start);
void tipc_ref_table_stop(void);
u32 tipc_ref_acquire(void *object, spinlock_t **lock);
void tipc_ref_discard(u32 ref);
-
-/**
- * tipc_ref_lock - lock referenced object and return pointer to it
- */
-
-static inline void *tipc_ref_lock(u32 ref)
-{
- if (likely(tipc_ref_table.entries)) {
- struct reference *r =
- &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
-
- spin_lock_bh(&r->lock);
- if (likely(r->data.reference == ref))
- return r->object;
- spin_unlock_bh(&r->lock);
- }
- return NULL;
-}
-
-/**
- * tipc_ref_unlock - unlock referenced object
- */
-
-static inline void tipc_ref_unlock(u32 ref)
-{
- if (likely(tipc_ref_table.entries)) {
- struct reference *r =
- &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
-
- if (likely(r->data.reference == ref))
- spin_unlock_bh(&r->lock);
- else
- err("tipc_ref_unlock() invoked using obsolete reference\n");
- }
-}
-
-/**
- * tipc_ref_deref - return pointer referenced object (without locking it)
- */
-
-static inline void *tipc_ref_deref(u32 ref)
-{
- if (likely(tipc_ref_table.entries)) {
- struct reference *r =
- &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
-
- if (likely(r->data.reference == ref))
- return r->object;
- }
- return NULL;
-}
+void *tipc_ref_lock(u32 ref);
+void tipc_ref_unlock(u32 ref);
+void *tipc_ref_deref(u32 ref);
#endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 22909036b9b..230f9ca2ad6 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -43,7 +43,6 @@
#include <linux/slab.h>
#include <linux/poll.h>
#include <linux/fcntl.h>
-#include <asm/semaphore.h>
#include <asm/string.h>
#include <asm/atomic.h>
#include <net/sock.h>
@@ -58,16 +57,18 @@
#define SS_LISTENING -1 /* socket is listening */
#define SS_READY -2 /* socket is connectionless */
-#define OVERLOAD_LIMIT_BASE 5000
+#define OVERLOAD_LIMIT_BASE 5000
+#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
struct tipc_sock {
struct sock sk;
struct tipc_port *p;
- struct semaphore sem;
};
-#define tipc_sk(sk) ((struct tipc_sock*)sk)
+#define tipc_sk(sk) ((struct tipc_sock *)(sk))
+#define tipc_sk_port(sk) ((struct tipc_port *)(tipc_sk(sk)->p))
+static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
@@ -81,93 +82,115 @@ static int sockets_enabled = 0;
static atomic_t tipc_queue_size = ATOMIC_INIT(0);
-
/*
- * sock_lock(): Lock a port/socket pair. lock_sock() can
- * not be used here, since the same lock must protect ports
- * with non-socket interfaces.
- * See net.c for description of locking policy.
+ * Revised TIPC socket locking policy:
+ *
+ * Most socket operations take the standard socket lock when they start
+ * and hold it until they finish (or until they need to sleep). Acquiring
+ * this lock grants the owner exclusive access to the fields of the socket
+ * data structures, with the exception of the backlog queue. A few socket
+ * operations can be done without taking the socket lock because they only
+ * read socket information that never changes during the life of the socket.
+ *
+ * Socket operations may acquire the lock for the associated TIPC port if they
+ * need to perform an operation on the port. If any routine needs to acquire
+ * both the socket lock and the port lock it must take the socket lock first
+ * to avoid the risk of deadlock.
+ *
+ * The dispatcher handling incoming messages cannot grab the socket lock in
+ * the standard fashion, since it is invoked at the BH level and cannot block.
+ * Instead, it checks to see if the socket lock is currently owned by someone,
+ * and either handles the message itself or adds it to the socket's backlog
+ * queue; in the latter case the queued message is processed once the process
+ * owning the socket lock releases it.
+ *
+ * NOTE: Releasing the socket lock while an operation is sleeping overcomes
+ * the problem of a blocked socket operation preventing any other operations
+ * from occurring. However, applications must be careful if they have
+ * multiple threads trying to send (or receive) on the same socket, as these
+ * operations might interfere with each other. For example, doing a connect
+ * and a receive at the same time might allow the receive to consume the
+ * ACK message meant for the connect. While additional work could be done
+ * to try and overcome this, it doesn't seem to be worthwhile at present.
+ *
+ * NOTE: Releasing the socket lock while an operation is sleeping also ensures
+ * that another operation that must be performed in a non-blocking manner is
+ * not delayed for very long because the lock has already been taken.
+ *
+ * NOTE: This code assumes that certain fields of a port/socket pair are
+ * constant over its lifetime; such fields can be examined without taking
+ * the socket lock and/or port lock, and do not need to be re-read even
+ * after resuming processing after waiting. These fields include:
+ * - socket type
+ * - pointer to socket sk structure (aka tipc_sock structure)
+ * - pointer to port structure
+ * - port reference
*/
-static void sock_lock(struct tipc_sock* tsock)
-{
- spin_lock_bh(tsock->p->lock);
-}
-/*
- * sock_unlock(): Unlock a port/socket pair
+/**
+ * advance_rx_queue - discard first buffer in socket receive queue
+ *
+ * Caller must hold socket lock
*/
-static void sock_unlock(struct tipc_sock* tsock)
+
+static void advance_rx_queue(struct sock *sk)
{
- spin_unlock_bh(tsock->p->lock);
+ buf_discard(__skb_dequeue(&sk->sk_receive_queue));
+ atomic_dec(&tipc_queue_size);
}
/**
- * pollmask - determine the current set of poll() events for a socket
- * @sock: socket structure
- *
- * TIPC sets the returned events as follows:
- * a) POLLRDNORM and POLLIN are set if the socket's receive queue is non-empty
- * or if a connection-oriented socket is does not have an active connection
- * (i.e. a read operation will not block).
- * b) POLLOUT is set except when a socket's connection has been terminated
- * (i.e. a write operation will not block).
- * c) POLLHUP is set when a socket's connection has been terminated.
- *
- * IMPORTANT: The fact that a read or write operation will not block does NOT
- * imply that the operation will succeed!
+ * discard_rx_queue - discard all buffers in socket receive queue
*