Diffstat (limited to 'net/9p/trans_rdma.c')
-rw-r--r--  net/9p/trans_rdma.c | 182
1 file changed, 113 insertions(+), 69 deletions(-)
diff --git a/net/9p/trans_rdma.c b/net/9p/trans_rdma.c
index 17c5ba7551a..14ad43b5cf8 100644
--- a/net/9p/trans_rdma.c
+++ b/net/9p/trans_rdma.c
@@ -26,6 +26,8 @@
  *
  */
 
+#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
+
 #include <linux/in.h>
 #include <linux/module.h>
 #include <linux/net.h>
@@ -55,11 +57,8 @@
 #define P9_RDMA_IRD		0
 #define P9_RDMA_ORD		0
 #define P9_RDMA_TIMEOUT		30000		/* 30 seconds */
-#define P9_RDMA_MAXSIZE		(4*4096)	/* Min SGE is 4, so we can
-						 * safely advertise a maxsize
-						 * of 64k */
+#define P9_RDMA_MAXSIZE		(1024*1024)	/* 1MB */
-#define P9_RDMA_MAX_SGE (P9_RDMA_MAXSIZE >> PAGE_SHIFT)
 
 /**
  * struct p9_trans_rdma - RDMA transport instance
  *
@@ -74,7 +73,9 @@
  * @sq_depth: The depth of the Send Queue
  * @sq_sem: Semaphore for the SQ
  * @rq_depth: The depth of the Receive Queue.
- * @rq_count: Count of requests in the Receive Queue.
+ * @rq_sem: Semaphore for the RQ
+ * @excess_rc : Amount of posted Receive Contexts without a pending request.
+ *		See rdma_request()
  * @addr: The remote peer's address
  * @req_lock: Protects the active request list
  * @cm_done: Completion event for connection management tracking
@@ -99,7 +100,8 @@ struct p9_trans_rdma {
 	int sq_depth;
 	struct semaphore sq_sem;
 	int rq_depth;
-	atomic_t rq_count;
+	struct semaphore rq_sem;
+	atomic_t excess_rc;
 	struct sockaddr_in addr;
 	spinlock_t req_lock;
 
@@ -168,7 +170,6 @@ static int parse_opts(char *params, struct p9_rdma_opts *opts)
 	substring_t args[MAX_OPT_ARGS];
 	int option;
 	char *options, *tmp_options;
-	int ret;
 
 	opts->port = P9_PORT;
 	opts->sq_depth = P9_RDMA_SQ_DEPTH;
@@ -180,8 +181,8 @@ static int parse_opts(char *params, struct p9_rdma_opts *opts)
 
 	tmp_options = kstrdup(params, GFP_KERNEL);
 	if (!tmp_options) {
-		P9_DPRINTK(P9_DEBUG_ERROR,
-			   "failed to allocate copy of option string\n");
+		p9_debug(P9_DEBUG_ERROR,
+			 "failed to allocate copy of option string\n");
 		return -ENOMEM;
 	}
 	options = tmp_options;
@@ -192,11 +193,12 @@ static int parse_opts(char *params, struct p9_rdma_opts *opts)
 		if (!*p)
 			continue;
 		token = match_token(p, tokens, args);
+		if (token == Opt_err)
+			continue;
 		r = match_int(&args[0], &option);
 		if (r < 0) {
-			P9_DPRINTK(P9_DEBUG_ERROR,
-				   "integer field, but no integer?\n");
-			ret = r;
+			p9_debug(P9_DEBUG_ERROR,
+				 "integer field, but no integer?\n");
 			continue;
 		}
 		switch (token) {
@@ -297,15 +299,20 @@ handle_recv(struct p9_client *client, struct p9_trans_rdma *rdma,
 	if (!req)
 		goto err_out;
 
+	/* Check that we have not yet received a reply for this request.
+	 */
+	if (unlikely(req->rc)) {
+		pr_err("Duplicate reply for request %d", tag);
+		goto err_out;
+	}
+
 	req->rc = c->rc;
-	req->status = REQ_STATUS_RCVD;
-	p9_client_cb(client, req);
+	p9_client_cb(client, req, REQ_STATUS_RCVD);
 
 	return;
 
  err_out:
-	P9_DPRINTK(P9_DEBUG_ERROR, "req %p err %d status %d\n",
-		   req, err, status);
+	p9_debug(P9_DEBUG_ERROR, "req %p err %d status %d\n", req, err, status);
 	rdma->state = P9_RDMA_FLUSHING;
 	client->status = Disconnected;
 }
@@ -321,8 +328,8 @@ handle_send(struct p9_client *client, struct p9_trans_rdma *rdma,
 
 static void qp_event_handler(struct ib_event *event, void *context)
 {
-	P9_DPRINTK(P9_DEBUG_ERROR, "QP event %d context %p\n", event->event,
-								context);
+	p9_debug(P9_DEBUG_ERROR, "QP event %d context %p\n",
+		 event->event, context);
 }
 
 static void cq_comp_handler(struct ib_cq *cq, void *cq_context)
@@ -338,8 +345,8 @@ static void cq_comp_handler(struct ib_cq *cq, void *cq_context)
 
 		switch (c->wc_op) {
 		case IB_WC_RECV:
-			atomic_dec(&rdma->rq_count);
 			handle_recv(client, rdma, c, wc.status, wc.byte_len);
+			up(&rdma->rq_sem);
 			break;
 
 		case IB_WC_SEND:
@@ -348,8 +355,7 @@ static void cq_comp_handler(struct ib_cq *cq, void *cq_context)
 			break;
 
 		default:
-			printk(KERN_ERR "9prdma: unexpected completion type, "
-			       "c->wc_op=%d, wc.opcode=%d, status=%d\n",
+			pr_err("unexpected completion type, c->wc_op=%d, wc.opcode=%d, status=%d\n",
 			       c->wc_op, wc.opcode, wc.status);
 			break;
 		}
@@ -359,7 +365,7 @@ static void cq_comp_handler(struct ib_cq *cq, void *cq_context)
 
 static void cq_event_handler(struct ib_event *e, void *v)
 {
-	P9_DPRINTK(P9_DEBUG_ERROR, "CQ event %d context %p\n", e->event, v);
+	p9_debug(P9_DEBUG_ERROR, "CQ event %d context %p\n", e->event, v);
 }
 
 static void rdma_destroy_trans(struct p9_trans_rdma *rdma)
@@ -410,7 +416,7 @@ post_recv(struct p9_client *client, struct p9_rdma_context *c)
 	return ib_post_recv(rdma->qp, &wr, &bad_wr);
 
  error:
-	P9_DPRINTK(P9_DEBUG_ERROR, "EIO\n");
+	p9_debug(P9_DEBUG_ERROR, "EIO\n");
 	return -EIO;
 }
 
@@ -424,32 +430,33 @@ static int rdma_request(struct p9_client *client, struct p9_req_t *req)
 	struct p9_rdma_context *c = NULL;
 	struct p9_rdma_context *rpl_context = NULL;
 
+	/* When an error occurs between posting the recv and the send,
+	 * there will be a receive context posted without a pending request.
+	 * Since there is no way to "un-post" it, we remember it and skip
+	 * post_recv() for the next request.
+	 * So here,
+	 * see if we are this `next request' and need to absorb an excess rc.
+	 * If yes, then drop and free our own, and do not recv_post().
+	 **/
+	if (unlikely(atomic_read(&rdma->excess_rc) > 0)) {
+		if ((atomic_sub_return(1, &rdma->excess_rc) >= 0)) {
+			/* Got one ! */
+			kfree(req->rc);
+			req->rc = NULL;
+			goto dont_need_post_recv;
+		} else {
+			/* We raced and lost. */
+			atomic_inc(&rdma->excess_rc);
+		}
+	}
+
 	/* Allocate an fcall for the reply */
-	rpl_context = kmalloc(sizeof *rpl_context, GFP_KERNEL);
+	rpl_context = kmalloc(sizeof *rpl_context, GFP_NOFS);
 	if (!rpl_context) {
 		err = -ENOMEM;
-		goto err_close;
-	}
-
-	/*
-	 * If the request has a buffer, steal it, otherwise
-	 * allocate a new one.  Typically, requests should already
-	 * have receive buffers allocated and just swap them around
-	 */
-	if (!req->rc) {
-		req->rc = kmalloc(sizeof(struct p9_fcall)+client->msize,
-								GFP_KERNEL);
-		if (req->rc) {
-			req->rc->sdata = (char *) req->rc +
-						sizeof(struct p9_fcall);
-			req->rc->capacity = client->msize;
-		}
+		goto recv_error;
 	}
 	rpl_context->rc = req->rc;
-	if (!rpl_context->rc) {
-		err = -ENOMEM;
-		goto err_free2;
-	}
 
 	/*
 	 * Post a receive buffer for this request. We need to ensure
@@ -458,29 +465,35 @@ static int rdma_request(struct p9_client *client, struct p9_req_t *req)
 	 * outstanding request, so we must keep a count to avoid
 	 * overflowing the RQ.
 	 */
-	if (atomic_inc_return(&rdma->rq_count) <= rdma->rq_depth) {
-		err = post_recv(client, rpl_context);
-		if (err)
-			goto err_free1;
-	} else
-		atomic_dec(&rdma->rq_count);
+	if (down_interruptible(&rdma->rq_sem)) {
+		err = -EINTR;
+		goto recv_error;
+	}
+	err = post_recv(client, rpl_context);
+	if (err) {
+		p9_debug(P9_DEBUG_FCALL, "POST RECV failed\n");
+		goto recv_error;
+	}
 
 	/* remove posted receive buffer from request structure */
 	req->rc = NULL;
 
+dont_need_post_recv:
 	/* Post the request */
-	c = kmalloc(sizeof *c, GFP_KERNEL);
+	c = kmalloc(sizeof *c, GFP_NOFS);
 	if (!c) {
 		err = -ENOMEM;
-		goto err_free1;
+		goto send_error;
 	}
 	c->req = req;
 
 	c->busa = ib_dma_map_single(rdma->cm_id->device,
 				    c->req->tc->sdata, c->req->tc->size,
 				    DMA_TO_DEVICE);
-	if (ib_dma_mapping_error(rdma->cm_id->device, c->busa))
-		goto error;
+	if (ib_dma_mapping_error(rdma->cm_id->device, c->busa)) {
+		err = -EIO;
+		goto send_error;
+	}
 
 	sge.addr = c->busa;
 	sge.length = c->req->tc->size;
@@ -494,22 +507,38 @@ static int rdma_request(struct p9_client *client, struct p9_req_t *req)
 	wr.sg_list = &sge;
 	wr.num_sge = 1;
 
-	if (down_interruptible(&rdma->sq_sem))
-		goto error;
+	if (down_interruptible(&rdma->sq_sem)) {
+		err = -EINTR;
+		goto send_error;
+	}
 
-	return ib_post_send(rdma->qp, &wr, &bad_wr);
+	/* Mark request as `sent' *before* we actually send it,
+	 * because doing if after could erase the REQ_STATUS_RCVD
+	 * status in case of a very fast reply.
+	 */
+	req->status = REQ_STATUS_SENT;
+	err = ib_post_send(rdma->qp, &wr, &bad_wr);
+	if (err)
+		goto send_error;
 
- error:
+	/* Success */
+	return 0;
+
+ /* Handle errors that happened during or while preparing the send: */
+ send_error:
+	req->status = REQ_STATUS_ERROR;
 	kfree(c);
-	kfree(rpl_context->rc);
-	kfree(rpl_context);
-	P9_DPRINTK(P9_DEBUG_ERROR, "EIO\n");
-	return -EIO;
- err_free1:
-	kfree(rpl_context->rc);
- err_free2:
+	p9_debug(P9_DEBUG_ERROR, "Error %d in rdma_request()\n", err);
+
+	/* Ach.
+	 *  We did recv_post(), but not send. We have one recv_post in excess.
+	 */
+	atomic_inc(&rdma->excess_rc);
+	return err;
+
+ /* Handle errors that happened during or while preparing post_recv(): */
+ recv_error:
 	kfree(rpl_context);
- err_close:
 	spin_lock_irqsave(&rdma->req_lock, flags);
 	if (rdma->state < P9_RDMA_CLOSING) {
 		rdma->state = P9_RDMA_CLOSING;
@@ -554,17 +583,30 @@ static struct p9_trans_rdma *alloc_rdma(struct p9_rdma_opts *opts)
 	spin_lock_init(&rdma->req_lock);
 	init_completion(&rdma->cm_done);
 	sema_init(&rdma->sq_sem, rdma->sq_depth);
-	atomic_set(&rdma->rq_count, 0);
+	sema_init(&rdma->rq_sem, rdma->rq_depth);
+	atomic_set(&rdma->excess_rc, 0);
 
 	return rdma;
 }
 
-/* its not clear to me we can do anything after send has been posted */
 static int rdma_cancel(struct p9_client *client, struct p9_req_t *req)
 {
+	/* Nothing to do here.
+	 * We will take care of it (if we have to) in rdma_cancelled()
+	 */
 	return 1;
 }
 
+/* A request has been fully flushed without a reply.
+ * That means we have posted one buffer in excess.
+ */
+static int rdma_cancelled(struct p9_client *client, struct p9_req_t *req)
+{
+	struct p9_trans_rdma *rdma = client->trans;
+	atomic_inc(&rdma->excess_rc);
+	return 0;
+}
+
 /**
  * trans_create_rdma - Transport method for creating atransport instance
  * @client: client instance
@@ -592,7 +634,8 @@ rdma_create_trans(struct p9_client *client, const char *addr, char *args)
 		return -ENOMEM;
 
 	/* Create the RDMA CM ID */
-	rdma->cm_id = rdma_create_id(p9_cm_event_handler, client, RDMA_PS_TCP);
+	rdma->cm_id = rdma_create_id(p9_cm_event_handler, client, RDMA_PS_TCP,
+				     IB_QPT_RC);
 	if (IS_ERR(rdma->cm_id))
 		goto error;
 
@@ -697,6 +740,7 @@ static struct p9_trans_module p9_rdma_trans = {
 	.close = rdma_close,
 	.request = rdma_request,
 	.cancel = rdma_cancel,
+	.cancelled = rdma_cancelled,
 };
 
 /**
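The heart of this patch is the new receive-buffer bookkeeping in rdma_request(): a request takes a credit from rq_sem before posting a receive buffer (the completion handler returns the credit with up(&rdma->rq_sem)), and if the send side then fails, the already-posted buffer cannot be un-posted, so excess_rc records it and the next request (or rdma_cancelled() for a flushed request) absorbs it instead of posting its own. The sketch below is a minimal, single-threaded userspace model of that accounting, not kernel code: the post_recv()/post_send() stubs, the plain counters standing in for the semaphore and the atomic_t, and the injected send failure are all assumptions made purely for illustration.

/* Single-threaded model of the patch's receive-buffer accounting.
 * rq_credits stands in for rq_sem (initialised to rq_depth) and
 * excess_rc counts receive buffers that were posted for a request
 * whose send never went out.  The post_*() stubs and the injected
 * send failure exist only for this demonstration.
 */
#include <stdbool.h>
#include <stdio.h>

static int rq_credits = 32;	/* rq_depth */
static int excess_rc;

static bool post_recv(void)		{ return true; }
static bool post_send(bool fail)	{ return !fail; }

static bool rdma_request_model(bool fail_send)
{
	bool need_post_recv = true;

	/* Absorb a buffer left behind by an earlier failed send. */
	if (excess_rc > 0) {
		excess_rc--;
		need_post_recv = false;
	}

	if (need_post_recv) {
		if (rq_credits == 0)
			return false;	/* real code blocks in down_interruptible() */
		rq_credits--;
		post_recv();
	}

	if (!post_send(fail_send)) {
		/* The recv is posted and cannot be taken back: remember it
		 * so that the next request skips post_recv(). */
		excess_rc++;
		return false;
	}
	return true;
}

/* A completed receive returns its credit, like up(&rdma->rq_sem). */
static void recv_completed(void)
{
	rq_credits++;
}

int main(void)
{
	rdma_request_model(true);	/* send fails, buffer is orphaned */
	printf("excess_rc = %d\n", excess_rc);		/* 1 */
	rdma_request_model(false);	/* reuses the orphaned buffer */
	printf("excess_rc = %d\n", excess_rc);		/* 0 */
	recv_completed();
	printf("rq_credits = %d\n", rq_credits);	/* back to 32 */
	return 0;
}

Compared with the old rq_count scheme, which simply skipped posting a receive buffer once the counter exceeded rq_depth, blocking on rq_sem makes a sender wait for a free slot, so the receive queue cannot overflow and every outstanding request has a posted buffer for its reply.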
