summary refs log tree commit diff
path: root/net/sunrpc
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2021-05-07 11:23:41 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2021-05-07 11:23:41 -0700
commita647034fe26b92702d5084b518c061e3cebefbaf (patch)
tree7b76983fac97c7ccc821dfb7addc72a6ba7251ee /net/sunrpc
parente22e9832798df81393d09d40fa34b01aea53cf39 (diff)
parent9e895cd9649abe4392c59d14e31b0f5667d082d2 (diff)
downloadlinux-a647034fe26b92702d5084b518c061e3cebefbaf.tar.gz
Merge tag 'nfs-for-5.13-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Stable fixes:

   - Add validation of the UDP retrans parameter to prevent shift
     out-of-bounds

   - Don't discard pNFS layout segments that are marked for return

  Bugfixes:

   - Fix a NULL dereference crash in xprt_complete_bc_request() when the
     NFSv4.1 server misbehaves.

   - Fix the handling of NFS READDIR cookie verifiers

   - Sundry fixes to ensure attribute revalidation works correctly when
     the server does not return post-op attributes.

   - nfs4_bitmask_adjust() must not change the server global bitmasks

   - Fix major timeout handling in the RPC code.

   - NFSv4.2 fallocate() fixes.

   - Fix the NFSv4.2 SEEK_HOLE/SEEK_DATA end-of-file handling

   - Copy offload attribute revalidation fixes

   - Fix an incorrect filehandle size check in the pNFS flexfiles driver

   - Fix several RDMA transport setup/teardown races

   - Fix several RDMA queue wrapping issues

   - Fix a misplaced memory read barrier in sunrpc's call_decode()

  Features:

   - Micro optimisation of the TCP transmission queue using TCP_CORK

   - statx() performance improvements by further splitting up the
     tracking of invalid cached file metadata.

   - Support the NFSv4.2 'change_attr_type' attribute and use it to
     optimise handling of change attribute updates"

* tag 'nfs-for-5.13-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (85 commits)
  xprtrdma: Fix a NULL dereference in frwr_unmap_sync()
  sunrpc: Fix misplaced barrier in call_decode
  NFSv4.2: Remove ifdef CONFIG_NFSD from NFSv4.2 client SSC code.
  xprtrdma: Move fr_mr field to struct rpcrdma_mr
  xprtrdma: Move the Work Request union to struct rpcrdma_mr
  xprtrdma: Move fr_linv_done field to struct rpcrdma_mr
  xprtrdma: Move cqe to struct rpcrdma_mr
  xprtrdma: Move fr_cid to struct rpcrdma_mr
  xprtrdma: Remove the RPC/RDMA QP event handler
  xprtrdma: Don't display r_xprt memory addresses in tracepoints
  xprtrdma: Add an rpcrdma_mr_completion_class
  xprtrdma: Add tracepoints showing FastReg WRs and remote invalidation
  xprtrdma: Avoid Send Queue wrapping
  xprtrdma: Do not wake RPC consumer on a failed LocalInv
  xprtrdma: Do not recycle MR after FastReg/LocalInv flushes
  xprtrdma: Clarify use of barrier in frwr_wc_localinv_done()
  xprtrdma: Rename frwr_release_mr()
  xprtrdma: rpcrdma_mr_pop() already does list_del_init()
  xprtrdma: Delete rpcrdma_recv_buffer_put()
  xprtrdma: Fix cwnd update ordering
  ...
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/clnt.c12
-rw-r--r--net/sunrpc/rpcb_clnt.c7
-rw-r--r--net/sunrpc/xprt.c18
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c4
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c209
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c39
-rw-r--r--net/sunrpc/xprtrdma/transport.c6
-rw-r--r--net/sunrpc/xprtrdma/verbs.c131
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h29
-rw-r--r--net/sunrpc/xprtsock.c9
10 files changed, 237 insertions, 227 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 612f0a641f4c..f555d335e910 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1799,7 +1799,6 @@ call_allocate(struct rpc_task *task)
 
 	status = xprt->ops->buf_alloc(task);
 	trace_rpc_buf_alloc(task, status);
-	xprt_inject_disconnect(xprt);
 	if (status == 0)
 		return;
 	if (status != -ENOMEM) {
@@ -2458,12 +2457,6 @@ call_decode(struct rpc_task *task)
 	}
 
 	/*
-	 * Ensure that we see all writes made by xprt_complete_rqst()
-	 * before it changed req->rq_reply_bytes_recvd.
-	 */
-	smp_rmb();
-
-	/*
 	 * Did we ever call xprt_complete_rqst()? If not, we should assume
 	 * the message is incomplete.
 	 */
@@ -2471,6 +2464,11 @@ call_decode(struct rpc_task *task)
 	if (!req->rq_reply_bytes_recvd)
 		goto out;
 
