From 7c82eaf00ec7d460932be9314b29997006b799b6 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Fri, 19 Feb 2010 18:01:41 -0800 Subject: RDS: Rewrite rds_send_drop_to() for clarity This function has been the source of numerous bugs; it's just too complicated. Simplified to nest spinlocks cleanly within the second loop body, and kick out early if there are no rms to drop. This will be a little slower because conn lock is grabbed for each entry instead of "caching" the lock across rms, but this should be entirely irrelevant to fastpath performance. Signed-off-by: Andy Grover --- net/rds/send.c | 64 ++++++++++++++++++++++++++-------------------------------- 1 file changed, 29 insertions(+), 35 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 9c1c6bcaa6c..aee58f931b6 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -619,9 +619,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) { struct rds_message *rm, *tmp; struct rds_connection *conn; - unsigned long flags, flags2; + unsigned long flags; LIST_HEAD(list); - int wake = 0; /* get all the messages we're dropping under the rs lock */ spin_lock_irqsave(&rs->rs_lock, flags); @@ -631,59 +630,54 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) dest->sin_port != rm->m_inc.i_hdr.h_dport)) continue; - wake = 1; list_move(&rm->m_sock_item, &list); rds_send_sndbuf_remove(rs, rm); clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags); } /* order flag updates with the rs lock */ - if (wake) - smp_mb__after_clear_bit(); + smp_mb__after_clear_bit(); spin_unlock_irqrestore(&rs->rs_lock, flags); - conn = NULL; + if (list_empty(&list)) + return; - /* now remove the messages from the conn list as needed */ + /* Remove the messages from the conn */ list_for_each_entry(rm, &list, m_sock_item) { - /* We do this here rather than in the loop above, so that - * we don't have to nest m_rs_lock under rs->rs_lock */ - spin_lock_irqsave(&rm->m_rs_lock, flags2); - /* If this is a RDMA operation, notify the app. */ - spin_lock(&rs->rs_lock); - __rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED); - spin_unlock(&rs->rs_lock); - rm->m_rs = NULL; - spin_unlock_irqrestore(&rm->m_rs_lock, flags2); + + conn = rm->m_inc.i_conn; + spin_lock_irqsave(&conn->c_lock, flags); /* - * If we see this flag cleared then we're *sure* that someone - * else beat us to removing it from the conn. If we race - * with their flag update we'll get the lock and then really - * see that the flag has been cleared. + * Maybe someone else beat us to removing rm from the conn. + * If we race with their flag update we'll get the lock and + * then really see that the flag has been cleared. */ - if (!test_bit(RDS_MSG_ON_CONN, &rm->m_flags)) + if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { + spin_unlock_irqrestore(&conn->c_lock, flags); continue; - - if (conn != rm->m_inc.i_conn) { - if (conn) - spin_unlock_irqrestore(&conn->c_lock, flags); - conn = rm->m_inc.i_conn; - spin_lock_irqsave(&conn->c_lock, flags); } - if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { - list_del_init(&rm->m_conn_item); - rds_message_put(rm); - } - } + /* + * Couldn't grab m_rs_lock in top loop (lock ordering), + * but we can now. 
+ */ + spin_lock(&rm->m_rs_lock); - if (conn) + spin_lock(&rs->rs_lock); + __rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED); + spin_unlock(&rs->rs_lock); + + rm->m_rs = NULL; + spin_unlock(&rm->m_rs_lock); + + list_del_init(&rm->m_conn_item); + rds_message_put(rm); spin_unlock_irqrestore(&conn->c_lock, flags); + } - if (wake) - rds_wake_sk_sleep(rs); + rds_wake_sk_sleep(rs); while (!list_empty(&list)) { rm = list_entry(list.next, struct rds_message, m_sock_item); -- cgit v1.2.3-18-g5258 From 9de0864cf55927a7383b5ba6e48834ff3ef053de Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Mon, 29 Mar 2010 16:50:54 -0700 Subject: RDS: Fix locking in send on m_rs_lock Do not nest m_rs_lock under c_lock Disable interrupts in {rdma,atomic}_send_complete Signed-off-by: Andy Grover --- net/rds/send.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index aee58f931b6..725fb041979 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -415,8 +415,9 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) struct rds_sock *rs = NULL; struct rds_rdma_op *ro; struct rds_notifier *notifier; + unsigned long flags; - spin_lock(&rm->m_rs_lock); + spin_lock_irqsave(&rm->m_rs_lock, flags); ro = rm->m_rdma_op; if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && @@ -433,7 +434,7 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) ro->r_notifier = NULL; } - spin_unlock(&rm->m_rs_lock); + spin_unlock_irqrestore(&rm->m_rs_lock, flags); if (rs) { rds_wake_sk_sleep(rs); @@ -647,8 +648,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) list_for_each_entry(rm, &list, m_sock_item) { conn = rm->m_inc.i_conn; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock_irqsave(&conn->c_lock, flags); /* * Maybe someone else beat us to removing rm from the conn. * If we race with their flag update we'll get the lock and @@ -658,23 +659,23 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) spin_unlock_irqrestore(&conn->c_lock, flags); continue; } + list_del_init(&rm->m_conn_item); + spin_unlock_irqrestore(&conn->c_lock, flags); /* * Couldn't grab m_rs_lock in top loop (lock ordering), * but we can now. */ - spin_lock(&rm->m_rs_lock); + spin_lock_irqsave(&rm->m_rs_lock, flags); spin_lock(&rs->rs_lock); __rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED); spin_unlock(&rs->rs_lock); rm->m_rs = NULL; - spin_unlock(&rm->m_rs_lock); + spin_unlock_irqrestore(&rm->m_rs_lock, flags); - list_del_init(&rm->m_conn_item); rds_message_put(rm); - spin_unlock_irqrestore(&conn->c_lock, flags); } rds_wake_sk_sleep(rs); -- cgit v1.2.3-18-g5258 From 8690bfa17aea4c42da1bcf90a7af93d161eca624 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 11:56:44 -0800 Subject: RDS: cleanup: remove "== NULL"s and "!= NULL"s in ptr comparisons Favor "if (foo)" style over "if (foo != NULL)". Signed-off-by: Andy Grover --- net/rds/send.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 725fb041979..817997daf78 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -164,7 +164,7 @@ int rds_send_xmit(struct rds_connection *conn) * offset and S/G temporaries. 
*/ rm = conn->c_xmit_rm; - if (rm != NULL && + if (rm && conn->c_xmit_hdr_off == sizeof(struct rds_header) && conn->c_xmit_sg == rm->m_nents) { conn->c_xmit_rm = NULL; @@ -180,8 +180,8 @@ int rds_send_xmit(struct rds_connection *conn) /* If we're asked to send a cong map update, do so. */ - if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) { - if (conn->c_trans->xmit_cong_map != NULL) { + if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) { + if (conn->c_trans->xmit_cong_map) { conn->c_map_offset = 0; conn->c_map_bytes = sizeof(struct rds_header) + RDS_CONG_MAP_BYTES; @@ -204,7 +204,7 @@ int rds_send_xmit(struct rds_connection *conn) * the connction. We can use this ref while holding the * send_sem.. rds_send_reset() is serialized with it. */ - if (rm == NULL) { + if (!rm) { unsigned int len; spin_lock_irqsave(&conn->c_lock, flags); @@ -224,7 +224,7 @@ int rds_send_xmit(struct rds_connection *conn) spin_unlock_irqrestore(&conn->c_lock, flags); - if (rm == NULL) { + if (!rm) { was_empty = 1; break; } @@ -875,7 +875,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; if ((rm->m_rdma_cookie || rm->m_rdma_op) && - conn->c_trans->xmit_rdma == NULL) { + !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", rm->m_rdma_op, conn->c_trans->xmit_rdma); @@ -961,7 +961,7 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) int ret = 0; rm = rds_message_alloc(0, GFP_ATOMIC); - if (rm == NULL) { + if (!rm) { ret = -ENOMEM; goto out; } -- cgit v1.2.3-18-g5258 From e779137aa76d38d5c33a98ed887092ae4e4f016f Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 12:15:02 -0800 Subject: RDS: break out rdma and data ops into nested structs in rds_message Clearly separate rdma-related variables in rm from data-related ones. This is in anticipation of adding atomic support. Signed-off-by: Andy Grover --- net/rds/send.c | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 817997daf78..19dfd025498 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -166,7 +166,7 @@ int rds_send_xmit(struct rds_connection *conn) rm = conn->c_xmit_rm; if (rm && conn->c_xmit_hdr_off == sizeof(struct rds_header) && - conn->c_xmit_sg == rm->m_nents) { + conn->c_xmit_sg == rm->data.m_nents) { conn->c_xmit_rm = NULL; conn->c_xmit_sg = 0; conn->c_xmit_hdr_off = 0; @@ -236,7 +236,7 @@ int rds_send_xmit(struct rds_connection *conn) * connection. * Therefore, we never retransmit messages with RDMA ops. */ - if (rm->m_rdma_op && + if (rm->rdma.m_rdma_op && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { spin_lock_irqsave(&conn->c_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) @@ -268,8 +268,8 @@ int rds_send_xmit(struct rds_connection *conn) * keep this simple and require that the transport either * send the whole rdma or none of it. 
*/ - if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) { - ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op); + if (rm->rdma.m_rdma_op && !conn->c_xmit_rdma_sent) { + ret = conn->c_trans->xmit_rdma(conn, rm->rdma.m_rdma_op); if (ret) break; conn->c_xmit_rdma_sent = 1; @@ -279,7 +279,7 @@ int rds_send_xmit(struct rds_connection *conn) } if (conn->c_xmit_hdr_off < sizeof(struct rds_header) || - conn->c_xmit_sg < rm->m_nents) { + conn->c_xmit_sg < rm->data.m_nents) { ret = conn->c_trans->xmit(conn, rm, conn->c_xmit_hdr_off, conn->c_xmit_sg, @@ -295,7 +295,7 @@ int rds_send_xmit(struct rds_connection *conn) ret -= tmp; } - sg = &rm->m_sg[conn->c_xmit_sg]; + sg = &rm->data.m_sg[conn->c_xmit_sg]; while (ret) { tmp = min_t(int, ret, sg->length - conn->c_xmit_data_off); @@ -306,7 +306,7 @@ int rds_send_xmit(struct rds_connection *conn) sg++; conn->c_xmit_sg++; BUG_ON(ret != 0 && - conn->c_xmit_sg == rm->m_nents); + conn->c_xmit_sg == rm->data.m_nents); } } } @@ -419,7 +419,7 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) spin_lock_irqsave(&rm->m_rs_lock, flags); - ro = rm->m_rdma_op; + ro = rm->rdma.m_rdma_op; if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && ro && ro->r_notify && ro->r_notifier) { notifier = ro->r_notifier; @@ -453,7 +453,7 @@ __rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status { struct rds_rdma_op *ro; - ro = rm->m_rdma_op; + ro = rm->rdma.m_rdma_op; if (ro && ro->r_notify && ro->r_notifier) { ro->r_notifier->n_status = status; list_add_tail(&ro->r_notifier->n_list, &rs->rs_notify_queue); @@ -477,7 +477,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn, spin_lock_irqsave(&conn->c_lock, flags); list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (rm->m_rdma_op == op) { + if (rm->rdma.m_rdma_op == op) { atomic_inc(&rm->m_refcount); found = rm; goto out; @@ -485,7 +485,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn, } list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (rm->m_rdma_op == op) { + if (rm->rdma.m_rdma_op == op) { atomic_inc(&rm->m_refcount); found = rm; break; @@ -545,7 +545,7 @@ void rds_send_remove_from_sock(struct list_head *messages, int status) spin_lock(&rs->rs_lock); if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) { - struct rds_rdma_op *ro = rm->m_rdma_op; + struct rds_rdma_op *ro = rm->rdma.m_rdma_op; struct rds_notifier *notifier; list_del_init(&rm->m_sock_item); @@ -557,7 +557,7 @@ void rds_send_remove_from_sock(struct list_head *messages, int status) &rs->rs_notify_queue); if (!notifier->n_status) notifier->n_status = status; - rm->m_rdma_op->r_notifier = NULL; + rm->rdma.m_rdma_op->r_notifier = NULL; } was_on_sock = 1; rm->m_rs = NULL; @@ -874,11 +874,11 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, if (ret) goto out; - if ((rm->m_rdma_cookie || rm->m_rdma_op) && + if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op) && !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", - rm->m_rdma_op, conn->c_trans->xmit_rdma); + rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); ret = -EOPNOTSUPP; goto out; } -- cgit v1.2.3-18-g5258 From fc445084f185cdd877bec323bfe724a361e2292a Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 12:56:06 -0800 Subject: RDS: Explicitly allocate rm in sendmsg() r_m_copy_from_user used to allocate the rm as well as kernel buffers for the data, and then copy the data in. 
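(A minimal sketch of the reservation helper this change introduces, for
illustration: the real r_m_alloc_sgs() -- rds_message_alloc_sgs() -- lives
in net/rds/message.c, outside this send.c-limited log, so the body and the
m_used_sgs/m_total_sgs bookkeeping fields shown here are assumptions.)

	/* Sketch: hand out the next nents entries from the scatterlist
	 * pool allocated immediately after the rds_message itself. */
	struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm,
						  int nents)
	{
		struct scatterlist *sg_first = (struct scatterlist *)&rm[1];
		struct scatterlist *sg_ret;

		WARN_ON(rm->m_used_sgs + nents > rm->m_total_sgs);

		sg_ret = &sg_first[rm->m_used_sgs];
		sg_init_table(sg_ret, nents);
		rm->m_used_sgs += nents;

		return sg_ret;
	}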
Now, sendmsg() allocates the rm, although the data buffer alloc still happens in r_m_copy_from_user. SGs are still allocated with rm, but now r_m_alloc_sgs() is used to reserve them. This allows multiple SG lists to be allocated from the one rm -- this is important once we also want to alloc our rdma sgl from this pool. Signed-off-by: Andy Grover --- net/rds/send.c | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 19dfd025498..28d09447207 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -758,6 +758,19 @@ out: return *queued; } +/* + * rds_message is getting to be quite complicated, and we'd like to allocate + * it all in one go. This figures out how big it needs to be up front. + */ +static int rds_rm_size(struct msghdr *msg, int data_len) +{ + int size = 0; + + size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist); + + return size; +} + static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, struct msghdr *msg, int *allocated_mr) { @@ -845,13 +858,23 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; } - rm = rds_message_copy_from_user(msg->msg_iov, payload_len); - if (IS_ERR(rm)) { - ret = PTR_ERR(rm); - rm = NULL; + /* size of rm including all sgs */ + ret = rds_rm_size(msg, payload_len); + if (ret < 0) + goto out; + + rm = rds_message_alloc(ret, GFP_KERNEL); + if (!rm) { + ret = -ENOMEM; goto out; } + rm->data.m_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); + /* XXX fix this to not allocate memory */ + ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len); + if (ret) + goto out; + rm->m_daddr = daddr; /* rds_conn_create has a spinlock that runs with IRQ off. -- cgit v1.2.3-18-g5258 From 21f79afa5fda2820671a8f64c3d0e43bb118053b Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 12:57:27 -0800 Subject: RDS: fold rdma.h into rds.h RDMA is now an intrinsic part of RDS, so it's easier to just have a single header. Signed-off-by: Andy Grover --- net/rds/send.c | 1 - 1 file changed, 1 deletion(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 28d09447207..89e26ffdc81 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -37,7 +37,6 @@ #include #include "rds.h" -#include "rdma.h" /* When transmitting messages in rds_send_xmit, we need to emerge from * time to time and briefly release the CPU. Otherwise the softlock watchdog -- cgit v1.2.3-18-g5258 From ff87e97a9d70c9ae133d3d3d7792b26ab85f4297 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 14:13:15 -0800 Subject: RDS: make m_rdma_op a member of rds_message This eliminates a separate memory alloc, although it is now necessary to add an "r_active" flag, since it is no longer to use the m_rdma_op pointer as an indicator of if an rdma op is present. rdma SGs allocated from rm sg pool. rds_rm_size also gets bigger. It's a little inefficient to run through CMSGs twice, but it makes later steps a lot smoother. Signed-off-by: Andy Grover --- net/rds/send.c | 59 +++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 15 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 89e26ffdc81..72dbe7fc4f5 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -235,7 +235,7 @@ int rds_send_xmit(struct rds_connection *conn) * connection. * Therefore, we never retransmit messages with RDMA ops. 
*/ - if (rm->rdma.m_rdma_op && + if (rm->rdma.m_rdma_op.r_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { spin_lock_irqsave(&conn->c_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) @@ -267,8 +267,8 @@ int rds_send_xmit(struct rds_connection *conn) * keep this simple and require that the transport either * send the whole rdma or none of it. */ - if (rm->rdma.m_rdma_op && !conn->c_xmit_rdma_sent) { - ret = conn->c_trans->xmit_rdma(conn, rm->rdma.m_rdma_op); + if (rm->rdma.m_rdma_op.r_active && !conn->c_xmit_rdma_sent) { + ret = conn->c_trans->xmit_rdma(conn, &rm->rdma.m_rdma_op); if (ret) break; conn->c_xmit_rdma_sent = 1; @@ -418,9 +418,9 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) spin_lock_irqsave(&rm->m_rs_lock, flags); - ro = rm->rdma.m_rdma_op; + ro = &rm->rdma.m_rdma_op; if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && - ro && ro->r_notify && ro->r_notifier) { + ro->r_active && ro->r_notify && ro->r_notifier) { notifier = ro->r_notifier; rs = rm->m_rs; sock_hold(rds_rs_to_sk(rs)); @@ -452,8 +452,8 @@ __rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status { struct rds_rdma_op *ro; - ro = rm->rdma.m_rdma_op; - if (ro && ro->r_notify && ro->r_notifier) { + ro = &rm->rdma.m_rdma_op; + if (ro->r_active && ro->r_notify && ro->r_notifier) { ro->r_notifier->n_status = status; list_add_tail(&ro->r_notifier->n_list, &rs->rs_notify_queue); ro->r_notifier = NULL; @@ -476,7 +476,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn, spin_lock_irqsave(&conn->c_lock, flags); list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (rm->rdma.m_rdma_op == op) { + if (&rm->rdma.m_rdma_op == op) { atomic_inc(&rm->m_refcount); found = rm; goto out; @@ -484,7 +484,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn, } list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (rm->rdma.m_rdma_op == op) { + if (&rm->rdma.m_rdma_op == op) { atomic_inc(&rm->m_refcount); found = rm; break; @@ -544,19 +544,20 @@ void rds_send_remove_from_sock(struct list_head *messages, int status) spin_lock(&rs->rs_lock); if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) { - struct rds_rdma_op *ro = rm->rdma.m_rdma_op; + struct rds_rdma_op *ro = &rm->rdma.m_rdma_op; struct rds_notifier *notifier; list_del_init(&rm->m_sock_item); rds_send_sndbuf_remove(rs, rm); - if (ro && ro->r_notifier && (status || ro->r_notify)) { + if (ro->r_active && ro->r_notifier && + (status || ro->r_notify)) { notifier = ro->r_notifier; list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); if (!notifier->n_status) notifier->n_status = status; - rm->rdma.m_rdma_op->r_notifier = NULL; + rm->rdma.m_rdma_op.r_notifier = NULL; } was_on_sock = 1; rm->m_rs = NULL; @@ -763,9 +764,37 @@ out: */ static int rds_rm_size(struct msghdr *msg, int data_len) { + struct cmsghdr *cmsg; int size = 0; + int retval; + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + if (!CMSG_OK(msg, cmsg)) + return -EINVAL; + + if (cmsg->cmsg_level != SOL_RDS) + continue; + + switch (cmsg->cmsg_type) { + case RDS_CMSG_RDMA_ARGS: + retval = rds_rdma_extra_size(CMSG_DATA(cmsg)); + if (retval < 0) + return retval; + size += retval; + break; + + case RDS_CMSG_RDMA_DEST: + case RDS_CMSG_RDMA_MAP: + /* these are valid but do no add any size */ + break; + + default: + return -EINVAL; + } + + } - size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist); + size += ceil(data_len, PAGE_SIZE) * sizeof(struct 
scatterlist); return size; } @@ -896,11 +925,11 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, if (ret) goto out; - if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op) && + if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op.r_active) && !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", - rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); ret = -EOPNOTSUPP; goto out; } -- cgit v1.2.3-18-g5258 From a63273d4992603979ddb181b6a8f07082839b39f Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 14:19:32 -0800 Subject: RDS: Clear up some confusing code in send_remove_from_sock The previous code was correct, but made the assumption that if r_notifier was non-NULL then either r_recverr or r_notify was true. Valid, but fragile. Changed to explicitly check r_recverr (shows up in greps for recverr now, too.) Signed-off-by: Andy Grover --- net/rds/send.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 72dbe7fc4f5..b751a8e77c4 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -551,7 +551,7 @@ void rds_send_remove_from_sock(struct list_head *messages, int status) rds_send_sndbuf_remove(rs, rm); if (ro->r_active && ro->r_notifier && - (status || ro->r_notify)) { + (ro->r_notify || (ro->r_recverr && status))) { notifier = ro->r_notifier; list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); -- cgit v1.2.3-18-g5258 From 15133f6e67d8d646d0744336b4daa3135452cb0d Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 14:33:38 -0800 Subject: RDS: Implement atomic operations Implement a CMSG-based interface to do FADD and CSWP ops. Alter send routines to handle atomic ops. Add atomic counters to stats. Add xmit_atomic() to struct rds_transport Inline rds_ib_send_unmap_rdma into unmap_rm Signed-off-by: Andy Grover --- net/rds/send.c | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 68 insertions(+), 3 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index b751a8e77c4..f3f4e79274b 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -73,6 +73,7 @@ void rds_send_reset(struct rds_connection *conn) conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; conn->c_xmit_rdma_sent = 0; + conn->c_xmit_atomic_sent = 0; conn->c_map_queued = 0; @@ -171,6 +172,7 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; conn->c_xmit_rdma_sent = 0; + conn->c_xmit_atomic_sent = 0; /* Release the reference to the previous message. */ rds_message_put(rm); @@ -262,6 +264,17 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_rm = rm; } + + if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); + if (ret) + break; + conn->c_xmit_atomic_sent = 1; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); + } + /* * Try and send an rdma message. 
Let's see if we can * keep this simple and require that the transport either @@ -442,6 +455,41 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) } EXPORT_SYMBOL_GPL(rds_rdma_send_complete); +/* + * Just like above, except looks at atomic op + */ +void rds_atomic_send_complete(struct rds_message *rm, int status) +{ + struct rds_sock *rs = NULL; + struct rm_atomic_op *ao; + struct rds_notifier *notifier; + + spin_lock(&rm->m_rs_lock); + + ao = &rm->atomic; + if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) + && ao->op_active && ao->op_notify && ao->op_notifier) { + notifier = ao->op_notifier; + rs = rm->m_rs; + sock_hold(rds_rs_to_sk(rs)); + + notifier->n_status = status; + spin_lock(&rs->rs_lock); + list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); + spin_unlock(&rs->rs_lock); + + ao->op_notifier = NULL; + } + + spin_unlock(&rm->m_rs_lock); + + if (rs) { + rds_wake_sk_sleep(rs); + sock_put(rds_rs_to_sk(rs)); + } +} +EXPORT_SYMBOL_GPL(rds_atomic_send_complete); + /* * This is the same as rds_rdma_send_complete except we * don't do any locking - we have all the ingredients (message, @@ -788,6 +836,11 @@ static int rds_rm_size(struct msghdr *msg, int data_len) /* these are valid but do no add any size */ break; + case RDS_CMSG_ATOMIC_CSWP: + case RDS_CMSG_ATOMIC_FADD: + size += sizeof(struct scatterlist); + break; + default: return -EINVAL; } @@ -813,7 +866,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, continue; /* As a side effect, RDMA_DEST and RDMA_MAP will set - * rm->m_rdma_cookie and rm->m_rdma_mr. + * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr. */ switch (cmsg->cmsg_type) { case RDS_CMSG_RDMA_ARGS: @@ -829,6 +882,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, if (!ret) *allocated_mr = 1; break; + case RDS_CMSG_ATOMIC_CSWP: + case RDS_CMSG_ATOMIC_FADD: + ret = rds_cmsg_atomic(rs, rm, cmsg); + break; default: return -EINVAL; @@ -926,10 +983,18 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op.r_active) && - !conn->c_trans->xmit_rdma) { + !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", - &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + ret = -EOPNOTSUPP; + goto out; + } + + if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) { + if (printk_ratelimit()) + printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n", + &rm->atomic, conn->c_trans->xmit_atomic); ret = -EOPNOTSUPP; goto out; } -- cgit v1.2.3-18-g5258 From 6200ed7799d9225f363f157ab61f1566cfd80e19 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 12 Jan 2010 10:53:05 -0800 Subject: RDS: Whitespace Tidy up some whitespace issues. 
Signed-off-by: Andy Grover --- net/rds/send.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index f3f4e79274b..5bc35d2f40e 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -150,7 +150,7 @@ int rds_send_xmit(struct rds_connection *conn) */ if (conn->c_map_bytes) { ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, - conn->c_map_offset); + conn->c_map_offset); if (ret <= 0) break; -- cgit v1.2.3-18-g5258 From 241eef3e2f51fe4ad50abacd7f79c4e2d468197e Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 19 Jan 2010 21:25:26 -0800 Subject: RDS: Implement silent atomics Signed-off-by: Andy Grover --- net/rds/send.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 5bc35d2f40e..42fb934293b 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -266,7 +266,7 @@ int rds_send_xmit(struct rds_connection *conn) if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { - ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); + ret = conn->c_trans->xmit_atomic(conn, rm); if (ret) break; conn->c_xmit_atomic_sent = 1; @@ -285,13 +285,18 @@ int rds_send_xmit(struct rds_connection *conn) if (ret) break; conn->c_xmit_rdma_sent = 1; + + /* rdmas need data sent, even if just the header */ + rm->data.op_active = 1; + /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue */ set_bit(RDS_MSG_MAPPED, &rm->m_flags); } - if (conn->c_xmit_hdr_off < sizeof(struct rds_header) || - conn->c_xmit_sg < rm->data.m_nents) { + if (rm->data.op_active + && (conn->c_xmit_hdr_off < sizeof(struct rds_header) || + conn->c_xmit_sg < rm->data.m_nents)) { ret = conn->c_trans->xmit(conn, rm, conn->c_xmit_hdr_off, conn->c_xmit_sg, -- cgit v1.2.3-18-g5258 From f8b3aaf2ba8ca9e27b47f8bfdff07c8b968f2c05 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Mon, 1 Mar 2010 14:11:53 -0800 Subject: RDS: Remove struct rds_rdma_op A big changeset, but it's all pretty dumb. struct rds_rdma_op was already embedded in struct rm_rdma_op. Remove rds_rdma_op and put its members in rm_rdma_op. Rename members with "op_" prefix instead of "r_", for consistency. Of course this breaks a lot, so fixup the code accordingly. Signed-off-by: Andy Grover --- net/rds/send.c | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 42fb934293b..08df279ced2 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -237,7 +237,7 @@ int rds_send_xmit(struct rds_connection *conn) * connection. * Therefore, we never retransmit messages with RDMA ops. */ - if (rm->rdma.m_rdma_op.r_active && + if (rm->rdma.op_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { spin_lock_irqsave(&conn->c_lock, flags); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) @@ -280,8 +280,8 @@ int rds_send_xmit(struct rds_connection *conn) * keep this simple and require that the transport either * send the whole rdma or none of it. 
*/ - if (rm->rdma.m_rdma_op.r_active && !conn->c_xmit_rdma_sent) { - ret = conn->c_trans->xmit_rdma(conn, &rm->rdma.m_rdma_op); + if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); if (ret) break; conn->c_xmit_rdma_sent = 1; @@ -430,16 +430,16 @@ int rds_send_acked_before(struct rds_connection *conn, u64 seq) void rds_rdma_send_complete(struct rds_message *rm, int status) { struct rds_sock *rs = NULL; - struct rds_rdma_op *ro; + struct rm_rdma_op *ro; struct rds_notifier *notifier; unsigned long flags; spin_lock_irqsave(&rm->m_rs_lock, flags); - ro = &rm->rdma.m_rdma_op; + ro = &rm->rdma; if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) && - ro->r_active && ro->r_notify && ro->r_notifier) { - notifier = ro->r_notifier; + ro->op_active && ro->op_notify && ro->op_notifier) { + notifier = ro->op_notifier; rs = rm->m_rs; sock_hold(rds_rs_to_sk(rs)); @@ -448,7 +448,7 @@ void rds_rdma_send_complete(struct rds_message *rm, int status) list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); spin_unlock(&rs->rs_lock); - ro->r_notifier = NULL; + ro->op_notifier = NULL; } spin_unlock_irqrestore(&rm->m_rs_lock, flags); @@ -503,13 +503,13 @@ EXPORT_SYMBOL_GPL(rds_atomic_send_complete); static inline void __rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) { - struct rds_rdma_op *ro; + struct rm_rdma_op *ro; - ro = &rm->rdma.m_rdma_op; - if (ro->r_active && ro->r_notify && ro->r_notifier) { - ro->r_notifier->n_status = status; - list_add_tail(&ro->r_notifier->n_list, &rs->rs_notify_queue); - ro->r_notifier = NULL; + ro = &rm->rdma; + if (ro->op_active && ro->op_notify && ro->op_notifier) { + ro->op_notifier->n_status = status; + list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue); + ro->op_notifier = NULL; } /* No need to wake the app - caller does this */ @@ -521,7 +521,7 @@ __rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status * So speed is not an issue here. 
*/ struct rds_message *rds_send_get_message(struct rds_connection *conn, - struct rds_rdma_op *op) + struct rm_rdma_op *op) { struct rds_message *rm, *tmp, *found = NULL; unsigned long flags; @@ -529,7 +529,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn, spin_lock_irqsave(&conn->c_lock, flags); list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) { - if (&rm->rdma.m_rdma_op == op) { + if (&rm->rdma == op) { atomic_inc(&rm->m_refcount); found = rm; goto out; @@ -537,7 +537,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn, } list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) { - if (&rm->rdma.m_rdma_op == op) { + if (&rm->rdma == op) { atomic_inc(&rm->m_refcount); found = rm; break; @@ -597,20 +597,20 @@ void rds_send_remove_from_sock(struct list_head *messages, int status) spin_lock(&rs->rs_lock); if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) { - struct rds_rdma_op *ro = &rm->rdma.m_rdma_op; + struct rm_rdma_op *ro = &rm->rdma; struct rds_notifier *notifier; list_del_init(&rm->m_sock_item); rds_send_sndbuf_remove(rs, rm); - if (ro->r_active && ro->r_notifier && - (ro->r_notify || (ro->r_recverr && status))) { - notifier = ro->r_notifier; + if (ro->op_active && ro->op_notifier && + (ro->op_notify || (ro->op_recverr && status))) { + notifier = ro->op_notifier; list_add_tail(¬ifier->n_list, &rs->rs_notify_queue); if (!notifier->n_status) notifier->n_status = status; - rm->rdma.m_rdma_op.r_notifier = NULL; + rm->rdma.op_notifier = NULL; } was_on_sock = 1; rm->m_rs = NULL; @@ -987,11 +987,11 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, if (ret) goto out; - if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op.r_active) && - !conn->c_trans->xmit_rdma) { + if ((rm->m_rdma_cookie || rm->rdma.op_active) && + !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", - &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma); + &rm->rdma, conn->c_trans->xmit_rdma); ret = -EOPNOTSUPP; goto out; } -- cgit v1.2.3-18-g5258 From 6c7cc6e4694dc464ae884332f2a322973497e3cf Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Wed, 27 Jan 2010 18:04:18 -0800 Subject: RDS: Rename data op members prefix from m_ to op_ For consistency. 
Signed-off-by: Andy Grover --- net/rds/send.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 08df279ced2..d60d3130903 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -166,7 +166,7 @@ int rds_send_xmit(struct rds_connection *conn) rm = conn->c_xmit_rm; if (rm && conn->c_xmit_hdr_off == sizeof(struct rds_header) && - conn->c_xmit_sg == rm->data.m_nents) { + conn->c_xmit_sg == rm->data.op_nents) { conn->c_xmit_rm = NULL; conn->c_xmit_sg = 0; conn->c_xmit_hdr_off = 0; @@ -296,7 +296,7 @@ int rds_send_xmit(struct rds_connection *conn) if (rm->data.op_active && (conn->c_xmit_hdr_off < sizeof(struct rds_header) || - conn->c_xmit_sg < rm->data.m_nents)) { + conn->c_xmit_sg < rm->data.op_nents)) { ret = conn->c_trans->xmit(conn, rm, conn->c_xmit_hdr_off, conn->c_xmit_sg, @@ -312,7 +312,7 @@ int rds_send_xmit(struct rds_connection *conn) ret -= tmp; } - sg = &rm->data.m_sg[conn->c_xmit_sg]; + sg = &rm->data.op_sg[conn->c_xmit_sg]; while (ret) { tmp = min_t(int, ret, sg->length - conn->c_xmit_data_off); @@ -323,7 +323,7 @@ int rds_send_xmit(struct rds_connection *conn) sg++; conn->c_xmit_sg++; BUG_ON(ret != 0 && - conn->c_xmit_sg == rm->data.m_nents); + conn->c_xmit_sg == rm->data.op_nents); } } } @@ -959,7 +959,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; } - rm->data.m_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); + rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); /* XXX fix this to not allocate memory */ ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len); if (ret) -- cgit v1.2.3-18-g5258 From 5b2366bd2835919e2e6a836e837eab4a9274bd46 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Wed, 3 Feb 2010 19:36:44 -0800 Subject: RDS: Rewrite rds_send_xmit Simplify rds_send_xmit(). Send a congestion map (via xmit_cong_map) without decrementing send_quota. Move resetting of conn xmit variables to end of loop. Update comments. Implement a special case to turn off sending an rds header when there is an atomic op and no other data. Signed-off-by: Andy Grover --- net/rds/send.c | 131 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 71 insertions(+), 60 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index d60d3130903..66dc6b04526 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -72,8 +72,9 @@ void rds_send_reset(struct rds_connection *conn) conn->c_xmit_sg = 0; conn->c_xmit_hdr_off = 0; conn->c_xmit_data_off = 0; - conn->c_xmit_rdma_sent = 0; conn->c_xmit_atomic_sent = 0; + conn->c_xmit_rdma_sent = 0; + conn->c_xmit_data_sent = 0; conn->c_map_queued = 0; @@ -137,69 +138,54 @@ int rds_send_xmit(struct rds_connection *conn) /* * spin trying to push headers and data down the connection until - * the connection doens't make forward progress. + * the connection doesn't make forward progress. */ while (--send_quota) { - /* - * See if need to send a congestion map update if we're - * between sending messages. The send_sem protects our sole - * use of c_map_offset and _bytes. - * Note this is used only by transports that define a special - * xmit_cong_map function. For all others, we create allocate - * a cong_map message and treat it just like any other send. 
- */ - if (conn->c_map_bytes) { - ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, - conn->c_map_offset); - if (ret <= 0) - break; - conn->c_map_offset += ret; - conn->c_map_bytes -= ret; - if (conn->c_map_bytes) - continue; - } - - /* If we're done sending the current message, clear the - * offset and S/G temporaries. - */ rm = conn->c_xmit_rm; - if (rm && - conn->c_xmit_hdr_off == sizeof(struct rds_header) && - conn->c_xmit_sg == rm->data.op_nents) { - conn->c_xmit_rm = NULL; - conn->c_xmit_sg = 0; - conn->c_xmit_hdr_off = 0; - conn->c_xmit_data_off = 0; - conn->c_xmit_rdma_sent = 0; - conn->c_xmit_atomic_sent = 0; - - /* Release the reference to the previous message. */ - rds_message_put(rm); - rm = NULL; - } - /* If we're asked to send a cong map update, do so. + /* + * If between sending messages, we can send a pending congestion + * map update. + * + * Transports either define a special xmit_cong_map function, + * or we allocate a cong_map message and treat it just like any + * other send. */ if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) { if (conn->c_trans->xmit_cong_map) { - conn->c_map_offset = 0; - conn->c_map_bytes = sizeof(struct rds_header) + + unsigned long map_offset = 0; + unsigned long map_bytes = sizeof(struct rds_header) + RDS_CONG_MAP_BYTES; - continue; - } - rm = rds_cong_update_alloc(conn); - if (IS_ERR(rm)) { - ret = PTR_ERR(rm); - break; - } + while (map_bytes) { + ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, + map_offset); + if (ret <= 0) { + /* too far down the rabbithole! */ + mutex_unlock(&conn->c_send_lock); + rds_conn_error(conn, "Cong map xmit failed\n"); + goto out; + } + + map_offset += ret; + map_bytes -= ret; + } + } else { + /* send cong update like a normal rm */ + rm = rds_cong_update_alloc(conn); + if (IS_ERR(rm)) { + ret = PTR_ERR(rm); + break; + } + rm->data.op_active = 1; - conn->c_xmit_rm = rm; + conn->c_xmit_rm = rm; + } } /* - * Grab the next message from the send queue, if there is one. + * If not already working on one, grab the next message. * * c_xmit_rm holds a ref while we're sending this message down * the connction. We can use this ref while holding the @@ -264,7 +250,6 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_rm = rm; } - if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { ret = conn->c_trans->xmit_atomic(conn, rm); if (ret) @@ -273,13 +258,20 @@ int rds_send_xmit(struct rds_connection *conn) /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue */ set_bit(RDS_MSG_MAPPED, &rm->m_flags); + + /* + * This is evil, muahaha. + * We permit 0-byte sends. (rds-ping depends on this.) + * BUT if there is an atomic op and no sent data, + * we turn off sending the header, to achieve + * "silent" atomics. + * But see below; RDMA op might toggle this back on! + */ + if (rm->data.op_nents == 0) + rm->data.op_active = 0; } - /* - * Try and send an rdma message. Let's see if we can - * keep this simple and require that the transport either - * send the whole rdma or none of it. 
- */ + /* The transport either sends the whole rdma or none of it */ if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); if (ret) @@ -294,9 +286,7 @@ int rds_send_xmit(struct rds_connection *conn) set_bit(RDS_MSG_MAPPED, &rm->m_flags); } - if (rm->data.op_active - && (conn->c_xmit_hdr_off < sizeof(struct rds_header) || - conn->c_xmit_sg < rm->data.op_nents)) { + if (rm->data.op_active && !conn->c_xmit_data_sent) { ret = conn->c_trans->xmit(conn, rm, conn->c_xmit_hdr_off, conn->c_xmit_sg, @@ -326,6 +316,27 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_sg == rm->data.op_nents); } } + + if (conn->c_xmit_hdr_off == sizeof(struct rds_header) && + (conn->c_xmit_sg == rm->data.op_nents)) + conn->c_xmit_data_sent = 1; + } + + /* + * A rm will only take multiple times through this loop + * if there is a data op. Thus, if the data is sent (or there was + * none), then we're done with the rm. + */ + if (!rm->data.op_active || conn->c_xmit_data_sent) { + conn->c_xmit_rm = NULL; + conn->c_xmit_sg = 0; + conn->c_xmit_hdr_off = 0; + conn->c_xmit_data_off = 0; + conn->c_xmit_rdma_sent = 0; + conn->c_xmit_atomic_sent = 0; + conn->c_xmit_data_sent = 0; + + rds_message_put(rm); } } @@ -350,7 +361,7 @@ int rds_send_xmit(struct rds_connection *conn) */ mutex_unlock(&conn->c_send_lock); - if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) { + if (send_quota == 0 && !was_empty) { /* We exhausted the send quota, but there's work left to * do. Return and (re-)schedule the send worker. */ -- cgit v1.2.3-18-g5258 From 372cd7dedfd1ea93a9ae8d9c282e910dc1b76773 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Wed, 3 Feb 2010 19:40:32 -0800 Subject: RDS: Do not set op_active in r_m_copy_from_user(). Do not allocate sgs for data for 0-length datagrams Set data.op_active in rds_sendmsg() instead of rds_message_copy_from_user(). Signed-off-by: Andy Grover --- net/rds/send.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 66dc6b04526..ad89a63c430 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -970,11 +970,14 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, goto out; } - rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); - /* XXX fix this to not allocate memory */ - ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len); - if (ret) - goto out; + /* Attach data to the rm */ + if (payload_len) { + rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); + ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len); + if (ret) + goto out; + } + rm->data.op_active = 1; rm->m_daddr = daddr; -- cgit v1.2.3-18-g5258 From 940786eb0a0faf3f30898a1cc7c1540d54c1aff6 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Fri, 19 Feb 2010 18:04:58 -0800 Subject: RDS: queue failure notifications for dropped atomic ops When dropping ops in the send queue, we notify the client of failed rdma ops they asked for notifications on, but not atomic ops. It should be for both. Signed-off-by: Andy Grover --- net/rds/send.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index ad89a63c430..cdca9747fcb 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -512,9 +512,10 @@ EXPORT_SYMBOL_GPL(rds_atomic_send_complete); * socket, socket lock) and can just move the notifier. 
*/ static inline void -__rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) +__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status) { struct rm_rdma_op *ro; + struct rm_atomic_op *ao; ro = &rm->rdma; if (ro->op_active && ro->op_notify && ro->op_notifier) { @@ -523,6 +524,13 @@ __rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status ro->op_notifier = NULL; } + ao = &rm->atomic; + if (ao->op_active && ao->op_notify && ao->op_notifier) { + ao->op_notifier->n_status = status; + list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue); + ao->op_notifier = NULL; + } + /* No need to wake the app - caller does this */ } @@ -733,7 +741,7 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) spin_lock_irqsave(&rm->m_rs_lock, flags); spin_lock(&rs->rs_lock); - __rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED); + __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); spin_unlock(&rs->rs_lock); rm->m_rs = NULL; -- cgit v1.2.3-18-g5258 From 2c3a5f9abb1dc5efdab8ba9a568b1661c65fd1e3 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Mon, 1 Mar 2010 16:10:40 -0800 Subject: RDS: Add flag for silent ops. Do atomic op before RDMA Add a flag to the API so users can indicate they want silent operations. This is needed because silent ops cannot be used with USE_ONCE MRs, so we can't just assume silent. Also, change send_xmit to do atomic op before rdma op if both are present, and centralize the hairy logic to determine if we want to attempt silent, or not. Signed-off-by: Andy Grover --- net/rds/send.c | 55 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 24 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index cdca9747fcb..38567f3ee7e 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -250,42 +250,50 @@ int rds_send_xmit(struct rds_connection *conn) conn->c_xmit_rm = rm; } - if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { - ret = conn->c_trans->xmit_atomic(conn, rm); + /* The transport either sends the whole rdma or none of it */ + if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); if (ret) break; - conn->c_xmit_atomic_sent = 1; + conn->c_xmit_rdma_sent = 1; + /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue */ set_bit(RDS_MSG_MAPPED, &rm->m_flags); - - /* - * This is evil, muahaha. - * We permit 0-byte sends. (rds-ping depends on this.) - * BUT if there is an atomic op and no sent data, - * we turn off sending the header, to achieve - * "silent" atomics. - * But see below; RDMA op might toggle this back on! - */ - if (rm->data.op_nents == 0) - rm->data.op_active = 0; } - /* The transport either sends the whole rdma or none of it */ - if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { - ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); + if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { + ret = conn->c_trans->xmit_atomic(conn, rm); if (ret) break; - conn->c_xmit_rdma_sent = 1; - - /* rdmas need data sent, even if just the header */ - rm->data.op_active = 1; - + conn->c_xmit_atomic_sent = 1; /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue */ set_bit(RDS_MSG_MAPPED, &rm->m_flags); } + /* + * A number of cases require an RDS header to be sent + * even if there is no data. + * We permit 0-byte sends; rds-ping depends on this. 
+ * However, if there are exclusively attached silent ops, + * we skip the hdr/data send, to enable silent operation. + */ + if (rm->data.op_nents == 0) { + int ops_present; + int all_ops_are_silent = 1; + + ops_present = (rm->atomic.op_active || rm->rdma.op_active); + if (rm->atomic.op_active && !rm->atomic.op_silent) + all_ops_are_silent = 0; + if (rm->rdma.op_active && !rm->rdma.op_silent) + all_ops_are_silent = 0; + + if (ops_present && all_ops_are_silent + && !rm->m_rdma_cookie) + rm->data.op_active = 0; + } + if (rm->data.op_active && !conn->c_xmit_data_sent) { ret = conn->c_trans->xmit(conn, rm, conn->c_xmit_hdr_off, @@ -1009,8 +1017,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, if (ret) goto out; - if ((rm->m_rdma_cookie || rm->rdma.op_active) && - !conn->c_trans->xmit_rdma) { + if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) { if (printk_ratelimit()) printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n", &rm->rdma, conn->c_trans->xmit_rdma); -- cgit v1.2.3-18-g5258 From aa0a4ef4ac3a3c5ffa35e32520bfbc0922ef3630 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 13 Apr 2010 12:00:35 -0700 Subject: RDS: Make sure cmsgs aren't used in improper ways It hasn't cropped up in the field, but this code ensures it is impossible to issue operations that pass an rdma cookie (DEST, MAP) in the same sendmsg call that's actually initiating rdma or atomic ops. Disallowing this perverse-but-technically-allowed usage makes silent RDMA heuristics slightly easier. Signed-off-by: Andy Grover --- net/rds/send.c | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 38567f3ee7e..69ab1040d02 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -846,6 +846,7 @@ static int rds_rm_size(struct msghdr *msg, int data_len) { struct cmsghdr *cmsg; int size = 0; + int cmsg_groups = 0; int retval; for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { @@ -857,19 +858,23 @@ static int rds_rm_size(struct msghdr *msg, int data_len) switch (cmsg->cmsg_type) { case RDS_CMSG_RDMA_ARGS: + cmsg_groups |= 1; retval = rds_rdma_extra_size(CMSG_DATA(cmsg)); if (retval < 0) return retval; size += retval; + break; case RDS_CMSG_RDMA_DEST: case RDS_CMSG_RDMA_MAP: + cmsg_groups |= 2; /* these are valid but do no add any size */ break; case RDS_CMSG_ATOMIC_CSWP: case RDS_CMSG_ATOMIC_FADD: + cmsg_groups |= 1; size += sizeof(struct scatterlist); break; @@ -881,6 +886,10 @@ static int rds_rm_size(struct msghdr *msg, int data_len) size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist); + /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */ + if (cmsg_groups == 3) + return -EINVAL; + return size; } -- cgit v1.2.3-18-g5258 From ff3d7d36134ef7138803734fdbf91cc986ea7976 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Mon, 1 Mar 2010 14:03:09 -0800 Subject: RDS: Perform unmapping ops in stages Previously, RDS would wait until the final send WR had completed and then handle cleanup. With silent ops, we do not know if an atomic, rdma, or data op will be last. This patch handles any of these cases by keeping a pointer to the last op in the message in m_last_op. When the TX completion event fires, rds dispatches to per-op-type cleanup functions, and then does whole-message cleanup, if the last op equalled m_last_op. This patch also moves towards having op-specific functions take the op struct, instead of the overall rm struct. 
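(For illustration, a hypothetical sketch of the staged cleanup this
enables. The real dispatch sits in the IB transport's completion handler,
which this send.c-limited log does not show; the unmap_*_op() helpers are
invented names, while rm->m_final_op is the field the hunks below
actually set.)

	/* Per-op unmap runs as each op completes; whole-message cleanup
	 * happens only when the completed op is the last one sent. */
	static void send_complete_one_op(struct rds_message *rm, void *op)
	{
		if (op == &rm->rdma)
			unmap_rdma_op(&rm->rdma);	/* hypothetical */
		else if (op == &rm->atomic)
			unmap_atomic_op(&rm->atomic);	/* hypothetical */
		else if (op == &rm->data)
			unmap_data_op(&rm->data);	/* hypothetical */

		if (op == rm->m_final_op)
			rds_message_put(rm);
	}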
rds_ib_connection has a pointer to keep track of a a partially- completed data send operation. This patch changes it from an rds_message pointer to the narrower rm_data_op pointer, and modifies places that use this pointer as needed. Signed-off-by: Andy Grover --- net/rds/send.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 69ab1040d02..d1f364e44e3 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -252,6 +252,7 @@ int rds_send_xmit(struct rds_connection *conn) /* The transport either sends the whole rdma or none of it */ if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { + rm->m_final_op = &rm->rdma; ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); if (ret) break; @@ -263,10 +264,12 @@ int rds_send_xmit(struct rds_connection *conn) } if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { - ret = conn->c_trans->xmit_atomic(conn, rm); + rm->m_final_op = &rm->atomic; + ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); if (ret) break; conn->c_xmit_atomic_sent = 1; + /* The transport owns the mapped memory for now. * You can't unmap it while it's on the send queue */ set_bit(RDS_MSG_MAPPED, &rm->m_flags); @@ -295,6 +298,7 @@ int rds_send_xmit(struct rds_connection *conn) } if (rm->data.op_active && !conn->c_xmit_data_sent) { + rm->m_final_op = &rm->data; ret = conn->c_trans->xmit(conn, rm, conn->c_xmit_hdr_off, conn->c_xmit_sg, -- cgit v1.2.3-18-g5258 From 77dd550e5547846604ff6f90c4dc6bba4414e485 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Mon, 22 Mar 2010 15:22:04 -0700 Subject: RDS: Stop supporting old cong map sending method We now ask the transport to give us a rm for the congestion map, and then we handle it normally. Previously, the transport defined a function that we would call to send a congestion map. Convert TCP and loop transports to new cong map method. Signed-off-by: Andy Grover --- net/rds/send.c | 39 +++++++-------------------------------- 1 file changed, 7 insertions(+), 32 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index d1f364e44e3..8a0647af5d9 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -147,41 +147,16 @@ int rds_send_xmit(struct rds_connection *conn) /* * If between sending messages, we can send a pending congestion * map update. - * - * Transports either define a special xmit_cong_map function, - * or we allocate a cong_map message and treat it just like any - * other send. */ if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) { - if (conn->c_trans->xmit_cong_map) { - unsigned long map_offset = 0; - unsigned long map_bytes = sizeof(struct rds_header) + - RDS_CONG_MAP_BYTES; - - while (map_bytes) { - ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong, - map_offset); - if (ret <= 0) { - /* too far down the rabbithole! 
*/ - mutex_unlock(&conn->c_send_lock); - rds_conn_error(conn, "Cong map xmit failed\n"); - goto out; - } - - map_offset += ret; - map_bytes -= ret; - } - } else { - /* send cong update like a normal rm */ - rm = rds_cong_update_alloc(conn); - if (IS_ERR(rm)) { - ret = PTR_ERR(rm); - break; - } - rm->data.op_active = 1; - - conn->c_xmit_rm = rm; + rm = rds_cong_update_alloc(conn); + if (IS_ERR(rm)) { + ret = PTR_ERR(rm); + break; } + rm->data.op_active = 1; + + conn->c_xmit_rm = rm; } /* -- cgit v1.2.3-18-g5258 From 049ee3f500954176a87f22e6ee3e98aecb1b8958 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 23 Mar 2010 17:39:07 -0700 Subject: RDS: Change send lock from a mutex to a spinlock This change allows us to call rds_send_xmit() from a tasklet, which is crucial to our new operating model. * Change c_send_lock to a spinlock * Update stats fields "sem_" to "_lock" * Remove unneeded rds_conn_is_sending() About locking between shutdown and send -- send checks if the connection is up. Shutdown puts the connection into DISCONNECTING. After this, all threads entering send will exit immediately. However, a thread could be *in* send_xmit(), so shutdown acquires the c_send_lock to ensure everyone is out before proceeding with connection shutdown. Signed-off-by: Andy Grover --- net/rds/send.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 8a0647af5d9..d4feec6ad09 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -116,19 +116,18 @@ int rds_send_xmit(struct rds_connection *conn) int was_empty = 0; LIST_HEAD(to_be_dropped); + if (!rds_conn_up(conn)) + goto out; + /* * sendmsg calls here after having queued its message on the send * queue. We only have one task feeding the connection at a time. If * another thread is already feeding the queue then we back off. This * avoids blocking the caller and trading per-connection data between * caches per message. - * - * The sem holder will issue a retry if they notice that someone queued - * a message after they stopped walking the send queue but before they - * dropped the sem. */ - if (!mutex_trylock(&conn->c_send_lock)) { - rds_stats_inc(s_send_sem_contention); + if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) { + rds_stats_inc(s_send_lock_contention); ret = -ENOMEM; goto out; } @@ -346,7 +345,7 @@ int rds_send_xmit(struct rds_connection *conn) * stop processing the loop when the transport hasn't taken * responsibility for forward progress. */ - mutex_unlock(&conn->c_send_lock); + spin_unlock_irqrestore(&conn->c_send_lock, flags); if (send_quota == 0 && !was_empty) { /* We exhausted the send quota, but there's work left to @@ -360,7 +359,7 @@ int rds_send_xmit(struct rds_connection *conn) * spin lock */ spin_lock_irqsave(&conn->c_lock, flags); if (!list_empty(&conn->c_send_queue)) { - rds_stats_inc(s_send_sem_queue_raced); + rds_stats_inc(s_send_lock_queue_raced); ret = -EAGAIN; } spin_unlock_irqrestore(&conn->c_lock, flags); -- cgit v1.2.3-18-g5258 From 2ad8099b58f274dc23bc866ca259d7e5db87fa1a Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Tue, 23 Mar 2010 17:48:04 -0700 Subject: RDS: rds_send_xmit() locking/irq fixes rds_message_put() cannot be called with irqs off, so move it after irqs are re-enabled. Spinlocks throughout the function do not to use _irqsave because the lock of c_send_lock at top already disabled irqs. 
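The resulting locking shape, sketched in simplified form (illustration
only; the actual hunks follow):

	spin_lock_irqsave(&conn->c_send_lock, flags);	/* irqs now off */

	spin_lock(&conn->c_lock);	/* nested: plain spin_lock suffices */
	list_move(&rm->m_conn_item, &to_be_dropped);
	spin_unlock(&conn->c_lock);

	spin_unlock_irqrestore(&conn->c_send_lock, flags);

	/* irqs are on again, so dropping message refs is safe here */
	list_for_each_entry(rm, &to_be_dropped, m_conn_item)
		rds_message_put(rm);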
Signed-off-by: Andy Grover --- net/rds/send.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index d4feec6ad09..624a3dc7f06 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -168,7 +168,7 @@ int rds_send_xmit(struct rds_connection *conn) if (!rm) { unsigned int len; - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock(&conn->c_lock); if (!list_empty(&conn->c_send_queue)) { rm = list_entry(conn->c_send_queue.next, @@ -183,7 +183,7 @@ int rds_send_xmit(struct rds_connection *conn) list_move_tail(&rm->m_conn_item, &conn->c_retrans); } - spin_unlock_irqrestore(&conn->c_lock, flags); + spin_unlock(&conn->c_lock); if (!rm) { was_empty = 1; @@ -199,11 +199,10 @@ int rds_send_xmit(struct rds_connection *conn) */ if (rm->rdma.op_active && test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) { - spin_lock_irqsave(&conn->c_lock, flags); + spin_lock(&conn->c_lock); if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) list_move(&rm->m_conn_item, &to_be_dropped); - spin_unlock_irqrestore(&conn->c_lock, flags); - rds_message_put(rm); + spin_unlock(&conn->c_lock); continue; } @@ -326,10 +325,6 @@ int rds_send_xmit(struct rds_connection *conn) } } - /* Nuke any messages we decided not to retransmit. */ - if (!list_empty(&to_be_dropped)) - rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); - if (conn->c_trans->xmit_complete) conn->c_trans->xmit_complete(conn); @@ -347,6 +342,14 @@ int rds_send_xmit(struct rds_connection *conn) */ spin_unlock_irqrestore(&conn->c_send_lock, flags); + /* Nuke any messages we decided not to retransmit. */ + if (!list_empty(&to_be_dropped)) { + /* irqs on here, so we can put(), unlike above */ + list_for_each_entry(rm, &to_be_dropped, m_conn_item) + rds_message_put(rm); + rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED); + } + if (send_quota == 0 && !was_empty) { /* We exhausted the send quota, but there's work left to * do. Return and (re-)schedule the send worker. -- cgit v1.2.3-18-g5258 From a7d3a281483684f77e350b045af7f80a149fc4c7 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Mon, 29 Mar 2010 16:20:18 -0700 Subject: RDS: Call rds_send_xmit() directly from sendmsg() rds_sendmsg() is calling the send worker function to send the just-queued datagrams, presumably because it wants the behavior where anything not sent will re-call the send worker. We now ensure all queued datagrams are sent by retrying from the send completion handler, so this isn't needed any more. 
Signed-off-by: Andy Grover --- net/rds/send.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 624a3dc7f06..15b715a85fd 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -1073,7 +1073,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg, rds_stats_inc(s_send_queued); if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - rds_send_worker(&conn->c_send_w.work); + rds_send_xmit(conn); rds_message_put(rm); return payload_len; -- cgit v1.2.3-18-g5258 From cf4b7389ee812817deeb11da1422004e01b50646 Mon Sep 17 00:00:00 2001 From: Andy Grover Date: Mon, 29 Mar 2010 16:50:54 -0700 Subject: RDS: Fix locking in send on m_rs_lock Do not nest m_rs_lock under c_lock Disable interrupts in {rdma,atomic}_send_complete Signed-off-by: Andy Grover --- net/rds/send.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'net/rds/send.c') diff --git a/net/rds/send.c b/net/rds/send.c index 15b715a85fd..ecda3e6c432 100644 --- a/net/rds/send.c +++