diff options
Diffstat (limited to 'net/rds')
40 files changed, 2552 insertions, 1577 deletions
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index aebfecbdb84..bb6ad81b671 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -39,7 +39,15 @@ #include <net/sock.h> #include "rds.h" -#include "rdma.h" + +char *rds_str_array(char **array, size_t elements, size_t index) +{ + if ((index < elements) && array[index]) + return array[index]; + else + return "unknown"; +} +EXPORT_SYMBOL(rds_str_array); /* this is just used for stats gathering :/ */ static DEFINE_SPINLOCK(rds_sock_lock); @@ -62,7 +70,7 @@ static int rds_release(struct socket *sock) struct rds_sock *rs; unsigned long flags; - if (sk == NULL) + if (!sk) goto out; rs = rds_sk_to_rs(sk); @@ -73,7 +81,15 @@ static int rds_release(struct socket *sock) * with the socket. */ rds_clear_recv_queue(rs); rds_cong_remove_socket(rs); + + /* + * the binding lookup hash uses rcu, we need to + * make sure we sychronize_rcu before we free our + * entry + */ rds_remove_bound(rs); + synchronize_rcu(); + rds_send_drop_to(rs, NULL); rds_rdma_drop_keys(rs); rds_notify_queue_get(rs, NULL); @@ -83,6 +99,8 @@ static int rds_release(struct socket *sock) rds_sock_count--; spin_unlock_irqrestore(&rds_sock_lock, flags); + rds_trans_put(rs->rs_transport); + sock->sk = NULL; sock_put(sk); out: @@ -514,7 +532,7 @@ out: spin_unlock_irqrestore(&rds_sock_lock, flags); } -static void __exit rds_exit(void) +static void rds_exit(void) { sock_unregister(rds_family_ops.family); proto_unregister(&rds_proto); @@ -529,7 +547,7 @@ static void __exit rds_exit(void) } module_exit(rds_exit); -static int __init rds_init(void) +static int rds_init(void) { int ret; diff --git a/net/rds/bind.c b/net/rds/bind.c index 5d95fc007f1..2f6b3fcc79f 100644 --- a/net/rds/bind.c +++ b/net/rds/bind.c @@ -34,45 +34,52 @@ #include <net/sock.h> #include <linux/in.h> #include <linux/if_arp.h> +#include <linux/jhash.h> #include "rds.h" -/* - * XXX this probably still needs more work.. no INADDR_ANY, and rbtrees aren't - * particularly zippy. - * - * This is now called for every incoming frame so we arguably care much more - * about it than we used to. - */ +#define BIND_HASH_SIZE 1024 +static struct hlist_head bind_hash_table[BIND_HASH_SIZE]; static DEFINE_SPINLOCK(rds_bind_lock); -static struct rb_root rds_bind_tree = RB_ROOT; -static struct rds_sock *rds_bind_tree_walk(__be32 addr, __be16 port, - struct rds_sock *insert) +static struct hlist_head *hash_to_bucket(__be32 addr, __be16 port) +{ + return bind_hash_table + (jhash_2words((u32)addr, (u32)port, 0) & + (BIND_HASH_SIZE - 1)); +} + +static struct rds_sock *rds_bind_lookup(__be32 addr, __be16 port, + struct rds_sock *insert) { - struct rb_node **p = &rds_bind_tree.rb_node; - struct rb_node *parent = NULL; struct rds_sock *rs; + struct hlist_node *node; + struct hlist_head *head = hash_to_bucket(addr, port); u64 cmp; u64 needle = ((u64)be32_to_cpu(addr) << 32) | be16_to_cpu(port); - while (*p) { - parent = *p; - rs = rb_entry(parent, struct rds_sock, rs_bound_node); - + rcu_read_lock(); + hlist_for_each_entry_rcu(rs, node, head, rs_bound_node) { cmp = ((u64)be32_to_cpu(rs->rs_bound_addr) << 32) | be16_to_cpu(rs->rs_bound_port); - if (needle < cmp) - p = &(*p)->rb_left; - else if (needle > cmp) - p = &(*p)->rb_right; - else + if (cmp == needle) { + rcu_read_unlock(); return rs; + } } + rcu_read_unlock(); if (insert) { - rb_link_node(&insert->rs_bound_node, parent, p); - rb_insert_color(&insert->rs_bound_node, &rds_bind_tree); + /* + * make sure our addr and port are set before + * we are added to the list, other people + * in rcu will find us as soon as the + * hlist_add_head_rcu is done + */ + insert->rs_bound_addr = addr; + insert->rs_bound_port = port; + rds_sock_addref(insert); + + hlist_add_head_rcu(&insert->rs_bound_node, head); } return NULL; } @@ -86,15 +93,13 @@ static struct rds_sock *rds_bind_tree_walk(__be32 addr, __be16 port, struct rds_sock *rds_find_bound(__be32 addr, __be16 port) { struct rds_sock *rs; - unsigned long flags; - spin_lock_irqsave(&rds_bind_lock, flags); - rs = rds_bind_tree_walk(addr, port, NULL); + rs = rds_bind_lookup(addr, port, NULL); + if (rs && !sock_flag(rds_rs_to_sk(rs), SOCK_DEAD)) rds_sock_addref(rs); else rs = NULL; - spin_unlock_irqrestore(&rds_bind_lock, flags); rdsdebug("returning rs %p for %pI4:%u\n", rs, &addr, ntohs(port)); @@ -121,22 +126,15 @@ static int rds_add_bound(struct rds_sock *rs, __be32 addr, __be16 *port) do { if (rover == 0) rover++; - if (rds_bind_tree_walk(addr, cpu_to_be16(rover), rs) == NULL) { - *port = cpu_to_be16(rover); + if (!rds_bind_lookup(addr, cpu_to_be16(rover), rs)) { + *port = rs->rs_bound_port; ret = 0; + rdsdebug("rs %p binding to %pI4:%d\n", + rs, &addr, (int)ntohs(*port)); break; } } while (rover++ != last); - if (ret == 0) { - rs->rs_bound_addr = addr; - rs->rs_bound_port = *port; - rds_sock_addref(rs); - - rdsdebug("rs %p binding to %pI4:%d\n", - rs, &addr, (int)ntohs(*port)); - } - spin_unlock_irqrestore(&rds_bind_lock, flags); return ret; @@ -153,7 +151,7 @@ void rds_remove_bound(struct rds_sock *rs) rs, &rs->rs_bound_addr, ntohs(rs->rs_bound_port)); - rb_erase(&rs->rs_bound_node, &rds_bind_tree); + hlist_del_init_rcu(&rs->rs_bound_node); rds_sock_put(rs); rs->rs_bound_addr = 0; } @@ -184,7 +182,7 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len) goto out; trans = rds_trans_get_preferred(sin->sin_addr.s_addr); - if (trans == NULL) { + if (!trans) { ret = -EADDRNOTAVAIL; rds_remove_bound(rs); if (printk_ratelimit()) @@ -198,5 +196,9 @@ int rds_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len) out: release_sock(sk); + + /* we might have called rds_remove_bound on error */ + if (ret) + synchronize_rcu(); return ret; } diff --git a/net/rds/cong.c b/net/rds/cong.c index 0871a29f078..75ea686f27d 100644 --- a/net/rds/cong.c +++ b/net/rds/cong.c @@ -141,7 +141,7 @@ static struct rds_cong_map *rds_cong_from_addr(__be32 addr) unsigned long flags; map = kzalloc(sizeof(struct rds_cong_map), GFP_KERNEL); - if (map == NULL) + if (!map) return NULL; map->m_addr = addr; @@ -159,7 +159,7 @@ static struct rds_cong_map *rds_cong_from_addr(__be32 addr) ret = rds_cong_tree_walk(addr, map); spin_unlock_irqrestore(&rds_cong_lock, flags); - if (ret == NULL) { + if (!ret) { ret = map; map = NULL; } @@ -205,7 +205,7 @@ int rds_cong_get_maps(struct rds_connection *conn) conn->c_lcong = rds_cong_from_addr(conn->c_laddr); conn->c_fcong = rds_cong_from_addr(conn->c_faddr); - if (conn->c_lcong == NULL || conn->c_fcong == NULL) + if (!(conn->c_lcong && conn->c_fcong)) return -ENOMEM; return 0; @@ -221,7 +221,7 @@ void rds_cong_queue_updates(struct rds_cong_map *map) list_for_each_entry(conn, &map->m_conn_list, c_map_item) { if (!test_and_set_bit(0, &conn->c_map_queued)) { rds_stats_inc(s_cong_update_queued); - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + rds_send_xmit(conn); } } diff --git a/net/rds/connection.c b/net/rds/connection.c index 7619b671ca2..870992e08ca 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -37,7 +37,6 @@ #include "rds.h" #include "loop.h" -#include "rdma.h" #define RDS_CONNECTION_HASH_BITS 12 #define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS) @@ -63,18 +62,7 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr) var |= RDS_INFO_CONNECTION_FLAG_##suffix; \ } while (0) -static inline int rds_conn_is_sending(struct rds_connection *conn) -{ - int ret = 0; - - if (!mutex_trylock(&conn->c_send_lock)) - ret = 1; - else - mutex_unlock(&conn->c_send_lock); - - return ret; -} - +/* rcu read lock must be held or the connection spinlock */ static struct rds_connection *rds_conn_lookup(struct hlist_head *head, __be32 laddr, __be32 faddr, struct rds_transport *trans) @@ -82,7 +70,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head, struct rds_connection *conn, *ret = NULL; struct hlist_node *pos; - hlist_for_each_entry(conn, pos, head, c_hash_node) { + hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) { if (conn->c_faddr == faddr && conn->c_laddr == laddr && conn->c_trans == trans) { ret = conn; @@ -129,10 +117,11 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, { struct rds_connection *conn, *parent = NULL; struct hlist_head *head = rds_conn_bucket(laddr, faddr); + struct rds_transport *loop_trans; unsigned long flags; int ret; - spin_lock_irqsave(&rds_conn_lock, flags); + rcu_read_lock(); conn = rds_conn_lookup(head, laddr, faddr, trans); if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport && !is_outgoing) { @@ -143,12 +132,12 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, parent = conn; conn = parent->c_passive; } - spin_unlock_irqrestore(&rds_conn_lock, flags); + rcu_read_unlock(); if (conn) goto out; conn = kmem_cache_zalloc(rds_conn_slab, gfp); - if (conn == NULL) { + if (!conn) { conn = ERR_PTR(-ENOMEM); goto out; } @@ -159,7 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, spin_lock_init(&conn->c_lock); conn->c_next_tx_seq = 1; - mutex_init(&conn->c_send_lock); + init_waitqueue_head(&conn->c_waitq); INIT_LIST_HEAD(&conn->c_send_queue); INIT_LIST_HEAD(&conn->c_retrans); @@ -175,7 +164,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, * can bind to the destination address then we'd rather the messages * flow through loopback rather than either transport. */ - if (rds_trans_get_preferred(faddr)) { + loop_trans = rds_trans_get_preferred(faddr); + if (loop_trans) { + rds_trans_put(loop_trans); conn->c_loopback = 1; if (is_outgoing && trans->t_prefer_loopback) { /* "outgoing" connection - and the transport @@ -238,7 +229,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, kmem_cache_free(rds_conn_slab, conn); conn = found; } else { - hlist_add_head(&conn->c_hash_node, head); + hlist_add_head_rcu(&conn->c_hash_node, head); rds_cong_add_conn(conn); rds_conn_count++; } @@ -263,21 +254,91 @@ struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr, } EXPORT_SYMBOL_GPL(rds_conn_create_outgoing); +void rds_conn_shutdown(struct rds_connection *conn) +{ + /* shut it down unless it's down already */ + if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) { + /* + * Quiesce the connection mgmt handlers before we start tearing + * things down. We don't hold the mutex for the entire + * duration of the shutdown operation, else we may be + * deadlocking with the CM handler. Instead, the CM event + * handler is supposed to check for state DISCONNECTING + */ + mutex_lock(&conn->c_cm_lock); + if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING) + && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) { + rds_conn_error(conn, "shutdown called in state %d\n", + atomic_read(&conn->c_state)); + mutex_unlock(&conn->c_cm_lock); + return; + } + mutex_unlock(&conn->c_cm_lock); + + wait_event(conn->c_waitq, + !test_bit(RDS_IN_XMIT, &conn->c_flags)); + + conn->c_trans->conn_shutdown(conn); + rds_conn_reset(conn); + + if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) { + /* This can happen - eg when we're in the middle of tearing + * down the connection, and someone unloads the rds module. + * Quite reproduceable with loopback connections. + * Mostly harmless. + */ + rds_conn_error(conn, + "%s: failed to transition to state DOWN, " + "current state is %d\n", + __func__, + atomic_read(&conn->c_state)); + return; + } + } + + /* Then reconnect if it's still live. + * The passive side of an IB loopback connection is never added + * to the conn hash, so we never trigger a reconnect on this + * conn - the reconnect is always triggered by the active peer. */ + cancel_delayed_work_sync(&conn->c_conn_w); + rcu_read_lock(); + if (!hlist_unhashed(&conn->c_hash_node)) { + rcu_read_unlock(); + rds_queue_reconnect(conn); + } else { + rcu_read_unlock(); + } +} + +/* + * Stop and free a connection. + * + * This can only be used in very limited circumstances. It assumes that once + * the conn has been shutdown that no one else is referencing the connection. + * We can only ensure this in the rmmod path in the current code. + */ void rds_conn_destroy(struct rds_connection *conn) { struct rds_message *rm, *rtmp; + unsigned long flags; rdsdebug("freeing conn %p for %pI4 -> " "%pI4\n", conn, &conn->c_laddr, &conn->c_faddr); - hlist_del_init(&conn->c_hash_node); + /* Ensure conn will not be scheduled for reconnect */ + spin_lock_irq(&rds_conn_lock); + hlist_del_init_rcu(&conn->c_hash_node); + spin_unlock_irq(&rds_conn_lock); + synchronize_rcu(); - /* wait for the rds thread to shut it down */ - atomic_set(&conn->c_state, RDS_CONN_ERROR); - cancel_delayed_work(&conn->c_conn_w); - queue_work(rds_wq, &conn->c_down_w); - flush_workqueue(rds_wq); + /* shut the connection down */ + rds_conn_drop(conn); + flush_work(&conn->c_down_w); + + /* make sure lingering queued work won't try to ref the conn */ + cancel_delayed_work_sync(&conn->c_send_w); + cancel_delayed_work_sync(&conn->c_recv_w); /* tear down queued messages */ list_for_each_entry_safe(rm, rtmp, @@ -302,7 +363,9 @@ void rds_conn_destroy(struct rds_connection *conn) BUG_ON(!list_empty(&conn->c_retrans)); kmem_cache_free(rds_conn_slab, conn); + spin_lock_irqsave(&rds_conn_lock, flags); rds_conn_count--; + spin_unlock_irqrestore(&rds_conn_lock, flags); } EXPORT_SYMBOL_GPL(rds_conn_destroy); @@ -316,23 +379,23 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len, struct list_head *list; struct rds_connection *conn; struct rds_message *rm; - unsigned long flags; unsigned int total = 0; + unsigned long flags; size_t i; len /= sizeof(struct rds_info_message); - spin_lock_irqsave(&rds_conn_lock, flags); + rcu_read_lock(); for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash); i++, head++) { - hlist_for_each_entry(conn, pos, head, c_hash_node) { + hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) { if (want_send) list = &conn->c_send_queue; else list = &conn->c_retrans; - spin_lock(&conn->c_lock); + spin_lock_irqsave(&conn->c_lock, flags); /* XXX too lazy to maintain counts.. */ list_for_each_entry(rm, list, m_conn_item) { @@ -343,11 +406,10 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len, conn->c_faddr, 0); } - spin_unlock(&conn->c_lock); + spin_unlock_irqrestore(&conn->c_lock, flags); } } - - spin_unlock_irqrestore(&rds_conn_lock, flags); + rcu_read_unlock(); lens->nr = total; lens->each = sizeof(struct rds_info_message); @@ -377,19 +439,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len, uint64_t buffer[(item_len + 7) / 8]; struct hlist_head *head; struct hlist_node *pos; - struct hlist_node *tmp; struct rds_connection *conn; - unsigned long flags; size_t i; - spin_lock_irqsave(&rds_conn_lock, flags); + rcu_read_lock(); lens->nr = 0; lens->each = item_len; for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash); i++, head++) { - hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) { + hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) { /* XXX no c_lock usage.. */ if (!visitor(conn, buffer)) @@ -405,8 +465,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len, lens->nr++; } } - - spin_unlock_irqrestore(&rds_conn_lock, flags); + rcu_read_unlock(); } EXPORT_SYMBOL_GPL(rds_for_each_conn_info); @@ -423,8 +482,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn, sizeof(cinfo->transport)); cinfo->flags = 0; - rds_conn_info_set(cinfo->flags, - rds_conn_is_sending(conn), SENDING); + rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags), + SENDING); /* XXX Future: return the state rather than these funky bits */ rds_conn_info_set(cinfo->flags, atomic_read(&conn->c_state) == RDS_CONN_CONNECTING, @@ -444,12 +503,12 @@ static void rds_conn_info(struct socket *sock, unsigned int len, sizeof(struct rds_info_connection)); } -int __init rds_conn_init(void) +int rds_conn_init(void) { rds_conn_slab = kmem_cache_create("rds_connection", sizeof(struct rds_connection), 0, 0, NULL); - if (rds_conn_slab == NULL) + if (!rds_conn_slab) return -ENOMEM; rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info); @@ -487,6 +546,18 @@ void rds_conn_drop(struct rds_connection *conn) EXPORT_SYMBOL_GPL(rds_conn_drop); /* + * If the connection is down, trigger a connect. We may have scheduled a + * delayed reconnect however - in this case we should not interfere. + */ +void rds_conn_connect_if_down(struct rds_connection *conn) +{ + if (rds_conn_state(conn) == RDS_CONN_DOWN && + !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) + queue_delayed_work(rds_wq, &conn->c_conn_w, 0); +} +EXPORT_SYMBOL_GPL(rds_conn_connect_if_down); + +/* * An error occurred on the connection */ void diff --git a/net/rds/ib.c b/net/rds/ib.c index 8f2d6dd7700..b12a3951167 100644 --- a/net/rds/ib.c +++ b/net/rds/ib.c @@ -53,12 +53,71 @@ MODULE_PARM_DESC(fmr_message_size, " Max size of a RDMA transfer"); module_param(rds_ib_retry_count, int, 0444); MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error"); +/* + * we have a clumsy combination of RCU and a rwsem protecting this list + * because it is used both in the get_mr fast path and while blocking in + * the FMR flushing path. + */ +DECLARE_RWSEM(rds_ib_devices_lock); struct list_head rds_ib_devices; /* NOTE: if also grabbing ibdev lock, grab this first */ DEFINE_SPINLOCK(ib_nodev_conns_lock); LIST_HEAD(ib_nodev_conns); +void rds_ib_nodev_connect(void) +{ + struct rds_ib_connection *ic; + + spin_lock(&ib_nodev_conns_lock); + list_for_each_entry(ic, &ib_nodev_conns, ib_node) + rds_conn_connect_if_down(ic->conn); + spin_unlock(&ib_nodev_conns_lock); +} + +void rds_ib_dev_shutdown(struct rds_ib_device *rds_ibdev) +{ + struct rds_ib_connection *ic; + unsigned long flags; + + spin_lock_irqsave(&rds_ibdev->spinlock, flags); + list_for_each_entry(ic, &rds_ibdev->conn_list, ib_node) + rds_conn_drop(ic->conn); + spin_unlock_irqrestore(&rds_ibdev->spinlock, flags); +} + +/* + * rds_ib_destroy_mr_pool() blocks on a few things and mrs drop references + * from interrupt context so we push freing off into a work struct in krdsd. + */ +static void rds_ib_dev_free(struct work_struct *work) +{ + struct rds_ib_ipaddr *i_ipaddr, *i_next; + struct rds_ib_device *rds_ibdev = container_of(work, + struct rds_ib_device, free_work); + + if (rds_ibdev->mr_pool) + rds_ib_destroy_mr_pool(rds_ibdev->mr_pool); + if (rds_ibdev->mr) + ib_dereg_mr(rds_ibdev->mr); + if (rds_ibdev->pd) + ib_dealloc_pd(rds_ibdev->pd); + + list_for_each_entry_safe(i_ipaddr, i_next, &rds_ibdev->ipaddr_list, list) { + list_del(&i_ipaddr->list); + kfree(i_ipaddr); + } + + kfree(rds_ibdev); +} + +void rds_ib_dev_put(struct rds_ib_device *rds_ibdev) +{ + BUG_ON(atomic_read(&rds_ibdev->refcount) <= 0); + if (atomic_dec_and_test(&rds_ibdev->refcount)) + queue_work(rds_wq, &rds_ibdev->free_work); +} + void rds_ib_add_one(struct ib_device *device) { struct rds_ib_device *rds_ibdev; @@ -77,11 +136,14 @@ void rds_ib_add_one(struct ib_device *device) goto free_attr; } - rds_ibdev = kmalloc(sizeof *rds_ibdev, GFP_KERNEL); + rds_ibdev = kzalloc_node(sizeof(struct rds_ib_device), GFP_KERNEL, + ibdev_to_node(device)); if (!rds_ibdev) goto free_attr; spin_lock_init(&rds_ibdev->spinlock); + atomic_set(&rds_ibdev->refcount, 1); + INIT_WORK(&rds_ibdev->free_work, rds_ib_dev_free); rds_ibdev->max_wrs = dev_attr->max_qp_wr; rds_ibdev->max_sge = min(dev_attr->max_sge, RDS_IB_MAX_SGE); @@ -91,68 +153,107 @@ void rds_ib_add_one(struct ib_device *device) min_t(unsigned int, dev_attr->max_fmr, fmr_pool_size) : fmr_pool_size; + rds_ibdev->max_initiator_depth = dev_attr->max_qp_init_rd_atom; + rds_ibdev->max_responder_resources = dev_attr->max_qp_rd_atom; + rds_ibdev->dev = device; rds_ibdev->pd = ib_alloc_pd(device); - if (IS_ERR(rds_ibdev->pd)) - goto free_dev; + if (IS_ERR(rds_ibdev->pd)) { + rds_ibdev->pd = NULL; + goto put_dev; + } - rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, - IB_ACCESS_LOCAL_WRITE); - if (IS_ERR(rds_ibdev->mr)) - goto err_pd; + rds_ibdev->mr = ib_get_dma_mr(rds_ibdev->pd, IB_ACCESS_LOCAL_WRITE); + if (IS_ERR(rds_ibdev->mr)) { + rds_ibdev->mr = NULL; + goto put_dev; + } rds_ibdev->mr_pool = rds_ib_create_mr_pool(rds_ibdev); if (IS_ERR(rds_ibdev->mr_pool)) { rds_ibdev->mr_pool = NULL; - goto err_mr; + goto put_dev; } INIT_LIST_HEAD(&rds_ibdev->ipaddr_list); INIT_LIST_HEAD(&rds_ibdev->conn_list); - list_add_tail(&rds_ibdev->list, &rds_ib_devices); + + down_write(&rds_ib_devices_lock); + list_add_tail_rcu(&rds_ibdev->list, &rds_ib_devices); + up_write(&rds_ib_devices_lock); + atomic_inc(&rds_ibdev->refcount); ib_set_client_data(device, &rds_ib_client, rds_ibdev); + atomic_inc(&rds_ibdev->refcount); - goto free_attr; + rds_ib_nodev_connect(); -err_mr: - ib_dereg_mr(rds_ibdev->mr); -err_pd: - ib_dealloc_pd(rds_ibdev->pd); -free_dev: - kfree(rds_ibdev); +put_dev: + rds_ib_dev_put(rds_ibdev); free_attr: kfree(dev_attr); } +/* + * New connections use this to find the device to associate with the + * connection. It's not in the fast path so we're not concerned about the + * performance of the IB call. (As of this writing, it uses an interrupt + * blocking spinlock to serialize walking a per-device list of all registered + * clients.) + * + * RCU is used to handle incoming connections racing with device teardown. + * Rather than use a lock to serialize removal from the client_data and + * getting a new reference, we use an RCU grace period. The destruction + * path removes the device from client_data and then waits for all RCU + * readers to finish. + * + * A new connection can get NULL from this if its arriving on a + * device that is in the process of being removed. + */ +struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device) +{ + struct rds_ib_device *rds_ibdev; + + rcu_read_lock(); + rds_ibdev = ib_get_client_data(device, &rds_ib_client); + if (rds_ibdev) + atomic_inc(&rds_ibdev->refcount); + rcu_read_unlock(); + return rds_ibdev; +} + +/* + * The IB stack is letting us know that a device is going away. This can + * happen if the underlying HCA driver is removed or if PCI hotplug is removing + * the pci function, for example. + * + * This can be called at any time and can be racing with any other RDS path. + */ void rds_ib_remove_one(struct |