+	/* Ensure that we see all writes made by xprt_complete_rqst()
+	 * before it changed req->rq_reply_bytes_recvd.
+	 */
+	smp_rmb();
+
 	req->rq_rcv_buf.len = req->rq_private_buf.len;
 	trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
 
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 38fe2ce8a5aa..647b323cc1d5 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -344,13 +344,15 @@ static struct rpc_clnt *rpcb_create(struct net *net, const char *nodename,
 				    const char *hostname,
 				    struct sockaddr *srvaddr, size_t salen,
 				    int proto, u32 version,
-				    const struct cred *cred)
+				    const struct cred *cred,
+				    const struct rpc_timeout *timeo)
 {
 	struct rpc_create_args args = {
 		.net		= net,
 		.protocol	= proto,
 		.address	= srvaddr,
 		.addrsize	= salen,
+		.timeout	= timeo,
 		.servername	= hostname,
 		.nodename	= nodename,
 		.program	= &rpcb_program,
@@ -705,7 +707,8 @@ void rpcb_getport_async(struct rpc_task *task)
 				clnt->cl_nodename,
 				xprt->servername, sap, salen,
 				xprt->prot, bind_version,
-				clnt->cl_cred);
+				clnt->cl_cred,
+				task->tk_client->cl_timeout);
 	if (IS_ERR(rpcb_clnt)) {
 		status = PTR_ERR(rpcb_clnt);
 		goto bailout_nofree;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 691ccf8049a4..e5b5a960a69b 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -698,9 +698,9 @@ int xprt_adjust_timeout(struct rpc_rqst *req)
 	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
 	int status = 0;
 
-	if (time_before(jiffies, req->rq_minortimeo))
-		return status;
 	if (time_before(jiffies, req->rq_majortimeo)) {
+		if (time_before(jiffies, req->rq_minortimeo))
+			return status;
 		if (to->to_exponential)
 			req->rq_timeout <<= 1;
 		else
@@ -1352,6 +1352,7 @@ xprt_request_enqueue_transmit(struct rpc_task *task)
 		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
 		INIT_LIST_HEAD(&req->rq_xmit2);
 out:
+		atomic_long_inc(&xprt->xmit_queuelen);
 		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 		spin_unlock(&xprt->queue_lock);
 	}
@@ -1381,6 +1382,7 @@ xprt_request_dequeue_transmit_locked(struct rpc_task *task)
 		}
 	} else
 		list_del(&req->rq_xmit2);
+	atomic_long_dec(&req->rq_xprt->xmit_queuelen);
 }
 
 /**
@@ -1469,8 +1471,6 @@ bool xprt_prepare_transmit(struct rpc_task *task)
 	struct rpc_xprt	*xprt = req->rq_xprt;
 
 	if (!xprt_lock_write(xprt, task)) {
-		trace_xprt_transmit_queued(xprt, task);
-
 		/* Race breaker: someone may have transmitted us */
 		if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
 			rpc_wake_up_queued_task_set_status(&xprt->sending,
@@ -1483,7 +1483,10 @@ bool xprt_prepare_transmit(struct rpc_task *task)
 
 void xprt_end_transmit(struct rpc_task *task)
 {
-	xprt_release_write(task->tk_rqstp->rq_xprt, task);
+	struct rpc_xprt	*xprt = task->tk_rqstp->rq_xprt;
+
+	xprt_inject_disconnect(xprt);
+	xprt_release_write(xprt, task);
 }
 
 /**
@@ -1537,8 +1540,10 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 		return status;
 	}
 
-	if (is_retrans)
+	if (is_retrans) {
 		task->tk_client->cl_stats->rpcretrans++;
+		trace_xprt_retransmit(req);
+	}
 
 	xprt_inject_disconnect(xprt);
 
@@ -1885,7 +1890,6 @@ void xprt_release(struct rpc_task *task)
 	spin_unlock(&xprt->transport_lock);
 	if (req->rq_buffer)
 		xprt->ops->buf_free(task);
-	xprt_inject_disconnect(xprt);
 	xdr_free_bvec(&req->rq_rcv_buf);
 	xdr_free_bvec(&req->rq_snd_buf);
 	if (req->rq_cred != NULL)
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index a249837d6a55..1151efd09b27 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -155,9 +155,11 @@ void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
 void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
 {
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct rpcrdma_rep *rep = req->rl_reply;
 	struct rpc_xprt *xprt = rqst->rq_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 
-	rpcrdma_recv_buffer_put(req->rl_reply);
+	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
 	req->rl_reply = NULL;
 
 	spin_lock(&xprt->bc_pa_lock);
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 766a1048a48a..229fcc9a9064 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -49,20 +49,13 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-/**
- * frwr_release_mr - Destroy one MR
- * @mr: MR allocated by frwr_mr_init
- *
- */
-void frwr_release_mr(struct rpcrdma_mr *mr)
+static void frwr_cid_init(struct rpcrdma_ep *ep,
+			  struct rpcrdma_mr *mr)
 {
-	int rc;
+	struct rpc_rdma_cid *cid = &mr->mr_cid;
 
-	rc = ib_dereg_mr(mr->frwr.fr_mr);
-	if (rc)
-		trace_xprtrdma_frwr_dereg(mr, rc);
-	kfree(mr->mr_sg);
-	kfree(mr);
+	cid->ci_queue_id = ep->re_attr.send_cq->res.id;
+	cid->ci_completion_id = mr->mr_ibmr->res.id;
 }
 
 static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
@@ -75,20 +68,22 @@ static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
 	}
 }
 
