aboutsummaryrefslogtreecommitdiff
path: root/net/rds/connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/connection.c')
-rw-r--r--net/rds/connection.c238
1 files changed, 164 insertions, 74 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 273f064930a..378c3a6acf8 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -32,11 +32,12 @@
*/
#include <linux/kernel.h>
#include <linux/list.h>
+#include <linux/slab.h>
+#include <linux/export.h>
#include <net/inet_hashtables.h>
#include "rds.h"
#include "loop.h"
-#include "rdma.h"
#define RDS_CONNECTION_HASH_BITS 12
#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
@@ -50,10 +51,16 @@ static struct kmem_cache *rds_conn_slab;
static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
{
+ static u32 rds_hash_secret __read_mostly;
+
+ unsigned long hash;
+
+ net_get_random_once(&rds_hash_secret, sizeof(rds_hash_secret));
+
/* Pass NULL, don't need struct net for hash */
- unsigned long hash = inet_ehashfn(NULL,
- be32_to_cpu(laddr), 0,
- be32_to_cpu(faddr), 0);
+ hash = __inet_ehashfn(be32_to_cpu(laddr), 0,
+ be32_to_cpu(faddr), 0,
+ rds_hash_secret);
return &rds_conn_hash[hash & RDS_CONNECTION_HASH_MASK];
}
@@ -62,26 +69,14 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
var |= RDS_INFO_CONNECTION_FLAG_##suffix; \
} while (0)
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
- int ret = 0;
-
- if (!mutex_trylock(&conn->c_send_lock))
- ret = 1;
- else
- mutex_unlock(&conn->c_send_lock);
-
- return ret;
-}
-
+/* rcu read lock must be held or the connection spinlock */
static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
__be32 laddr, __be32 faddr,
struct rds_transport *trans)
{
struct rds_connection *conn, *ret = NULL;
- struct hlist_node *pos;
- hlist_for_each_entry(conn, pos, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, head, c_hash_node) {
if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
conn->c_trans == trans) {
ret = conn;
@@ -99,7 +94,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
* and receiving over this connection again in the future. It is up to
* the transport to have serialized this call with its send and recv.
*/
-void rds_conn_reset(struct rds_connection *conn)
+static void rds_conn_reset(struct rds_connection *conn)
{
rdsdebug("connection %pI4 to %pI4 reset\n",
&conn->c_laddr, &conn->c_faddr);
@@ -126,17 +121,16 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
struct rds_transport *trans, gfp_t gfp,
int is_outgoing)
{
- struct rds_connection *conn, *tmp, *parent = NULL;
+ struct rds_connection *conn, *parent = NULL;
struct hlist_head *head = rds_conn_bucket(laddr, faddr);
+ struct rds_transport *loop_trans;
unsigned long flags;
int ret;
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
conn = rds_conn_lookup(head, laddr, faddr, trans);
- if (conn
- && conn->c_loopback
- && conn->c_trans != &rds_loop_transport
- && !is_outgoing) {
+ if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
+ !is_outgoing) {
/* This is a looped back IB connection, and we're
* called by the code handling the incoming connect.
* We need a second connection object into which we
@@ -144,26 +138,23 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
parent = conn;
conn = parent->c_passive;
}
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
if (conn)
goto out;
- conn = kmem_cache_alloc(rds_conn_slab, gfp);
- if (conn == NULL) {
+ conn = kmem_cache_zalloc(rds_conn_slab, gfp);
+ if (!conn) {
conn = ERR_PTR(-ENOMEM);
goto out;
}
- memset(conn, 0, sizeof(*conn));
-
INIT_HLIST_NODE(&conn->c_hash_node);
- conn->c_version = RDS_PROTOCOL_3_0;
conn->c_laddr = laddr;
conn->c_faddr = faddr;
spin_lock_init(&conn->c_lock);
conn->c_next_tx_seq = 1;
- mutex_init(&conn->c_send_lock);
+ init_waitqueue_head(&conn->c_waitq);
INIT_LIST_HEAD(&conn->c_send_queue);
INIT_LIST_HEAD(&conn->c_retrans);
@@ -179,7 +170,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
* can bind to the destination address then we'd rather the messages
* flow through loopback rather than either transport.
*/
- if (rds_trans_get_preferred(faddr)) {
+ loop_trans = rds_trans_get_preferred(faddr);
+ if (loop_trans) {
+ rds_trans_put(loop_trans);
conn->c_loopback = 1;
if (is_outgoing && trans->t_prefer_loopback) {
/* "outgoing" connection - and the transport
@@ -213,26 +206,40 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
trans->t_name ? trans->t_name : "[unknown]",
is_outgoing ? "(outgoing)" : "");
+ /*
+ * Since we ran without holding the conn lock, someone could
+ * have created the same conn (either normal or passive) in the
+ * interim. We check while holding the lock. If we won, we complete
+ * init and return our conn. If we lost, we rollback and return the
+ * other one.
+ */
spin_lock_irqsave(&rds_conn_lock, flags);
- if (parent == NULL) {
- tmp = rds_conn_lookup(head, laddr, faddr, trans);
- if (tmp == NULL)
- hlist_add_head(&conn->c_hash_node, head);
- } else {
- tmp = parent->c_passive;
- if (!tmp)
+ if (parent) {
+ /* Creating passive conn */
+ if (parent->c_passive) {
+ trans->conn_free(conn->c_transport_data);
+ kmem_cache_free(rds_conn_slab, conn);
+ conn = parent->c_passive;
+ } else {
parent->c_passive = conn;
- }
-
- if (tmp) {
- trans->conn_free(conn->c_transport_data);
- kmem_cache_free(rds_conn_slab, conn);
- conn = tmp;
+ rds_cong_add_conn(conn);
+ rds_conn_count++;
+ }
} else {
- rds_cong_add_conn(conn);
- rds_conn_count++;
+ /* Creating normal conn */
+ struct rds_connection *found;
+
+ found = rds_conn_lookup(head, laddr, faddr, trans);
+ if (found) {
+ trans->conn_free(conn->c_transport_data);
+ kmem_cache_free(rds_conn_slab, conn);
+ conn = found;
+ } else {
+ hlist_add_head_rcu(&conn->c_hash_node, head);
+ rds_cong_add_conn(conn);
+ rds_conn_count++;
+ }
}
-
spin_unlock_irqrestore(&rds_conn_lock, flags);
out:
@@ -244,28 +251,100 @@ struct rds_connection *rds_conn_create(__be32 laddr, __be32 faddr,
{
return __rds_conn_create(laddr, faddr, trans, gfp, 0);
}
+EXPORT_SYMBOL_GPL(rds_conn_create);
struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr,
struct rds_transport *trans, gfp_t gfp)
{
return __rds_conn_create(laddr, faddr, trans, gfp, 1);
}
+EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
+
+void rds_conn_shutdown(struct rds_connection *conn)
+{
+ /* shut it down unless it's down already */
+ if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
+ /*
+ * Quiesce the connection mgmt handlers before we start tearing
+ * things down. We don't hold the mutex for the entire
+ * duration of the shutdown operation, else we may be
+ * deadlocking with the CM handler. Instead, the CM event
+ * handler is supposed to check for state DISCONNECTING
+ */
+ mutex_lock(&conn->c_cm_lock);
+ if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING)
+ && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) {
+ rds_conn_error(conn, "shutdown called in state %d\n",
+ atomic_read(&conn->c_state));
+ mutex_unlock(&conn->c_cm_lock);
+ return;
+ }
+ mutex_unlock(&conn->c_cm_lock);
+
+ wait_event(conn->c_waitq,
+ !test_bit(RDS_IN_XMIT, &conn->c_flags));
+
+ conn->c_trans->conn_shutdown(conn);
+ rds_conn_reset(conn);
+
+ if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
+ /* This can happen - eg when we're in the middle of tearing
+ * down the connection, and someone unloads the rds module.
+ * Quite reproduceable with loopback connections.
+ * Mostly harmless.
+ */
+ rds_conn_error(conn,
+ "%s: failed to transition to state DOWN, "
+ "current state is %d\n",
+ __func__,
+ atomic_read(&conn->c_state));
+ return;
+ }
+ }
+ /* Then reconnect if it's still live.
+ * The passive side of an IB loopback connection is never added
+ * to the conn hash, so we never trigger a reconnect on this
+ * conn - the reconnect is always triggered by the active peer. */
+ cancel_delayed_work_sync(&conn->c_conn_w);
+ rcu_read_lock();
+ if (!hlist_unhashed(&conn->c_hash_node)) {
+ rcu_read_unlock();
+ rds_queue_reconnect(conn);
+ } else {
+ rcu_read_unlock();
+ }
+}
+
+/*
+ * Stop and free a connection.
+ *
+ * This can only be used in very limited circumstances. It assumes that once
+ * the conn has been shutdown that no one else is referencing the connection.
+ * We can only ensure this in the rmmod path in the current code.
+ */
void rds_conn_destroy(struct rds_connection *conn)
{
struct rds_message *rm, *rtmp;
+ unsigned long flags;
rdsdebug("freeing conn %p for %pI4 -> "
"%pI4\n", conn, &conn->c_laddr,
&conn->c_faddr);
- hlist_del_init(&conn->c_hash_node);
+ /* Ensure conn will not be scheduled for reconnect */
+ spin_lock_irq(&rds_conn_lock);
+ hlist_del_init_rcu(&conn->c_hash_node);
+ spin_unlock_irq(&rds_conn_lock);
+ synchronize_rcu();
- /* wait for the rds thread to shut it down */
- atomic_set(&conn->c_state, RDS_CONN_ERROR);
- cancel_delayed_work(&conn->c_conn_w);
- queue_work(rds_wq, &conn->c_down_w);
- flush_workqueue(rds_wq);
+ /* shut the connection down */
+ rds_conn_drop(conn);
+ flush_work(&conn->c_down_w);
+
+ /* make sure lingering queued work won't try to ref the conn */
+ cancel_delayed_work_sync(&conn->c_send_w);
+ cancel_delayed_work_sync(&conn->c_recv_w);
/* tear down queued messages */
list_for_each_entry_safe(rm, rtmp,
@@ -290,8 +369,11 @@ void rds_conn_destroy(struct rds_connection *conn)
BUG_ON(!list_empty(&conn->c_retrans));
kmem_cache_free(rds_conn_slab, conn);
+ spin_lock_irqsave(&rds_conn_lock, flags);
rds_conn_count--;
+ spin_unlock_irqrestore(&rds_conn_lock, flags);
}
+EXPORT_SYMBOL_GPL(rds_conn_destroy);
static void rds_conn_message_info(struct socket *sock, unsigned int len,
struct rds_info_iterator *iter,
@@ -299,27 +381,26 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
int want_send)
{
struct hlist_head *head;
- struct hlist_node *pos;
struct list_head *list;
struct rds_connection *conn;
struct rds_message *rm;
- unsigned long flags;
unsigned int total = 0;
+ unsigned long flags;
size_t i;
len /= sizeof(struct rds_info_message);
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
- hlist_for_each_entry(conn, pos, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, head, c_hash_node) {
if (want_send)
list = &conn->c_send_queue;
else
list = &conn->c_retrans;
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
/* XXX too lazy to maintain counts.. */
list_for_each_entry(rm, list, m_conn_item) {
@@ -330,11 +411,10 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
conn->c_faddr, 0);
}
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
}
}
-
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
lens->nr = total;
lens->each = sizeof(struct rds_info_message);
@@ -363,20 +443,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
{
uint64_t buffer[(item_len + 7) / 8];
struct hlist_head *head;
- struct hlist_node *pos;
- struct hlist_node *tmp;
struct rds_connection *conn;
- unsigned long flags;
size_t i;
- spin_lock_irqsave(&rds_conn_lock, flags);
+ rcu_read_lock();
lens->nr = 0;
lens->each = item_len;
for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
i++, head++) {
- hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
+ hlist_for_each_entry_rcu(conn, head, c_hash_node) {
/* XXX no c_lock usage.. */
if (!visitor(conn, buffer))
@@ -392,9 +469,9 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
lens->nr++;
}
}
-
- spin_unlock_irqrestore(&rds_conn_lock, flags);
+ rcu_read_unlock();
}
+EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
static int rds_conn_info_visitor(struct rds_connection *conn,
void *buffer)
@@ -409,8 +486,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
sizeof(cinfo->transport));
cinfo->flags = 0;
- rds_conn_info_set(cinfo->flags,
- rds_conn_is_sending(conn), SENDING);
+ rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+ SENDING);
/* XXX Future: return the state rather than these funky bits */
rds_conn_info_set(cinfo->flags,
atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
@@ -430,12 +507,12 @@ static void rds_conn_info(struct socket *sock, unsigned int len,
sizeof(struct rds_info_connection));
}
-int __init rds_conn_init(void)
+int rds_conn_init(void)
{
rds_conn_slab = kmem_cache_create("rds_connection",
sizeof(struct rds_connection),
0, 0, NULL);
- if (rds_conn_slab == NULL)
+ if (!rds_conn_slab)
return -ENOMEM;
rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
@@ -470,6 +547,19 @@ void rds_conn_drop(struct rds_connection *conn)
atomic_set(&conn->c_state, RDS_CONN_ERROR);
queue_work(rds_wq, &conn->c_down_w);
}
+EXPORT_SYMBOL_GPL(rds_conn_drop);
+
+/*
+ * If the connection is down, trigger a connect. We may have scheduled a
+ * delayed reconnect however - in this case we should not interfere.
+ */
+void rds_conn_connect_if_down(struct rds_connection *conn)
+{
+ if (rds_conn_state(conn) == RDS_CONN_DOWN &&
+ !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
+ queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+}
+EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
/*
* An error occurred on the connection