summary refs log tree commit diff
path: root/net/sunrpc/xprtrdma/verbs.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r--net/sunrpc/xprtrdma/verbs.c131
1 files changed, 64 insertions, 67 deletions
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ec912cf9c618..1e965a380896 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -101,6 +101,12 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	struct rdma_cm_id *id = ep->re_id;
 
+	/* Wait for rpcrdma_post_recvs() to leave its critical
+	 * section.
+	 */
+	if (atomic_inc_return(&ep->re_receiving) > 1)
+		wait_for_completion(&ep->re_done);
+
 	/* Flush Receives, then wait for deferred Reply work
 	 * to complete.
 	 */
@@ -114,22 +120,6 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 	rpcrdma_ep_put(ep);
 }
 
-/**
- * rpcrdma_qp_event_handler - Handle one QP event (error notification)
- * @event: details of the event
- * @context: ep that owns QP where event occurred
- *
- * Called from the RDMA provider (device driver) possibly in an interrupt
- * context. The QP is always destroyed before the ID, so the ID will be
- * reliably available when this handler is invoked.
- */
-static void rpcrdma_qp_event_handler(struct ib_event *event, void *context)
-{
-	struct rpcrdma_ep *ep = context;
-
-	trace_xprtrdma_qp_event(ep, event);
-}
-
 /* Ensure xprt_force_disconnect() is invoked exactly once when a
  * connection is closed or lost. (The important thing is it needs
  * to be invoked "at least" once).
@@ -205,7 +195,7 @@ static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 
 out_flushed:
 	rpcrdma_flush_disconnect(r_xprt, wc);
-	rpcrdma_rep_destroy(rep);
+	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
 }
 
 static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
@@ -414,6 +404,7 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 	__module_get(THIS_MODULE);
 	device = id->device;
 	ep->re_id = id;
+	reinit_completion(&ep->re_done);
 
 	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
 	ep->re_inline_send = xprt_rdma_max_inline_write;
@@ -424,8 +415,6 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 
 	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
 
-	ep->re_attr.event_handler = rpcrdma_qp_event_handler;
-	ep->re_attr.qp_context = ep;
 	ep->re_attr.srq = NULL;
 	ep->re_attr.cap.max_inline_data = 0;
 	ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
@@ -535,7 +524,7 @@ int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
 	 * outstanding Receives.
 	 */
 	rpcrdma_ep_get(ep);
-	rpcrdma_post_recvs(r_xprt, true);
+	rpcrdma_post_recvs(r_xprt, 1, true);
 
 	rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
 	if (rc)
@@ -954,13 +943,11 @@ static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
 		rpcrdma_req_reset(req);
 }
 
-/* No locking needed here. This function is called only by the
- * Receive completion handler.
- */
 static noinline
 struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
 				       bool temp)
 {
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_rep *rep;
 
 	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
@@ -987,7 +974,10 @@ struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
 	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 	rep->rr_recv_wr.num_sge = 1;
 	rep->rr_temp = temp;
-	list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
+
+	spin_lock(&buf->rb_lock);
+	list_add(&rep->rr_all, &buf->rb_all_reps);
+	spin_unlock(&buf->rb_lock);
 	return rep;
 
 out_free_regbuf:
@@ -998,16 +988,23 @@ out:
 	return NULL;
 }
 
-/* No locking needed here. This function is invoked only by the
- * Receive completion handler, or during transport shutdown.
- */
-static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
 {
-	list_del(&rep->rr_all);
 	rpcrdma_regbuf_free(rep->rr_rdmabuf);
 	kfree(rep);
 }
 
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+{
+	struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf;
+
+	spin_lock(&buf->rb_lock);
+	list_del(&rep->rr_all);
+	spin_unlock(&buf->rb_lock);
+
+	rpcrdma_rep_free(rep);
+}
+
 static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
 {
 	struct llist_node *node;
@@ -1019,12 +1016,21 @@ static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
 	return llist_entry(node, struct rpcrdma_rep, rr_node);
 }
 
-static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
-			    struct rpcrdma_rep *rep)
+/**
+ * rpcrdma_rep_put - Release rpcrdma_rep back to free list
+ * @buf: buffer pool
+ * @rep: rep to release
+ *
+ */
+void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
 {
 	llist_add(&rep->rr_node, &buf->rb_free_reps);
 }
 