-static void frwr_mr_recycle(struct rpcrdma_mr *mr)
+/**
+ * frwr_mr_release - Destroy one MR
+ * @mr: MR allocated by frwr_mr_init
+ *
+ */
+void frwr_mr_release(struct rpcrdma_mr *mr)
 {
-	struct rpcrdma_xprt *r_xprt = mr->mr_xprt;
-
-	trace_xprtrdma_mr_recycle(mr);
-
-	frwr_mr_unmap(r_xprt, mr);
+	int rc;
 
-	spin_lock(&r_xprt->rx_buf.rb_lock);
-	list_del(&mr->mr_all);
-	r_xprt->rx_stats.mrs_recycled++;
-	spin_unlock(&r_xprt->rx_buf.rb_lock);
+	frwr_mr_unmap(mr->mr_xprt, mr);
 
-	frwr_release_mr(mr);
+	rc = ib_dereg_mr(mr->mr_ibmr);
+	if (rc)
+		trace_xprtrdma_frwr_dereg(mr, rc);
+	kfree(mr->mr_sg);
+	kfree(mr);
 }
 
 static void frwr_mr_put(struct rpcrdma_mr *mr)
@@ -144,10 +139,11 @@ int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr)
 		goto out_list_err;
 
 	mr->mr_xprt = r_xprt;
-	mr->frwr.fr_mr = frmr;
+	mr->mr_ibmr = frmr;
 	mr->mr_device = NULL;
 	INIT_LIST_HEAD(&mr->mr_list);
-	init_completion(&mr->frwr.fr_linv_done);
+	init_completion(&mr->mr_linv_done);
+	frwr_cid_init(ep, mr);
 
 	sg_init_table(sg, depth);
 	mr->mr_sg = sg;
@@ -257,6 +253,7 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device)
 	ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */
 	ep->re_attr.cap.max_recv_wr = ep->re_max_requests;
 	ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
+	ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH;
 	ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */
 
 	ep->re_max_rdma_segs =
@@ -326,7 +323,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
 		goto out_dmamap_err;
 	mr->mr_device = ep->re_id->device;
 
-	ibmr = mr->frwr.fr_mr;
+	ibmr = mr->mr_ibmr;
 	n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE);
 	if (n != dma_nents)
 		goto out_mapmr_err;
@@ -336,7 +333,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
 	key = (u8)(ibmr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(ibmr, ++key);
 
-	reg_wr = &mr->frwr.fr_regwr;
+	reg_wr = &mr->mr_regwr;
 	reg_wr->mr = ibmr;
 	reg_wr->key = ibmr->rkey;
 	reg_wr->access = writing ?
@@ -364,29 +361,19 @@ out_mapmr_err:
  * @cq: completion queue
  * @wc: WCE for a completed FastReg WR
  *
+ * Each flushed MR gets destroyed after the QP has drained.
  */
 static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
 {
 	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr =
-		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
+	struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe);
 
 	/* WARNING: Only wr_cqe and status are reliable at this point */
-	trace_xprtrdma_wc_fastreg(wc, &frwr->fr_cid);
-	/* The MR will get recycled when the associated req is retransmitted */
+	trace_xprtrdma_wc_fastreg(wc, &mr->mr_cid);
 
 	rpcrdma_flush_disconnect(cq->cq_context, wc);
 }
 
-static void frwr_cid_init(struct rpcrdma_ep *ep,
-			  struct rpcrdma_frwr *frwr)
-{
-	struct rpc_rdma_cid *cid = &frwr->fr_cid;
-
-	cid->ci_queue_id = ep->re_attr.send_cq->res.id;
-	cid->ci_completion_id = frwr->fr_mr->res.id;
-}
-
 /**
  * frwr_send - post Send WRs containing the RPC Call message
  * @r_xprt: controlling transport instance
@@ -403,27 +390,36 @@ static void frwr_cid_init(struct rpcrdma_ep *ep,
  */
 int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
+	struct ib_send_wr *post_wr, *send_wr = &req->rl_wr;
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
-	struct ib_send_wr *post_wr;
 	struct rpcrdma_mr *mr;
+	unsigned int num_wrs;
 
-	post_wr = &req->rl_wr;
+	num_wrs = 1;
+	post_wr = send_wr;
 	list_for_each_entry(mr, &req->rl_registered, mr_list) {
-		struct rpcrdma_frwr *frwr;
-
-		frwr = &mr->frwr;
-
-		frwr->fr_cqe.done = frwr_wc_fastreg;
-		frwr_cid_init(ep, frwr);
-		frwr->fr_regwr.wr.next = post_wr;
-		frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
-		frwr->fr_regwr.wr.num_sge = 0;
-		frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
-		frwr->fr_regwr.wr.send_flags = 0;
+		trace_xprtrdma_mr_fastreg(mr);
+
+		mr->mr_cqe.done = frwr_wc_fastreg;
+		mr->mr_regwr.wr.next = post_wr;
+		mr->mr_regwr.wr.wr_cqe = &mr->mr_cqe;
+		mr->mr_regwr.wr.num_sge = 0;
+		mr->mr_regwr.wr.opcode = IB_WR_REG_MR;
+		mr->mr_regwr.wr.send_flags = 0;
+		post_wr = &mr->mr_regwr.wr;
+		++num_wrs;
+	}
 
