-rw-r--r--   net/rds/ib.h      |   7
-rw-r--r--   net/rds/ib_cm.c   |   9
-rw-r--r--   net/rds/ib_send.c | 242
-rw-r--r--   net/rds/rds.h     |   4
-rw-r--r--   net/rds/send.c    |   6
5 files changed, 142 insertions, 126 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index d64b5087eef..202140a84f0 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -53,8 +53,7 @@ struct rds_ib_connect_private {
 };
 
 struct rds_ib_send_work {
-        struct rds_message *s_rm;
-        struct rm_rdma_op *s_op;
+        void *s_op;
         struct ib_send_wr s_wr;
         struct ib_sge s_sge[RDS_IB_MAX_SGE];
         unsigned long s_queued;
@@ -92,7 +91,7 @@ struct rds_ib_connection {
 
         /* tx */
         struct rds_ib_work_ring i_send_ring;
-        struct rds_message *i_rm;
+        struct rm_data_op *i_data_op;
         struct rds_header *i_send_hdrs;
         u64 i_send_hdrs_dma;
         struct rds_ib_send_work *i_sends;
@@ -336,7 +335,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
                              u32 *adv_credits, int need_posted, int max_posted);
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm);
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
 
 /* ib_stats.c */
 DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 8b0c743c090..b5b5ebbc0bb 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -673,9 +673,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
         BUG_ON(ic->rds_ibdev);
 
         /* Clear pending transmit */
-        if (ic->i_rm) {
-                rds_message_put(ic->i_rm);
-                ic->i_rm = NULL;
+        if (ic->i_data_op) {
+                struct rds_message *rm;
+
+                rm = container_of(ic->i_data_op, struct rds_message, data);
+                rds_message_put(rm);
+                ic->i_data_op = NULL;
         }
 
         /* Clear the ACK state */
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 95f15247acd..6461a152bd5 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -67,80 +67,122 @@ static void rds_ib_send_complete(struct rds_message *rm,
         complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
-                                 struct rds_ib_send_work *send,
-                                 int wc_status)
+static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
+                                   struct rm_data_op *op,
+                                   int wc_status)
 {
-        struct rds_message *rm = send->s_rm;
-
-        rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
-
-        ib_dma_unmap_sg(ic->i_cm_id->device,
-                        rm->data.op_sg, rm->data.op_nents,
-                        DMA_TO_DEVICE);
+        if (op->op_nents)
+                ib_dma_unmap_sg(ic->i_cm_id->device,
+                                op->op_sg, op->op_nents,
+                                DMA_TO_DEVICE);
+}
 
-        if (rm->rdma.op_active) {
-                struct rm_rdma_op *op = &rm->rdma;
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+                                   struct rm_rdma_op *op,
+                                   int wc_status)
+{
+        if (op->op_mapped) {
+                ib_dma_unmap_sg(ic->i_cm_id->device,
+                                op->op_sg, op->op_nents,
+                                op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+                op->op_mapped = 0;
+        }
 
-                if (op->op_mapped) {
-                        ib_dma_unmap_sg(ic->i_cm_id->device,
-                                        op->op_sg, op->op_nents,
-                                        op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
-                        op->op_mapped = 0;
-                }
+        /* If the user asked for a completion notification on this
+         * message, we can implement three different semantics:
+         *  1. Notify when we received the ACK on the RDS message
+         *     that was queued with the RDMA. This provides reliable
+         *     notification of RDMA status at the expense of a one-way
+         *     packet delay.
+         *  2. Notify when the IB stack gives us the completion event for
+         *     the RDMA operation.
+         *  3. Notify when the IB stack gives us the completion event for
+         *     the accompanying RDS messages.
+         * Here, we implement approach #3. To implement approach #2,
+         * we would need to take an event for the rdma WR. To implement #1,
+         * don't call rds_rdma_send_complete at all, and fall back to the notify
+         * handling in the ACK processing code.
+         *
+         * Note: There's no need to explicitly sync any RDMA buffers using
+         * ib_dma_sync_sg_for_cpu - the completion for the RDMA
+         * operation itself unmapped the RDMA buffers, which takes care
+         * of synching.
+         */
+        rds_ib_send_complete(container_of(op, struct rds_message, rdma),
+                             wc_status, rds_rdma_send_complete);
 
-                /* If the user asked for a completion notification on this
-                 * message, we can implement three different semantics:
-                 *  1. Notify when we received the ACK on the RDS message
-                 *     that was queued with the RDMA. This provides reliable
-                 *     notification of RDMA status at the expense of a one-way
-                 *     packet delay.
-                 *  2. Notify when the IB stack gives us the completion event for
-                 *     the RDMA operation.
-                 *  3. Notify when the IB stack gives us the completion event for
-                 *     the accompanying RDS messages.
-                 * Here, we implement approach #3. To implement approach #2,
-                 * call rds_rdma_send_complete from the cq_handler. To implement #1,
-                 * don't call rds_rdma_send_complete at all, and fall back to the notify
-                 * handling in the ACK processing code.
-                 *
-                 * Note: There's no need to explicitly sync any RDMA buffers using
-                 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
-                 * operation itself unmapped the RDMA buffers, which takes care
-                 * of synching.
-                 */
-                rds_ib_send_complete(rm, wc_status, rds_rdma_send_complete);
+        if (op->op_write)
+                rds_stats_add(s_send_rdma_bytes, op->op_bytes);
+        else
+                rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
+}
 
-                if (rm->rdma.op_write)
-                        rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
-                else
-                        rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
+static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
+                                     struct rm_atomic_op *op,
+                                     int wc_status)
+{
+        /* unmap atomic recvbuf */
+        if (op->op_mapped) {
+                ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
+                                DMA_FROM_DEVICE);
+                op->op_mapped = 0;
         }
 
-        if (rm->atomic.op_active) {
-                struct rm_atomic_op *op = &rm->atomic;
-
-                /* unmap atomic recvbuf */
-                if (op->op_mapped) {
-                        ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
-                                        DMA_FROM_DEVICE);
-                        op->op_mapped = 0;
-                }
+        rds_ib_send_complete(container_of(op, struct rds_message, atomic),
+                             wc_status, rds_atomic_send_complete);
 
-                rds_ib_send_complete(rm, wc_status, rds_atomic_send_complete);
+        if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
+                rds_stats_inc(s_atomic_cswp);
+        else
+                rds_stats_inc(s_atomic_fadd);
+}
 
-                if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
-                        rds_stats_inc(s_atomic_cswp);
-                else
-                        rds_stats_inc(s_atomic_fadd);
+/*
+ * Unmap the resources associated with a struct send_work.
+ *
+ * Returns the rm for no good reason other than it is unobtainable
+ * other than by switching on wr.opcode, currently, and the caller,
+ * the event handler, needs it.
+ */
+static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
+                                                struct rds_ib_send_work *send,
+                                                int wc_status)
+{
+        struct rds_message *rm = NULL;
+
+        /* In the error case, wc.opcode sometimes contains garbage */
+        switch (send->s_wr.opcode) {
+        case IB_WR_SEND:
+                if (send->s_op) {
+                        rm = container_of(send->s_op, struct rds_message, data);
+                        rds_ib_send_unmap_data(ic, send->s_op, wc_status);
+                }
+                break;
+        case IB_WR_RDMA_WRITE:
+        case IB_WR_RDMA_READ:
+                if (send->s_op) {
+                        rm = container_of(send->s_op, struct rds_message, rdma);
+                        rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
+                }
+                break;
+        case IB_WR_ATOMIC_FETCH_AND_ADD:
+        case IB_WR_ATOMIC_CMP_AND_SWP:
+                if (send->s_op) {
+                        rm = container_of(send->s_op, struct rds_message, atomic);
+                        rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
+                }
+                break;
+        default:
+                if (printk_ratelimit())
+                        printk(KERN_NOTICE
+                               "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
+                               __func__, send->s_wr.opcode);
+                break;
         }
 
-        /* If anyone waited for this message to get flushed out, wake
-         * them up now */
-        rds_message_unmapped(rm);
+        send->s_wr.opcode = 0xdead;
 
-        rds_message_put(rm);
-        send->s_rm = NULL;
+        return rm;
 }
 
 void rds_ib_send_init_ring(struct rds_ib_connection *ic)
@@ -151,7 +193,6 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
         for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
                 struct ib_sge *sge;
 
-                send->s_rm = NULL;
                 send->s_op = NULL;
 
                 send->s_wr.wr_id = i;
@@ -173,9 +214,8 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
         u32 i;
 
         for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
-                if (!send->s_rm || send->s_wr.opcode == 0xdead)
-                        continue;
-                rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
+                if (send->s_op && send->s_wr.opcode != 0xdead)
+                        rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
         }
 }
 
@@ -189,6 +229,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 {
         struct rds_connection *conn = context;
         struct rds_ib_connection *ic = conn->c_transport_data;
+        struct rds_message *rm = NULL;
         struct ib_wc wc;
         struct rds_ib_send_work *send;
         u32 completed;
@@ -222,42 +263,18 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
                 for (i = 0; i < completed; i++) {
                         send = &ic->i_sends[oldest];
 
-                        /* In the error case, wc.opcode sometimes contains garbage */
-                        switch (send->s_wr.opcode) {
-                        case IB_WR_SEND:
-                        case IB_WR_RDMA_WRITE:
-                        case IB_WR_RDMA_READ:
-                        case IB_WR_ATOMIC_FETCH_AND_ADD:
-                        case IB_WR_ATOMIC_CMP_AND_SWP:
-                                if (send->s_rm)
-                                        rds_ib_send_unmap_rm(ic, send, wc.status);
-                                break;
-                        default:
-                                if (printk_ratelimit())
-                                        printk(KERN_NOTICE
-                                               "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
-                                               __func__, send->s_wr.opcode);
-                                break;
-                        }
+                        rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
-                        send->s_wr.opcode = 0xdead;
-                        send->s_wr.num_sge = 1;
                         if (send->s_queued + HZ/2 < jiffies)
                                 rds_ib_stats_inc(s_ib_tx_stalled);
 
-                        /* If a RDMA operation produced an error, signal this right
-                         * away. If we don't, the subsequent SEND that goes with this
-                         * RDMA will be canceled with ERR_WFLUSH, and the application
-                         * never learn that the RDMA failed. */
-                        if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
-                                struct rds_message *rm;
-
-                                rm = rds_send_get_message(conn, send->s_op);
-                                if (rm) {
-                                        rds_ib_send_unmap_rm(ic, send, wc.status);
-                                        rds_ib_send_complete(rm, wc.status, rds_rdma_send_complete);
-                                        rds_message_put(rm);
-                                }
+                        if (&send->s_op == &rm->m_final_op) {
+                                /* If anyone waited for this message to get flushed out, wake
+                                 * them up now */
+                                rds_message_unmapped(rm);
+
+                                rds_message_put(rm);
+                                send->s_op = NULL;
                         }
 
                         oldest = (oldest + 1) % ic->i_send_ring.w_nr;
@@ -512,7 +529,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
         }
 
         /* map the message the first time we see it */
-        if (!ic->i_rm) {
+        if (!ic->i_data_op) {
                 if (rm->data.op_nents) {
                         rm->data.op_count = ib_dma_map_sg(dev,
                                                           rm->data.op_sg,
@@ -530,7 +547,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                 }
 
                 rds_message_addref(rm);
-                ic->i_rm = rm;
+                ic->i_data_op = &rm->data;
 
                 /* Finalize the header */
                 if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
@@ -583,7 +600,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
         send = &ic->i_sends[pos];
         first = send;
         prev = NULL;
-        scat = &rm->data.op_sg[sg];
+        scat = &ic->i_data_op->op_sg[sg];
         i = 0;
         do {
                 unsigned int len = 0;
@@ -658,9 +675,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 
         /* if we finished the message then send completion owns it */
         if (scat == &rm->data.op_sg[rm->data.op_count]) {
-                prev->s_rm = ic->i_rm;
+                prev->s_op = ic->i_data_op;
                 prev->s_wr.send_flags |= IB_SEND_SOLICITED;
-                ic->i_rm = NULL;
+                ic->i_data_op = NULL;
         }
 
         /* Put back wrs & credits we didn't use */
@@ -681,9 +698,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                 printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
                        "returned %d\n", &conn->c_faddr, ret);
                 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
-                if (prev->s_rm) {
-                        ic->i_rm = prev->s_rm;
-                        prev->s_rm = NULL;
+                if (prev->s_op) {
+                        ic->i_data_op = prev->s_op;
+                        prev->s_op = NULL;
                 }
 
                 rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
@@ -701,10 +718,9 @@ out:
  * A simplified version of the rdma case, we always map 1 SG, and
  * only 8 bytes, for the return value from the atomic operation.
  */
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 {
         struct rds_ib_connection *ic = conn->c_transport_data;
-        struct rm_atomic_op *op = &rm->atomic;
         struct rds_ib_send_work *send = NULL;
         struct ib_send_wr *failed_wr;
         struct rds_ib_device *rds_ibdev;
@@ -741,14 +757,6 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
         send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
         send->s_wr.wr.atomic.rkey = op->op_rkey;
 
-        /*
-         * If there is no data or rdma ops in the message, then
-         * we must fill in s_rm ourselves, so we properly clean up
-         * on completion.
-         */
-        if (!rm->rdma.op_active && !rm->data.op_active)
-                send->s_rm = rm;
-
         /* map 8 byte retval buffer to the device */
         ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
         rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -809,7 +817,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 
         rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
-        /* map the message the first time we see it */
+        /* map the op the first time we see it */
         if (!op->op_mapped) {
                 op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
                                              op->op_sg, op->op_nents, (op->op_write) ?
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 23b921000e7..7291f006f36 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -308,6 +308,8 @@ struct rds_message {
         unsigned int m_used_sgs;
         unsigned int m_total_sgs;
 
+        void *m_final_op;
+
         struct {
                 struct rm_atomic_op {
                         int op_type;
@@ -421,7 +423,7 @@ struct rds_transport {
         int (*xmit_cong_map)(struct rds_connection *conn,
                              struct rds_cong_map *map, unsigned long offset);
         int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
-        int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm);
+        int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
         int (*recv)(struct rds_connection *conn);
         int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
                                 size_t size);
diff --git a/net/rds/send.c b/net/rds/send.c
index 69ab1040d02..d1f364e44e3 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -252,6 +252,7 @@ int rds_send_xmit(struct rds_connection *conn)
 
                 /* The transport either sends the whole rdma or none of it */
                 if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+                        rm->m_final_op = &rm->rdma;
                         ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
                         if (ret)
                                 break;
@@ -263,10 +264,12 @@ int rds_send_xmit(struct rds_connection *conn)
                 }
                 if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
-                        ret = conn->c_trans->xmit_atomic(conn, rm);
+                        rm->m_final_op = &rm->atomic;
+                        ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
                         if (ret)
                                 break;
                         conn->c_xmit_atomic_sent = 1;
+
                         /* The transport owns the mapped memory for now.
                          * You can't unmap it while it's on the send queue */
                         set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                 }
@@ -295,6 +298,7 @@ int rds_send_xmit(struct rds_connection *conn)
                 }
 
                 if (rm->data.op_active && !conn->c_xmit_data_sent) {
+                        rm->m_final_op = &rm->data;
                         ret = conn->c_trans->xmit(conn, rm,
                                                   conn->c_xmit_hdr_off,
                                                   conn->c_xmit_sg,
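
The core of the change is that a send-ring entry no longer holds a struct rds_message pointer; it keeps only a bare s_op pointer to the message's embedded data, rdma, or atomic op, and the completion path recovers the owning message with container_of() keyed off the work-request opcode. The sketch below is an illustrative, self-contained model of that recovery step, not code from the patch: the struct members, the wr_opcode values, and the op_to_message() helper are simplified stand-ins for the real net/rds definitions.

/*
 * Illustrative sketch only (not from the patch): models the container_of()
 * recovery that rds_ib_send_unmap_op() performs.  Types and names here are
 * simplified stand-ins for the net/rds definitions.
 */
#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))

struct rm_data_op   { int op_nents; };
struct rm_rdma_op   { int op_mapped; };
struct rm_atomic_op { int op_type; };

/* A message embeds all three ops, as struct rds_message does. */
struct rds_message {
        struct rm_data_op   data;
        struct rm_rdma_op   rdma;
        struct rm_atomic_op atomic;
};

enum wr_opcode { WR_SEND, WR_RDMA_WRITE, WR_ATOMIC_CSWP };

/*
 * The send ring stores only a void *s_op; the work-request opcode says which
 * embedded op it points at, and container_of() walks back to the message.
 */
static struct rds_message *op_to_message(void *s_op, enum wr_opcode opcode)
{
        switch (opcode) {
        case WR_SEND:
                return container_of((struct rm_data_op *)s_op,
                                    struct rds_message, data);
        case WR_RDMA_WRITE:
                return container_of((struct rm_rdma_op *)s_op,
                                    struct rds_message, rdma);
        case WR_ATOMIC_CSWP:
                return container_of((struct rm_atomic_op *)s_op,
                                    struct rds_message, atomic);
        }
        return NULL;
}

int main(void)
{
        struct rds_message rm = { .data = { .op_nents = 1 } };
        void *s_op = &rm.rdma;  /* what a send-ring entry would stash */

        printf("recovered %p, expected %p\n",
               (void *)op_to_message(s_op, WR_RDMA_WRITE), (void *)&rm);
        return 0;
}

In the patch itself, rds_send_xmit() records the last op it hands to the transport in rm->m_final_op, so the send completion handler knows which work request is the one that may drop the message reference and call rds_message_unmapped() to wake anyone waiting for the message to be flushed.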