aboutsummaryrefslogtreecommitdiff
path: root/net/rds/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/connection.c')
-rw-r--r--net/rds/connection.c176
1 files changed, 126 insertions, 50 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 278f607ab60..378c3a6acf8 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -32,11 +32,12 @@
*/
#include <linux/kernel.h>
#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/export.h>
#include <net/inet_hashtables.h>
#include "rds.h"
#include "loop.h"
-#include "rdma.h"
#define RDS_CONNECTION_HASH_BITS 12
#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
@@ -50,10 +51,16 @@ static struct kmem_cache *rds_conn_slab;
static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
{
+ static u32 rds_hash_secret __read_mostly;
+
+ unsigned long hash;
+
+ net_get_random_once(&rds_hash_secret, sizeof(rds_hash_secret));
+
/* Pass NULL, don't need struct net for hash */
- unsigned long hash = inet_ehashfn(NULL,
- be32_to_cpu(laddr), 0,
- be32_to_cpu(faddr), 0);
+ hash = __inet_ehashfn(be32_to_cpu(laddr), 0,
+ be32_to_cpu(faddr), 0,
+ rds_hash_secret);
return &rds_conn_hash[hash & RDS_CONNECTION_HASH_MASK];
}
@@ -62,26 +69,14 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
var |= RDS_INFO_CONNECTION_FLAG_##suffix; \
} while (0)
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
- int ret = 0;
-
- if (!mutex_trylock(&conn->c_send_lock))
- ret = 1;
- else
- mutex_unlock(&conn->c_send_lock);
-
- return ret;
-}
-
+/* rcu read lock must be held or the connection spinlock */
static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
__be32 laddr, __be32 faddr,
struct rds_transport *trans)
{
struct rds_connection *conn, *ret = NULL;
- struct hlist_node *pos;
- hlist_for_each_entry(conn, pos, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, head, c_hash_node) {
if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
conn->c_trans == trans) {
ret = conn;
@@ -99,7 +94,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
* and receiving over this connection again in the future. It is up to
* the transport to have serialized this call with its send and recv.
*/
-void rds_conn_reset(struct rds_connection *conn)
+static void rds_conn_reset(struct rds_connection *conn)
{
rdsdebug("connection %pI4 to %pI4 reset\n",
&conn->c_laddr, &conn->c_faddr);
@@ -128,10 +123,11 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
{
struct rds_connection *conn, *parent = NULL;
struct hlist_head *head = rds_conn_bucket(laddr, faddr);
+ struct rds_transport *loop_trans;
unsigned long flags;
int ret;
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
conn = rds_conn_lookup(head, laddr, faddr, trans);
if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
!is_outgoing) {
@@ -142,12 +138,12 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
parent = conn;
conn = parent->c_passive;
}
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
if (conn)
goto out;
conn = kmem_cache_zalloc(rds_conn_slab, gfp);
- if (conn == NULL) {
+ if (!conn) {
conn = ERR_PTR(-ENOMEM);
goto out;
}
@@ -158,7 +154,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;
- mutex_init(&conn->c_send_lock);
+ init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);
@@ -174,7 +170,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
* can bind to the destination address then we'd rather the messages
* flow through loopback rather than either transport.
*/
- if (rds_trans_get_preferred(faddr)) {
+ loop_trans = rds_trans_get_preferred(faddr);
+ if (loop_trans) {
+ rds_trans_put(loop_trans);
conn->c_loopback = 1;
if (is_outgoing && trans->t_prefer_loopback) {
/* "outgoing" connection - and the transport
@@ -237,7 +235,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
kmem_cache_free(rds_conn_slab, conn);
conn = found;
} else {
- hlist_add_head(&conn->c_hash_node, head);
+ hlist_add_head_rcu(&conn->c_hash_node, head);
rds_cong_add_conn(conn);
rds_conn_count++;
}
@@ -262,21 +260,91 @@ struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
}
EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
+void rds_conn_shutdown(struct rds_connection *conn)
+{
+ /* shut it down unless it's down already */
+ if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
+ /*
+ * Quiesce the connection mgmt handlers before we start tearing
+ * things down. We don't hold the mutex for the entire
+ * duration of the shutdown operation, else we may be
+ * deadlocking with the CM handler. Instead, the CM event
+ * handler is supposed to check for state DISCONNECTING
+ */
+ mutex_lock(&conn->c_cm_lock);
+ if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
+ && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
+ rds_conn_error(conn, "shutdown called in state %d\n",
+ atomic_read(&conn->c_state));
+ mutex_unlock(&conn->c_cm_lock);
+ return;
+ }
+ mutex_unlock(&conn->c_cm_lock);
+
+ wait_event(conn->c_waitq,
+ !test_bit(RDS_IN_XMIT, &conn->c_flags));
+
+ conn->c_trans->conn_shutdown(conn);
+ rds_conn_reset(conn);
+
+ if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
+ /* This can happen - eg when we're in the middle of tearing
+ * down the connection, and someone unloads the rds module.
+ * Quite reproduceable with loopback connections.
+ * Mostly harmless.
+ */
+ rds_conn_error(conn,
+ "%s: failed to transition to state DOWN, "
+ "current state is %d\n",
+ __func__,
+ atomic_read(&conn->c_state));
+ return;
+ }
+ }
+
+ /* Then reconnect if it's still live.
+ * The passive side of an IB loopback connection is never added
+ * to the conn hash, so we never trigger a reconnect on this
+ * conn - the reconnect is always triggered by the active peer. */
+ cancel_delayed_work_sync(&conn->c_conn_w);
+ rcu_read_lock();
+ if (!hlist_unhashed(&conn->c_hash_node)) {
+ rcu_read_unlock();
+ rds_queue_reconnect(conn);
+ } else {
+ rcu_read_unlock();
+ }
+}
+
+/*
+ * Stop and free a connection.
+ *
+ * This can only be used in very limited circumstances. It assumes that once
+ * the conn has been shutdown that no one else is referencing the connection.
+ * We can only ensure this in the rmmod path in the current code.
+ */
void rds_conn_destroy(struct rds_connection *conn)
{
struct rds_message *rm, *rtmp;
+ unsigned long flags;
rdsdebug("freeing conn %p for %pI4 -> "
"%pI4\n", conn, &conn->c_laddr,
&conn->c_faddr);
- hlist_del_init(&conn->c_hash_node);
+ /* Ensure conn will not be scheduled for reconnect */
+ spin_lock_irq(&rds_conn_lock);
+ hlist_del_init_rcu(&conn->c_hash_node);
+ spin_unlock_irq(&rds_conn_lock);
+ synchronize_rcu();
- /* wait for the rds thread to shut it down */
- atomic_set(&conn->c_state, RDS_CONN_ERROR);
- cancel_delayed_work(&conn->c_conn_w);
- queue_work(rds_wq, &conn->c_down_w);
- flush_workqueue(rds_wq);
+ /* shut the connection down */
+ rds_conn_drop(conn);
+ flush_work(&conn->c_down_w);
+
+ /* make sure lingering queued work won't try to ref the conn */
+ cancel_delayed_work_sync(&conn->c_send_w);
+ cancel_delayed_work_sync(&conn->c_recv_w);
/* tear down queued messages */
list_for_each_entry_safe(rm, rtmp,
@@ -301,7 +369,9 @@ void rds_conn_destroy(struct rds_connection *conn)
BUG_ON(!list_empty(&conn->c_retrans));
kmem_cache_free(rds_conn_slab, conn);
+ spin_lock_irqsave(&rds_conn_lock, flags);
rds_conn_count--;
+ spin_unlock_irqrestore(&rds_conn_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_conn_destroy);
@@ -311,27 +381,26 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
int want_send)
{
struct hlist_head *head;
- struct hlist_node *pos;
struct list_head *list;
struct rds_connection *conn;
struct rds_message *rm;
- unsigned long flags;
unsigned int total = 0;
+ unsigned long flags;
size_t i;
len /= sizeof(struct rds_info_message);
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
- hlist_for_each_entry(conn, pos, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, head, c_hash_node) {
if (want_send)
list = &conn->c_send_queue;
else
list = &conn->c_retrans;
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
/* XXX too lazy to maintain counts.. */
list_for_each_entry(rm, list, m_conn_item) {
@@ -342,11 +411,10 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
conn->c_faddr, 0);
}
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
}
}
-
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
lens->nr = total;
lens->each = sizeof(struct rds_info_message);
@@ -375,20 +443,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
{
uint64_t buffer[(item_len + 7) / 8];
struct hlist_head *head;
- struct hlist_node *pos;
- struct hlist_node *tmp;
struct rds_connection *conn;
- unsigned long flags;
size_t i;
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
lens->nr = 0;
lens->each = item_len;
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
- hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, head, c_hash_node) {
/* XXX no c_lock usage.. */
if (!visitor(conn, buffer))
@@ -404,8 +469,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
lens->nr++;
}
}
-
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
@@ -422,8 +486,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
sizeof(cinfo->transport));
cinfo->flags = 0;
- rds_conn_info_set(cinfo->flags,
- rds_conn_is_sending(conn), SENDING);
+ rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+ SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
@@ -443,12 +507,12 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
sizeof(struct rds_info_connection));
}
-int __init rds_conn_init(void)
+int rds_conn_init(void)
{
rds_conn_slab = kmem_cache_create("rds_connection",
sizeof(struct rds_connection),
0, 0, NULL);
- if (rds_conn_slab == NULL)
+ if (!rds_conn_slab)
return -ENOMEM;
rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
@@ -486,6 +550,18 @@ void rds_conn_drop(struct rds_connection *conn)
EXPORT_SYMBOL_GPL(rds_conn_drop);
/*
+ * If the connection is down, trigger a connect. We may have scheduled a
+ * delayed reconnect however - in this case we should not interfere.
+ */
+void rds_conn_connect_if_down(struct rds_connection *conn)
+{
+ if (rds_conn_state(conn) == RDS_CONN_DOWN &&
+ !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
+ queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+}
+EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
+
+/*
* An error occurred on the connection
*/
void