-		post_wr = &frwr->fr_regwr.wr;
+	if ((kref_read(&req->rl_kref) > 1) || num_wrs > ep->re_send_count) {
+		send_wr->send_flags |= IB_SEND_SIGNALED;
+		ep->re_send_count = min_t(unsigned int, ep->re_send_batch,
+					  num_wrs - ep->re_send_count);
+	} else {
+		send_wr->send_flags &= ~IB_SEND_SIGNALED;
+		ep->re_send_count -= num_wrs;
 	}
 
+	trace_xprtrdma_post_send(req);
 	return ib_post_send(ep->re_id->qp, post_wr, NULL);
 }
 
@@ -440,6 +436,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
 	list_for_each_entry(mr, mrs, mr_list)
 		if (mr->mr_handle == rep->rr_inv_rkey) {
 			list_del_init(&mr->mr_list);
+			trace_xprtrdma_mr_reminv(mr);
 			frwr_mr_put(mr);
 			break;	/* only one invalidated MR per RPC */
 		}
@@ -447,9 +444,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs)
 
 static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr)
 {
-	if (wc->status != IB_WC_SUCCESS)
-		frwr_mr_recycle(mr);
-	else
+	if (likely(wc->status == IB_WC_SUCCESS))
 		frwr_mr_put(mr);
 }
 
@@ -462,12 +457,10 @@ static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr)
 static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
 {
 	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr =
-		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+	struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe);
 
 	/* WARNING: Only wr_cqe and status are reliable at this point */
-	trace_xprtrdma_wc_li(wc, &frwr->fr_cid);
+	trace_xprtrdma_wc_li(wc, &mr->mr_cid);
 	frwr_mr_done(wc, mr);
 
 	rpcrdma_flush_disconnect(cq->cq_context, wc);
@@ -483,14 +476,12 @@ static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
 static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 {
 	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr =
-		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
+	struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe);
 
 	/* WARNING: Only wr_cqe and status are reliable at this point */
-	trace_xprtrdma_wc_li_wake(wc, &frwr->fr_cid);
+	trace_xprtrdma_wc_li_wake(wc, &mr->mr_cid);
 	frwr_mr_done(wc, mr);
-	complete(&frwr->fr_linv_done);
+	complete(&mr->mr_linv_done);
 
 	rpcrdma_flush_disconnect(cq->cq_context, wc);
 }
@@ -511,7 +502,6 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	struct ib_send_wr *first, **prev, *last;
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	const struct ib_send_wr *bad_wr;
-	struct rpcrdma_frwr *frwr;
 	struct rpcrdma_mr *mr;
 	int rc;
 
@@ -520,35 +510,34 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * Chain the LOCAL_INV Work Requests and post them with
 	 * a single ib_post_send() call.
 	 */
-	frwr = NULL;
 	prev = &first;
 	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
 
 		trace_xprtrdma_mr_localinv(mr);
 		r_xprt->rx_stats.local_inv_needed++;
 
-		frwr = &mr->frwr;
-		frwr->fr_cqe.done = frwr_wc_localinv;
-		frwr_cid_init(ep, frwr);
-		last = &frwr->fr_invwr;
+		last = &mr->mr_invwr;
 		last->next = NULL;
-		last->wr_cqe = &frwr->fr_cqe;
+		last->wr_cqe = &mr->mr_cqe;
 		last->sg_list = NULL;
 		last->num_sge = 0;
 		last->opcode = IB_WR_LOCAL_INV;
 		last->send_flags = IB_SEND_SIGNALED;
 		last->ex.invalidate_rkey = mr->mr_handle;
 
+		last->wr_cqe->done = frwr_wc_localinv;
+
 		*prev = last;
 		prev = &last->next;
 	}
+	mr = container_of(last, struct rpcrdma_mr, mr_invwr);
 
 	/* Strong send queue ordering guarantees that when the
 	 * last WR in the chain completes, all WRs in the chain
 	 * are complete.
 	 */
-	frwr->fr_cqe.done = frwr_wc_localinv_wake;
-	reinit_completion(&frwr->fr_linv_done);
+	last->wr_cqe->done = frwr_wc_localinv_wake;
+	reinit_completion(&mr->mr_linv_done);
 
 	/* Transport disconnect drains the receive CQ before it
 	 * replaces the QP. The RPC reply handler won't call us
@@ -562,22 +551,12 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * not happen, so don't wait in that case.
 	 */
 	if (bad_wr != first)
-		wait_for_completion(&frwr->fr_linv_done);
+		wait_for_completion(&mr->mr_linv_done);
 	if (!rc)
 		return;
 
-	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
-	 */
+	/* On error, the MRs get destroyed once the QP has drained. */
 	trace_xprtrdma_post_linv_err(req, rc);
-	while (bad_wr) {
-		frwr = container_of(bad_wr, struct rpcrdma_frwr,
-				    fr_invwr);
-		mr = container_of(frwr, struct rpcrdma_mr, frwr);
-		bad_wr = bad_wr->next;
-
-		list_del_init(&mr->mr_list);
-		frwr_mr_recycle(mr);
-	}
 }
 
 /**
@@ -589,20 +568,24 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc)
 {
 	struct ib_cqe *cqe = wc->wr_cqe;
-	struct rpcrdma_frwr *frwr =
-		container_of(cqe, struct rpcrdma_frwr, fr_cqe);
-	struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr);
-	struct rpcrdma_rep *rep = mr->mr_req->rl_reply;
+	struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe);
+	struct rpcrdma_rep *rep;
 
 	/* WARNING: Only wr_cqe and status are reliable at this point */