+/* Caller must ensure the QP is quiescent (RQ is drained) before
+ * invoking this function, to guarantee rb_all_reps is not
+ * changing.
+ */
 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
@@ -1032,7 +1038,7 @@ static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
 
 	list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
 		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
-		rep->rr_temp = true;
+		rep->rr_temp = true;	/* Mark this rep for destruction */
 	}
 }
 
@@ -1040,8 +1046,18 @@ static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_rep *rep;
 
-	while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
-		rpcrdma_rep_destroy(rep);
+	spin_lock(&buf->rb_lock);
+	while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
+					       struct rpcrdma_rep,
+					       rr_all)) != NULL) {
+		list_del(&rep->rr_all);
+		spin_unlock(&buf->rb_lock);
+
+		rpcrdma_rep_free(rep);
+
+		spin_lock(&buf->rb_lock);
+	}
+	spin_unlock(&buf->rb_lock);
 }
 
 /**
@@ -1104,7 +1120,7 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req)
 		list_del(&mr->mr_all);
 		spin_unlock(&buf->rb_lock);
 
-		frwr_release_mr(mr);
+		frwr_mr_release(mr);
 	}
 
 	rpcrdma_regbuf_free(req->rl_recvbuf);
@@ -1135,7 +1151,7 @@ static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
 		list_del(&mr->mr_all);
 		spin_unlock(&buf->rb_lock);
 
-		frwr_release_mr(mr);
+		frwr_mr_release(mr);
 
 		spin_lock(&buf->rb_lock);
 	}
@@ -1221,17 +1237,6 @@ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 	spin_unlock(&buffers->rb_lock);
 }
 
-/**
- * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
- * @rep: rep to release
- *
- * Used after error conditions.
- */
-void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
-{
-	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
-}
-
 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
  *
  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
@@ -1342,21 +1347,7 @@ static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
  */
 int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
-	struct ib_send_wr *send_wr = &req->rl_wr;
-	struct rpcrdma_ep *ep = r_xprt->rx_ep;
-	int rc;
-
-	if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) {
-		send_wr->send_flags |= IB_SEND_SIGNALED;
-		ep->re_send_count = ep->re_send_batch;
-	} else {
-		send_wr->send_flags &= ~IB_SEND_SIGNALED;
-		--ep->re_send_count;
-	}
-
-	trace_xprtrdma_post_send(req);
-	rc = frwr_send(r_xprt, req);
-	if (rc)
+	if (frwr_send(r_xprt, req))
 		return -ENOTCONN;
 	return 0;
 }
@@ -1364,27 +1355,30 @@ int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 /**
  * rpcrdma_post_recvs - Refill the Receive Queue
  * @r_xprt: controlling transport instance
- * @temp: mark Receive buffers to be deleted after use
+ * @needed: current credit grant
+ * @temp: mark Receive buffers to be deleted after one use
  *
  */
-void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	struct ib_recv_wr *wr, *bad_wr;
 	struct rpcrdma_rep *rep;
-	int needed, count, rc;
+	int count, rc;
 
 	rc = 0;
 	count = 0;
 
-	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
 	if (likely(ep->re_receive_count > needed))
 		goto out;
 	needed -= ep->re_receive_count;
 	if (!temp)
 		needed += RPCRDMA_MAX_RECV_BATCH;
 
+	if (atomic_inc_return(&ep->re_receiving) > 1)
+		goto out;
+
 	/* fast path: all needed reps can be found on the free list */
 	wr = NULL;
 	while (needed) {
@@ -1410,6 +1404,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 
 	rc = ib_post_recv(ep->re_id->qp, wr,
 			  (const struct ib_recv_wr **)&bad_wr);
+	if (atomic_dec_return(&ep->re_receiving) > 0)
+		complete(&ep->re_done);
+
 out:
 	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 	if (rc) {
@@ -1418,7 +1415,7 @@ out:
 
 			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
 			wr = wr->next;
-			rpcrdma_recv_buffer_put(rep);
+			rpcrdma_rep_put(buf, rep);
 			--count;
 		}
 	}