-	trace_xprtrdma_wc_li_done(wc, &frwr->fr_cid);
-	frwr_mr_done(wc, mr);
+	trace_xprtrdma_wc_li_done(wc, &mr->mr_cid);
 
-	/* Ensure @rep is generated before frwr_mr_done */
+	/* Ensure that @rep is generated before the MR is released */
+	rep = mr->mr_req->rl_reply;
 	smp_rmb();
-	rpcrdma_complete_rqst(rep);
 
-	rpcrdma_flush_disconnect(cq->cq_context, wc);
+	if (wc->status != IB_WC_SUCCESS) {
+		if (rep)
+			rpcrdma_unpin_rqst(rep);
+		rpcrdma_flush_disconnect(cq->cq_context, wc);
+		return;
+	}
+	frwr_mr_put(mr);
+	rpcrdma_complete_rqst(rep);
 }
 
 /**
@@ -619,33 +602,29 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
 	struct ib_send_wr *first, *last, **prev;
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
-	const struct ib_send_wr *bad_wr;
-	struct rpcrdma_frwr *frwr;
 	struct rpcrdma_mr *mr;
 	int rc;
 
 	/* Chain the LOCAL_INV Work Requests and post them with
 	 * a single ib_post_send() call.
 	 */
-	frwr = NULL;
 	prev = &first;
 	while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
 
 		trace_xprtrdma_mr_localinv(mr);
 		r_xprt->rx_stats.local_inv_needed++;
 
-		frwr = &mr->frwr;
-		frwr->fr_cqe.done = frwr_wc_localinv;
-		frwr_cid_init(ep, frwr);
-		last = &frwr->fr_invwr;
+		last = &mr->mr_invwr;
 		last->next = NULL;
-		last->wr_cqe = &frwr->fr_cqe;
+		last->wr_cqe = &mr->mr_cqe;
 		last->sg_list = NULL;
 		last->num_sge = 0;
 		last->opcode = IB_WR_LOCAL_INV;
 		last->send_flags = IB_SEND_SIGNALED;
 		last->ex.invalidate_rkey = mr->mr_handle;
 
+		last->wr_cqe->done = frwr_wc_localinv;
+
 		*prev = last;
 		prev = &last->next;
 	}
@@ -655,31 +634,23 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * are complete. The last completion will wake up the
 	 * RPC waiter.
 	 */
-	frwr->fr_cqe.done = frwr_wc_localinv_done;
+	last->wr_cqe->done = frwr_wc_localinv_done;
 
 	/* Transport disconnect drains the receive CQ before it
 	 * replaces the QP. The RPC reply handler won't call us
 	 * unless re_id->qp is a valid pointer.
 	 */
-	bad_wr = NULL;
-	rc = ib_post_send(ep->re_id->qp, first, &bad_wr);
+	rc = ib_post_send(ep->re_id->qp, first, NULL);
 	if (!rc)
 		return;
 
-	/* Recycle MRs in the LOCAL_INV chain that did not get posted.
-	 */
+	/* On error, the MRs get destroyed once the QP has drained. */
 	trace_xprtrdma_post_linv_err(req, rc);
-	while (bad_wr) {
-		frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr);
-		mr = container_of(frwr, struct rpcrdma_mr, frwr);
-		bad_wr = bad_wr->next;
-
-		frwr_mr_recycle(mr);
-	}
 
 	/* The final LOCAL_INV WR in the chain is supposed to
-	 * do the wake. If it was never posted, the wake will
-	 * not happen, so wake here in that case.
+	 * do the wake. If it was never posted, the wake does
+	 * not happen. Unpin the rqst in preparation for its
+	 * retransmission.
 	 */
-	rpcrdma_complete_rqst(req->rl_reply);
+	rpcrdma_unpin_rqst(req->rl_reply);
 }
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 292f066d006e..649f7d8b9733 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1326,9 +1326,35 @@ rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
 	return -EIO;
 }
 
-/* Perform XID lookup, reconstruction of the RPC reply, and
- * RPC completion while holding the transport lock to ensure
- * the rep, rqst, and rq_task pointers remain stable.
+/**
+ * rpcrdma_unpin_rqst - Release rqst without completing it
+ * @rep: RPC/RDMA Receive context
+ *
+ * This is done when a connection is lost so that a Reply
+ * can be dropped and its matching Call can be subsequently
+ * retransmitted on a new connection.
+ */
+void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep)
+{
+	struct rpc_xprt *xprt = &rep->rr_rxprt->rx_xprt;
+	struct rpc_rqst *rqst = rep->rr_rqst;
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+	req->rl_reply = NULL;
+	rep->rr_rqst = NULL;
+
+	spin_lock(&xprt->queue_lock);
+	xprt_unpin_rqst(rqst);
+	spin_unlock(&xprt->queue_lock);
+}
+
+/**
+ * rpcrdma_complete_rqst - Pass completed rqst back to RPC
+ * @rep: RPC/RDMA Receive context
+ *
+ * Reconstruct the RPC reply and complete the transaction
+ * while @rqst is still pinned to ensure the rep, rqst, and
+ * rq_task pointers remain stable.
  */
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
 {
@@ -1430,13 +1456,14 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 		credits = 1;	/* don't deadlock */
 	else if (credits > r_xprt->rx_ep->re_max_requests)
 		credits = r_xprt->rx_ep->re_max_requests;
+	rpcrdma_post_recvs(r_xprt, credits + (buf->rb_bc_srv_max_requests << 1),
+			   false);
 	if (buf->rb_credits != credits)
 		rpcrdma_update_cwnd(r_xprt, credits);
-	rpcrdma_post_recvs(r_xprt, false);
 
 	req = rpcr_to_rdmar(rqst);
 	if (unlikely(req->rl_reply))
-		rpcrdma_recv_buffer_put(req->rl_reply);
+		rpcrdma_rep_put(buf, req->rl_reply);
 	req->rl_reply = rep;
 	rep->rr_rqst = rqst;
 
@@ -1464,5 +1491,5 @@ out_shortreply:
 	trace_xprtrdma_reply_short_err(rep);
 
 out:
-	rpcrdma_recv_buffer_put(rep);
+	rpcrdma_rep_put(buf, rep);
 }
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 78d29d1bcc20..09953597d055 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -262,8 +262,10 @@ xprt_rdma_connect_worker(struct work_struct *work)
  * xprt_rdma_inject_disconnect - inject a connection fault
  * @xprt: transport context
  *
- * If @xprt is connected, disconnect it to simulate spurious connection
- * loss.
+ * If @xprt is connected, disconnect it to simulate spurious
+ * connection loss. Caller must hold @xprt's send lock to
+ * ensure that data structures and hardware resources are
+ * stable during the rdma_disconnect() call.
  */
 static void
 xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ec912cf9c618..1e965a380896 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -101,6 +101,12 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	struct rdma_cm_id *id = ep->re_id;
 
+	/* Wait for rpcrdma_post_recvs() to leave its critical
+	 * section.
+	 */
+	if (atomic_inc_return(&ep->re_receiving) > 1)
+		wait_for_completion(&ep->re_done);
+
 	/* Flush Receives, then wait for deferred Reply work
 	 * to complete.
 	 */
@@ -114,22 +120,6 @@ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 	rpcrdma_ep_put(ep);
 }
 
-/**
- * rpcrdma_qp_event_handler - Handle one QP event (error notification)
- * @event: details of the event
- * @context: ep that owns QP where event occurred
- *
- * Called from the RDMA provider (device driver) possibly in an interrupt
- * context. The QP is always destroyed before the ID, so the ID will be
- * reliably available when this handler is invoked.
- */
-static void rpcrdma_qp_event_handler(struct ib_event *event, void *context)
-{
-	struct rpcrdma_ep *ep = context;
-
-	trace_xprtrdma_qp_event(ep, event);
-}
-
 /* Ensure xprt_force_disconnect() is invoked exactly once when a
  * connection is closed or lost. (The important thing is it needs
  * to be invoked "at least" once).
@@ -205,7 +195,7 @@ static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 
 out_flushed:
 	rpcrdma_flush_disconnect(r_xprt, wc);
-	rpcrdma_rep_destroy(rep);
+	rpcrdma_rep_put(&r_xprt->rx_buf, rep);
 }
 
 static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
@@ -414,6 +404,7 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 	__module_get(THIS_MODULE);
 	device = id->device;
 	ep->re_id = id;
+	reinit_completion(&ep->re_done);
 
 	ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
 	ep->re_inline_send = xprt_rdma_max_inline_write;
@@ -424,8 +415,6 @@ static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
 
 	r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
 
-	ep->re_attr.event_handler = rpcrdma_qp_event_handler;
-	ep->re_attr.qp_context = ep;
 	ep->re_attr.srq = NULL;
 	ep->re_attr.cap.max_inline_data = 0;
 	ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
@@ -535,7 +524,7 @@ int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
 	 * outstanding Receives.
 	 */
 	rpcrdma_ep_get(ep);
-	rpcrdma_post_recvs(r_xprt, true);
+	rpcrdma_post_recvs(r_xprt, 1, true);
 
 	rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
 	if (rc)
@@ -954,13 +943,11 @@ static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
 		rpcrdma_req_reset(req);
 }
 
-/* No locking needed here. This function is called only by the
- * Receive completion handler.
- */
 static noinline
 struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
 				       bool temp)
 {
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_rep *rep;
 
 	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
@@ -987,7 +974,10 @@ struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
 	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 	rep->rr_recv_wr.num_sge = 1;
 	rep->rr_temp = temp;
-	list_add(&rep->rr_all, &r_xprt->rx_buf.rb_all_reps);
+
+	spin_lock(&buf->rb_lock);
+	list_add(&rep->rr_all, &buf->rb_all_reps);
+	spin_unlock(&buf->rb_lock);
 	return rep;
 
 out_free_regbuf:
@@ -998,16 +988,23 @@ out:
 	return NULL;
 }
 
-/* No locking needed here. This function is invoked only by the
- * Receive completion handler, or during transport shutdown.
- */
-static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
 {
-	list_del(&rep->rr_all);
 	rpcrdma_regbuf_free(rep->rr_rdmabuf);
 	kfree(rep);
 }
 
+static void rpcrdma_rep_destroy(struct rpcrdma_rep *rep)
+{
+	struct rpcrdma_buffer *buf = &rep->rr_rxprt->rx_buf;
+
+	spin_lock(&buf->rb_lock);
+	list_del(&rep->rr_all);
+	spin_unlock(&buf->rb_lock);
+
+	rpcrdma_rep_free(rep);
+}
+
 static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
 {
 	struct llist_node *node;
@@ -1019,12 +1016,21 @@ static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
 	return llist_entry(node, struct rpcrdma_rep, rr_node);
 }
 
-static void rpcrdma_rep_put(struct rpcrdma_buffer *buf,
-			    struct rpcrdma_rep *rep)
+/**
+ * rpcrdma_rep_put - Release rpcrdma_rep back to free list
+ * @buf: buffer pool
+ * @rep: rep to release
+ *
+ */
+void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
 {
 	llist_add(&rep->rr_node, &buf->rb_free_reps);
 }
 
+/* Caller must ensure the QP is quiescent (RQ is drained) before
+ * invoking this function, to guarantee rb_all_reps is not
+ * changing.
+ */
 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
@@ -1032,7 +1038,7 @@ static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
 
 	list_for_each_entry(rep, &buf->rb_all_reps, rr_all) {
 		rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
-		rep->rr_temp = true;
+		rep->rr_temp = true;	/* Mark this rep for destruction */
 	}
 }
 
@@ -1040,8 +1046,18 @@ static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_rep *rep;
 
-	while ((rep = rpcrdma_rep_get_locked(buf)) != NULL)
-		rpcrdma_rep_destroy(rep);
+	spin_lock(&buf->rb_lock);
+	while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
+					       struct rpcrdma_rep,
+					       rr_all)) != NULL) {
+		list_del(&rep->rr_all);
+		spin_unlock(&buf->rb_lock);
+
+		rpcrdma_rep_free(rep);
+
+		spin_lock(&buf->rb_lock);
+	}
+	spin_unlock(&buf->rb_lock);
 }
 
 /**
@@ -1104,7 +1120,7 @@ void rpcrdma_req_destroy(struct rpcrdma_req *req)
 		list_del(&mr->mr_all);
 		spin_unlock(&buf->rb_lock);
 
-		frwr_release_mr(mr);
+		frwr_mr_release(mr);
 	}
 
 	rpcrdma_regbuf_free(req->rl_recvbuf);
@@ -1135,7 +1151,7 @@ static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
 		list_del(&mr->mr_all);
 		spin_unlock(&buf->rb_lock);
 
-		frwr_release_mr(mr);
+		frwr_mr_release(mr);
 
 		spin_lock(&buf->rb_lock);
 	}
@@ -1221,17 +1237,6 @@ void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
 	spin_unlock(&buffers->rb_lock);
 }
 
-/**
- * rpcrdma_recv_buffer_put - Release rpcrdma_rep back to free list
- * @rep: rep to release
- *
- * Used after error conditions.
- */
-void rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
-{
-	rpcrdma_rep_put(&rep->rr_rxprt->rx_buf, rep);
-}
-
 /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
  *
  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
@@ -1342,21 +1347,7 @@ static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
  */
 int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
-	struct ib_send_wr *send_wr = &req->rl_wr;
-	struct rpcrdma_ep *ep = r_xprt->rx_ep;
-	int rc;
-
-	if (!ep->re_send_count || kref_read(&req->rl_kref) > 1) {
-		send_wr->send_flags |= IB_SEND_SIGNALED;
-		ep->re_send_count = ep->re_send_batch;
-	} else {
-		send_wr->send_flags &= ~IB_SEND_SIGNALED;
-		--ep->re_send_count;
-	}
-
-	trace_xprtrdma_post_send(req);
-	rc = frwr_send(r_xprt, req);
-	if (rc)
+	if (frwr_send(r_xprt, req))
 		return -ENOTCONN;
 	return 0;
 }
@@ -1364,27 +1355,30 @@ int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 /**
  * rpcrdma_post_recvs - Refill the Receive Queue
  * @r_xprt: controlling transport instance
- * @temp: mark Receive buffers to be deleted after use
+ * @needed: current credit grant
+ * @temp: mark Receive buffers to be deleted after one use
  *
  */
-void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_ep *ep = r_xprt->rx_ep;
 	struct ib_recv_wr *wr, *bad_wr;
 	struct rpcrdma_rep *rep;
-	int needed, count, rc;
+	int count, rc;
 
 	rc = 0;
 	count = 0;
 
-	needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1);
 	if (likely(ep->re_receive_count > needed))
 		goto out;
 	needed -= ep->re_receive_count;
 	if (!temp)
 		needed += RPCRDMA_MAX_RECV_BATCH;
 
+	if (atomic_inc_return(&ep->re_receiving) > 1)
+		goto out;
+
 	/* fast path: all needed reps can be found on the free list */
 	wr = NULL;
 	while (needed) {
@@ -1410,6 +1404,9 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 
 	rc = ib_post_recv(ep->re_id->qp, wr,
 			  (const struct ib_recv_wr **)&bad_wr);
+	if (atomic_dec_return(&ep->re_receiving) > 0)
+		complete(&ep->re_done);
+
 out:
 	trace_xprtrdma_post_recvs(r_xprt, count, rc);
 	if (rc) {
@@ -1418,7 +1415,7 @@ out:
 
 			rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
 			wr = wr->next;
-			rpcrdma_recv_buffer_put(rep);
+			rpcrdma_rep_put(buf, rep);
 			--count;
 		}
 	}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index fe3be985e239..436ad7312614 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -83,6 +83,7 @@ struct rpcrdma_ep {
 	unsigned int		re_max_inline_recv;
 	int			re_async_rc;
 	int			re_connect_status;
+	atomic_t		re_receiving;
 	atomic_t		re_force_disconnect;
 	struct ib_qp_init_attr	re_attr;
 	wait_queue_head_t       re_connect_wait;
@@ -228,31 +229,28 @@ struct rpcrdma_sendctx {
  * An external memory region is any buffer or page that is registered
  * on the fly (ie, not pre-registered).
  */
-struct rpcrdma_frwr {
-	struct ib_mr			*fr_mr;
-	struct ib_cqe			fr_cqe;
-	struct rpc_rdma_cid		fr_cid;
-	struct completion		fr_linv_done;
-	union {
-		struct ib_reg_wr	fr_regwr;
-		struct ib_send_wr	fr_invwr;
-	};
-};
-
 struct rpcrdma_req;
 struct rpcrdma_mr {
 	struct list_head	mr_list;
 	struct rpcrdma_req	*mr_req;
+
+	struct ib_mr		*mr_ibmr;
 	struct ib_device	*mr_device;
 	struct scatterlist	*mr_sg;
 	int			mr_nents;
 	enum dma_data_direction	mr_dir;
-	struct rpcrdma_frwr	frwr;
+	struct ib_cqe		mr_cqe;
+	struct completion	mr_linv_done;
+	union {
+		struct ib_reg_wr	mr_regwr;
+		struct ib_send_wr	mr_invwr;
+	};
 	struct rpcrdma_xprt	*mr_xprt;
 	u32			mr_handle;
 	u32			mr_length;
 	u64			mr_offset;
 	struct list_head	mr_all;
+	struct rpc_rdma_cid	mr_cid;
 };
 
 /*
@@ -461,7 +459,7 @@ int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt);
 void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt);
 
 int rpcrdma_post_sends(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
-void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
+void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed, bool temp);
 
 /*
  * Buffer calls - xprtrdma/verbs.c
@@ -480,7 +478,7 @@ void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
 void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers,
 			struct rpcrdma_req *req);
-void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep);
 
 bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size,
 			    gfp_t flags);
@@ -527,7 +525,7 @@ rpcrdma_data_dir(bool writing)
 void frwr_reset(struct rpcrdma_req *req);
 int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device);
 int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr);
-void frwr_release_mr(struct rpcrdma_mr *mr);
+void frwr_mr_release(struct rpcrdma_mr *mr);
 struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
 				struct rpcrdma_mr_seg *seg,
 				int nsegs, bool writing, __be32 xid,
@@ -560,6 +558,7 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_ep *ep);
 void rpcrdma_reset_cwnd(struct rpcrdma_xprt *r_xprt);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
+void rpcrdma_unpin_rqst(struct rpcrdma_rep *rep);
 void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index e35760f238a4..47aa47a2b07c 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -558,6 +558,10 @@ xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
 	struct rpc_rqst *req;
 	ssize_t ret;
 
+	/* Is this transport associated with the backchannel? */
+	if (!xprt->bc_serv)
+		return -ESHUTDOWN;
+
 	/* Look up and lock the request corresponding to the given XID */
 	req = xprt_lookup_bc_request(xprt, transport->recv.xid);
 	if (!req) {
@@ -1018,6 +1022,7 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
 	 * to cope with writespace callbacks arriving _after_ we have
 	 * called sendmsg(). */
 	req->rq_xtime = ktime_get();
+	tcp_sock_set_cork(transport->inet, true);
 	while (1) {
 		status = xprt_sock_sendmsg(transport->sock, &msg, xdr,
 					   transport->xmit.offset, rm, &sent);
@@ -1032,6 +1037,8 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
 		if (likely(req->rq_bytes_sent >= msglen)) {
 			req->rq_xmit_bytes_sent += transport->xmit.offset;
 			transport->xmit.offset = 0;
+			if (atomic_long_read(&xprt->xmit_queuelen) == 1)
+				tcp_sock_set_cork(transport->inet, false);
 			return 0;
 		}
 
@@ -2163,6 +2170,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 		}
 
 		xs_tcp_set_socket_timeouts(xprt, sock);
+		tcp_sock_set_nodelay(sk);
 
 		write_lock_bh(&sk->sk_callback_lock);
 
@@ -2177,7 +2185,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
 		/* socket options */
 		sock_reset_flag(sk, SOCK_LINGER);
-		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
 
 		xprt_clear_connected(xprt);