summary refs log tree commit diff
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2017-07-13 14:35:37 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2017-07-13 14:35:37 -0700
commitb86faee6d111294fa95a2e89b5f771b2da3c9782 (patch)
tree1518742b6f7fabd4e3b3875fc31aeddcae11ceb8 /net
parent48ea2cedde3507941f4549b0d27ed46ed29e39ff (diff)
parentb4f937cffa66b3d56eb8f586e620d0b223a281a3 (diff)
downloadlinux-b86faee6d111294fa95a2e89b5f771b2da3c9782.tar.gz
Merge tag 'nfs-for-4.13-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
Pull NFS client updates from Anna Schumaker:
 "Stable bugfixes:
   - Fix -EACCESS on commit to DS handling
   - Fix initialization of nfs_page_array->npages
   - Only invalidate dentries that are actually invalid

  Features:
   - Enable NFSoRDMA transparent state migration
   - Add support for lookup-by-filehandle
   - Add support for nfs re-exporting

  Other bugfixes and cleanups:
   - Christoph cleaned up the way we declare NFS operations
   - Clean up various internal structures
   - Various cleanups to commits
   - Various improvements to error handling
   - Set the dt_type of . and .. entries in NFS v4
   - Make slot allocation more reliable
   - Fix fscache stat printing
   - Fix uninitialized variable warnings
   - Fix potential list overrun in nfs_atomic_open()
   - Fix a race in NFSoRDMA RPC reply handler
   - Fix return size for nfs42_proc_copy()
   - Fix against MAC forgery timing attacks"

* tag 'nfs-for-4.13-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (68 commits)
  NFS: Don't run wake_up_bit() when nobody is waiting...
  nfs: add export operations
  nfs4: add NFSv4 LOOKUPP handlers
  nfs: add a nfs_ilookup helper
  nfs: replace d_add with d_splice_alias in atomic_open
  sunrpc: use constant time memory comparison for mac
  NFSv4.2 fix size storage for nfs42_proc_copy
  xprtrdma: Fix documenting comments in frwr_ops.c
  xprtrdma: Replace PAGE_MASK with offset_in_page()
  xprtrdma: FMR does not need list_del_init()
  xprtrdma: Demote "connect" log messages
  NFSv4.1: Use seqid returned by EXCHANGE_ID after state migration
  NFSv4.1: Handle EXCHGID4_FLAG_CONFIRMED_R during NFSv4.1 migration
  xprtrdma: Don't defer MR recovery if ro_map fails
  xprtrdma: Fix FRWR invalidation error recovery
  xprtrdma: Fix client lock-up after application signal fires
  xprtrdma: Rename rpcrdma_req::rl_free
  xprtrdma: Pass only the list of registered MRs to ro_unmap_sync
  xprtrdma: Pre-mark remotely invalidated MRs
  xprtrdma: On invalidation failure, remove MWs from rl_registered
  ...
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/auth_gss/gss_krb5_crypto.c3
-rw-r--r--net/sunrpc/xprt.c8
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c47
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c69
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c125
-rw-r--r--net/sunrpc/xprtrdma/transport.c3
-rw-r--r--net/sunrpc/xprtrdma/verbs.c55
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h40
8 files changed, 201 insertions, 149 deletions
diff --git a/net/sunrpc/auth_gss/gss_krb5_crypto.c b/net/sunrpc/auth_gss/gss_krb5_crypto.c
index fb39284ec174..12649c9fedab 100644
--- a/net/sunrpc/auth_gss/gss_krb5_crypto.c
+++ b/net/sunrpc/auth_gss/gss_krb5_crypto.c
@@ -34,6 +34,7 @@
  * WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
  */
 
+#include <crypto/algapi.h>
 #include <crypto/hash.h>
 #include <crypto/skcipher.h>
 #include <linux/err.h>
@@ -927,7 +928,7 @@ gss_krb5_aes_decrypt(struct krb5_ctx *kctx, u32 offset, struct xdr_buf *buf,
 	if (ret)
 		goto out_err;
 
-	if (memcmp(pkt_hmac, our_hmac, kctx->gk5e->cksumlength) != 0) {
+	if (crypto_memneq(pkt_hmac, our_hmac, kctx->gk5e->cksumlength) != 0) {
 		ret = GSS_S_BAD_SIG;
 		goto out_err;
 	}
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 3e63c5e97ebe..4654a9934269 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1047,13 +1047,15 @@ out:
 	return ret;
 }
 
-static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
+static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
 {
 	struct rpc_rqst *req = ERR_PTR(-EAGAIN);
 
 	if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
 		goto out;
-	req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
+	spin_unlock(&xprt->reserve_lock);
+	req = kzalloc(sizeof(struct rpc_rqst), GFP_NOFS);
+	spin_lock(&xprt->reserve_lock);
 	if (req != NULL)
 		goto out;
 	atomic_dec(&xprt->num_reqs);
@@ -1081,7 +1083,7 @@ void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
 		list_del(&req->rq_list);
 		goto out_init_req;
 	}
-	req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT|__GFP_NOWARN);
+	req = xprt_dynamic_alloc_slot(xprt);
 	if (!IS_ERR(req))
 		goto out_init_req;
 	switch (PTR_ERR(req)) {
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 59e64025ed96..d3f84bb1d443 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -91,7 +91,7 @@ __fmr_unmap(struct rpcrdma_mw *mw)
 
 	list_add(&mw->fmr.fm_mr->list, &l);
 	rc = ib_unmap_fmr(&l);
-	list_del_init(&mw->fmr.fm_mr->list);
+	list_del(&mw->fmr.fm_mr->list);
 	return rc;
 }
 
@@ -213,13 +213,11 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 			break;
 	}
-	mw->mw_nents = i;
 	mw->mw_dir = rpcrdma_data_dir(writing);
-	if (i == 0)
-		goto out_dmamap_err;
 
-	if (!ib_dma_map_sg(r_xprt->rx_ia.ri_device,
-			   mw->mw_sg, mw->mw_nents, mw->mw_dir))
+	mw->mw_nents = ib_dma_map_sg(r_xprt->rx_ia.ri_device,
+				     mw->mw_sg, i, mw->mw_dir);
+	if (!mw->mw_nents)
 		goto out_dmamap_err;
 
 	for (i = 0, dma_pages = mw->fmr.fm_physaddrs; i < mw->mw_nents; i++)
@@ -237,16 +235,18 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	return mw->mw_nents;
 
 out_dmamap_err:
-	pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
-	       mw->mw_sg, mw->mw_nents);
-	rpcrdma_defer_mr_recovery(mw);
+	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
+	       mw->mw_sg, i);
+	rpcrdma_put_mw(r_xprt, mw);
 	return -EIO;
 
 out_maperr:
 	pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
 	       len, (unsigned long long)dma_pages[0],
 	       pageoff, mw->mw_nents, rc);
-	rpcrdma_defer_mr_recovery(mw);
+	ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+			mw->mw_sg, mw->mw_nents, mw->mw_dir);
+	rpcrdma_put_mw(r_xprt, mw);
 	return -EIO;
 }
 
@@ -255,24 +255,26 @@ out_maperr:
  * Sleeps until it is safe for the host CPU to access the
  * previously mapped memory regions.
  *
- * Caller ensures that req->rl_registered is not empty.
+ * Caller ensures that @mws is not empty before the call. This
+ * function empties the list.
  */
 static void
-fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
 {
-	struct rpcrdma_mw *mw, *tmp;
+	struct rpcrdma_mw *mw;
 	LIST_HEAD(unmap_list);
 	int rc;
 
-	dprintk("RPC:       %s: req %p\n", __func__, req);
-
 	/* ORDER: Invalidate all of the req's MRs first
 	 *
 	 * ib_unmap_fmr() is slow, so use a single call instead
 	 * of one call per mapped FMR.
 	 */
-	list_for_each_entry(mw, &req->rl_registered, mw_list)
+	list_for_each_entry(mw, mws, mw_list) {
+		dprintk("RPC:       %s: unmapping fmr %p\n",
+			__func__, &mw->fmr);
 		list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
+	}
 	r_xprt->rx_stats.local_inv_needed++;
 	rc = ib_unmap_fmr(&unmap_list);
 	if (rc)
@@ -281,9 +283,11 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	/* ORDER: Now DMA unmap all of the req's MRs, and return
 	 * them to the free MW list.
 	 */
-	list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
-		list_del_init(&mw->mw_list);
-		list_del_init(&mw->fmr.fm_mr->list);
+	while (!list_empty(mws)) {
+		mw = rpcrdma_pop_mw(mws);
+		dprintk("RPC:       %s: DMA unmapping fmr %p\n",
+			__func__, &mw->fmr);
+		list_del(&mw->fmr.fm_mr->list);
 		ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
 				mw->mw_sg, mw->mw_nents, mw->mw_dir);
 		rpcrdma_put_mw(r_xprt, mw);
@@ -294,8 +298,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 out_reset:
 	pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
 
-	list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
-		list_del_init(&mw->fmr.fm_mr->list);
+	while (!list_empty(mws)) {
+		mw = rpcrdma_pop_mw(mws);
+		list_del(&mw->fmr.fm_mr->list);
 		fmr_op_recover_mr(mw);
 	}
 }
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index f81dd93176c0..6aea36a38bfd 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -277,7 +277,7 @@ __frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr)
 }
 
 /**
- * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
+ * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC
  * @cq:	completion queue (ignored)
  * @wc:	completed WR
  *
@@ -298,7 +298,7 @@ frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
 }
 
 /**
- * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC
  * @cq:	completion queue (ignored)
  * @wc:	completed WR
  *
@@ -319,7 +319,7 @@ frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
 }
 
 /**
- * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
+ * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC
  * @cq:	completion queue (ignored)
  * @wc:	completed WR
  *
@@ -355,7 +355,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	struct ib_mr *mr;
 	struct ib_reg_wr *reg_wr;
 	struct ib_send_wr *bad_wr;
-	int rc, i, n, dma_nents;
+	int rc, i, n;
 	u8 key;
 
 	mw = NULL;
@@ -391,14 +391,10 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 			break;
 	}
-	mw->mw_nents = i;
 	mw->mw_dir = rpcrdma_data_dir(writing);
-	if (i == 0)
-		goto out_dmamap_err;
 
-	dma_nents = ib_dma_map_sg(ia->ri_device,
-				  mw->mw_sg, mw->mw_nents, mw->mw_dir);
-	if (!dma_nents)
+	mw->mw_nents = ib_dma_map_sg(ia->ri_device, mw->mw_sg, i, mw->mw_dir);
+	if (!mw->mw_nents)
 		goto out_dmamap_err;
 
 	n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE);
@@ -436,13 +432,14 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	return mw->mw_nents;
 
 out_dmamap_err:
-	pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n",
-	       mw->mw_sg, mw->mw_nents);
-	rpcrdma_defer_mr_recovery(mw);
+	pr_err("rpcrdma: failed to DMA map sg %p sg_nents %d\n",
+	       mw->mw_sg, i);
+	frmr->fr_state = FRMR_IS_INVALID;
+	rpcrdma_put_mw(r_xprt, mw);
 	return -EIO;
 
 out_mapmr_err:
-	pr_err("rpcrdma: failed to map mr %p (%u/%u)\n",
+	pr_err("rpcrdma: failed to map mr %p (%d/%d)\n",
 	       frmr->fr_mr, n, mw->mw_nents);
 	rpcrdma_defer_mr_recovery(mw);
 	return -EIO;
@@ -458,21 +455,19 @@ out_senderr:
  * Sleeps until it is safe for the host CPU to access the
  * previously mapped memory regions.
  *
- * Caller ensures that req->rl_registered is not empty.
+ * Caller ensures that @mws is not empty before the call. This
+ * function empties the list.
  */
 static void
-frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
 {
 	struct ib_send_wr *first, **prev, *last, *bad_wr;
-	struct rpcrdma_rep *rep = req->rl_reply;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_frmr *f;
 	struct rpcrdma_mw *mw;
 	int count, rc;
 
-	dprintk("RPC:       %s: req %p\n", __func__, req);
-
-	/* ORDER: Invalidate all of the req's MRs first
+	/* ORDER: Invalidate all of the MRs first
 	 *
 	 * Chain the LOCAL_INV Work Requests and post them with
 	 * a single ib_post_send() call.
@@ -480,11 +475,10 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	f = NULL;
 	count = 0;
 	prev = &first;
-	list_for_each_entry(mw, &req->rl_registered, mw_list) {
+	list_for_each_entry(mw, mws, mw_list) {
 		mw->frmr.fr_state = FRMR_IS_INVALID;
 
-		if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
-		    (mw->mw_handle == rep->rr_inv_rkey))
+		if (mw->mw_flags & RPCRDMA_MW_F_RI)
 			continue;
 
 		f = &mw->frmr;
@@ -524,18 +518,19 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * unless ri_id->qp is a valid pointer.
 	 */
 	r_xprt->rx_stats.local_inv_needed++;
+	bad_wr = NULL;
 	rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
+	if (bad_wr != first)
+		wait_for_completion(&f->fr_linv_done);
 	if (rc)
 		goto reset_mrs;
 
-	wait_for_completion(&f->fr_linv_done);
-
-	/* ORDER: Now DMA unmap all of the req's MRs, and return
+	/* ORDER: Now DMA unmap all of the MRs, and return
 	 * them to the free MW list.
 	 */
 unmap:
-	while (!list_empty(&req->rl_registered)) {
-		mw = rpcrdma_pop_mw(&req->rl_registered);
+	while (!list_empty(mws)) {
+		mw = rpcrdma_pop_mw(mws);
 		dprintk("RPC:       %s: DMA unmapping frmr %p\n",
 			__func__, &mw->frmr);
 		ib_dma_unmap_sg(ia->ri_device,
@@ -546,17 +541,19 @@ unmap:
 
 reset_mrs:
 	pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc);
-	rdma_disconnect(ia->ri_id);
 
 	/* Find and reset the MRs in the LOCAL_INV WRs that did not
-	 * get posted. This is synchronous, and slow.
+	 * get posted.
 	 */
-	list_for_each_entry(mw, &req->rl_registered, mw_list) {
-		f = &mw->frmr;
-		if (mw->mw_handle == bad_wr->ex.invalidate_rkey) {
-			__frwr_reset_mr(ia, mw);
-			bad_wr = bad_wr->next;
-		}
+	rpcrdma_init_cqcount(&r_xprt->rx_ep, -count);
+	while (bad_wr) {
+		f = container_of(bad_wr, struct rpcrdma_frmr,
+				 fr_invwr);
+		mw = container_of(f, struct rpcrdma_mw, frmr);
+
+		__frwr_reset_mr(ia, mw);
+
+		bad_wr = bad_wr->next;
 	}
 	goto unmap;
 }
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 694e9b13ecf0..ca4d6e4528f3 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -141,7 +141,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
 
 	if (xdr->page_len) {
 		remaining = xdr->page_len;
-		offset = xdr->page_base & ~PAGE_MASK;
+		offset = offset_in_page(xdr->page_base);
 		count = 0;
 		while (remaining) {
 			remaining -= min_t(unsigned int,
@@ -222,7 +222,7 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 
 	len = xdrbuf->page_len;
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
-	page_base = xdrbuf->page_base & ~PAGE_MASK;
+	page_base = offset_in_page(xdrbuf->page_base);
 	p = 0;
 	while (len && n < RPCRDMA_MAX_SEGS) {
 		if (!ppages[p]) {
@@ -540,7 +540,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 			goto out;
 
 		page = virt_to_page(xdr->tail[0].iov_base);
-		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+		page_base = offset_in_page(xdr->tail[0].iov_base);
 
 		/* If the content in the page list is an odd length,
 		 * xdr_write_pages() has added a pad at the beginning
@@ -557,7 +557,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 	 */
 	if (xdr->page_len) {
 		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-		page_base = xdr->page_base & ~PAGE_MASK;
+		page_base = offset_in_page(xdr->page_base);
 		remaining = xdr->page_len;
 		while (remaining) {
 			sge_no++;
@@ -587,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 	 */
 	if (xdr->tail[0].iov_len) {
 		page = virt_to_page(xdr->tail[0].iov_base);
-		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+		page_base = offset_in_page(xdr->tail[0].iov_base);
 		len = xdr->tail[0].iov_len;
 
 map_tail:
@@ -734,6 +734,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		rpclen = 0;
 	}
 
+	req->rl_xid = rqst->rq_xid;
+	rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
 	 *
@@ -875,9 +878,9 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 	srcp += curlen;
 	copy_len -= curlen;
 
-	page_base = rqst->rq_rcv_buf.page_base;
-	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
-	page_base &= ~PAGE_MASK;
+	ppages = rqst->rq_rcv_buf.pages +
+		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
+	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
 	fixup_copy_count = 0;
 	if (copy_len && rqst->rq_rcv_buf.page_len) {
 		int pagelist_len;
@@ -928,6 +931,24 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 	return fixup_copy_count;
 }
 
+/* Caller must guarantee @rep remains stable during this call.
+ */
+static void
+rpcrdma_mark_remote_invalidation(struct list_head *mws,
+				 struct rpcrdma_rep *rep)
+{
+	struct rpcrdma_mw *mw;
+
+	if (!(rep->rr_wc_flags & IB_WC_WITH_INVALIDATE))
+		return;
+
+	list_for_each_entry(mw, mws, mw_list)
+		if (mw->mw_handle == rep->rr_inv_rkey) {
+			mw->mw_flags = RPCRDMA_MW_F_RI;
+			break; /* only one invalidated MR per RPC */
+		}
+}
+
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
@@ -969,14 +990,16 @@ rpcrdma_reply_handler(struct work_struct *work)
 {
 	struct rpcrdma_rep *rep =
 			container_of(work, struct rpcrdma_rep, rr_work);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	__be32 *iptr;
 	int rdmalen, status, rmerr;
 	unsigned long cwnd;
+	struct list_head mws;
 
 	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
@@ -994,27 +1017,45 @@ rpcrdma_reply_handler(struct work_struct *work)
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock_bh(&xprt->transport_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (!rqst)
+	spin_lock(&buf->rb_lock);
+	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+					headerp->rm_xid);
+	if (!req)
 		goto out_nomatch;
-
-	req = rpcr_to_rdmar(rqst);
 	if (req->rl_reply)
 		goto out_duplicate;
 
-	/* Sanity checking has passed. We are now committed
-	 * to complete this transaction.
+	list_replace_init(&req->rl_registered, &mws);
+	rpcrdma_mark_remote_invalidation(&mws, rep);
+
+	/* Avoid races with signals and duplicate replies
+	 * by marking this req as matched.
 	 */
-	list_del_init(&rqst->rq_list);
-	spin_unlock_bh(&xprt->transport_lock);
+	req->rl_reply = rep;
+	spin_unlock(&buf->rb_lock);
+
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
-	/* from here on, the reply is no longer an orphan */
-	req->rl_reply = rep;
-	xprt->reestablish_timeout = 0;
+	/* Invalidate and unmap the data payloads before waking the
+	 * waiting application. This guarantees the memory regions
+	 * are properly fenced from the server before the application
+	 * accesses the data. It also ensures proper send flow control:
+	 * waking the next RPC waits until this RPC has relinquished
+	 * all its Send Queue entries.
+	 */
+	if (!list_empty(&mws))
+		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
+	/* Perform XID lookup, reconstruction of the RPC reply, and
+	 * RPC completion while holding the transport lock to ensure
+	 * the rep, rqst, and rq_task pointers remain stable.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
+	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	if (!rqst)
+		goto out_norqst;
+	xprt->reestablish_timeout = 0;
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
 
@@ -1024,12 +1065,9 @@ rpcrdma_reply_handler(struct work_struct *work)
 	case rdma_msg:
 		/* never expect read chunks */
 		/* never expect reply chunks (two ways to check) */
-		/* never expect write chunks without having offered RDMA */
 		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
 		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
-		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
-		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
-		     list_empty(&req->rl_registered)))
+		     headerp->rm_body.rm_chunks[2] != xdr_zero))
 			goto badheader;
 		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
 			/* count any expected write chunks in read reply */
@@ -1066,8 +1104,7 @@ rpcrdma_reply_handler(struct work_struct *work)
 		/* never expect read or write chunks, always reply chunks */
 		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
 		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
-		    headerp->rm_body.rm_chunks[2] != xdr_one ||
-		    list_empty(&req->rl_registered))
+		    headerp->rm_body.rm_chunks[2] != xdr_one)
 			goto badheader;
 		iptr = (__be32 *)((unsigned char *)headerp +
 							RPCRDMA_HDRLEN_MIN);
@@ -1093,17 +1130,6 @@ badheader:
 	}
 
 out:
-	/* Invalidate and flush the data payloads before waking the
-	 * waiting application. This guarantees the memory region is
-	 * properly fenced from the server before the application
-	 * accesses the data. It also ensures proper send flow
-	 * control: waking the next RPC waits until this RPC has
-	 * relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&req->rl_registered))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
-
-	spin_lock_bh(&xprt->transport_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
@@ -1112,7 +1138,7 @@ out:
 	xprt_complete_rqst(rqst->rq_task, status);
 	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-			__func__, xprt, rqst, status);
+		__func__, xprt, rqst, status);
 	return;
 
 out_badstatus:
@@ -1161,26 +1187,37 @@ out_rdmaerr:
 	r_xprt->rx_stats.bad_reply_count++;
 	goto out;
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
  */
+out_norqst:
+	spin_unlock_bh(&xprt->transport_lock);
+	rpcrdma_buffer_put(req);
+	dprintk("RPC:       %s: race, no rqst left for req %p\n",
+		__func__, req);
+	return;
+
 out_shortreply:
 	dprintk("RPC:       %s: short/invalid reply\n", __func__);
 	goto repost;
 
 out_nomatch:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
 		__func__, be32_to_cpu(headerp->rm_xid),
 		rep->rr_len);
 	goto repost;
 
 out_duplicate:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: "
 		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
 repost:
 	r_xprt->rx_stats.bad_reply_count++;
 	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 62ecbccd9748..d1c458e5ec4d 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -684,7 +684,8 @@ xprt_rdma_free(struct rpc_task *task)
 
 	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	if (unlikely(!list_empty(&req->rl_registered)))
+	rpcrdma_remove_req(&r_xprt->rx_buf, req);
+	if (!list_empty(&req->rl_registered))
 		ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 	rpcrdma_unmap_sges(ia, req);
 	rpcrdma_buffer_put(req);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 3dbce9ac4327..e4171f2abe37 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -243,8 +243,6 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
 #endif
-	struct ib_qp_attr *attr = &ia->ri_qp_attr;
-	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
 	int connstate = 0;
 
 	switch (event->event) {
@@ -267,7 +265,8 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		break;
 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-		pr_info("rpcrdma: removing device for %pIS:%u\n",
+		pr_info("rpcrdma: removing device %s for %pIS:%u\n",
+			ia->ri_device->name,
 			sap, rpc_get_port(sap));
 #endif
 		set_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags);
@@ -282,13 +281,6 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		return 1;
 	case RDMA_CM_EVENT_ESTABLISHED:
 		connstate = 1;
-		ib_query_qp(ia->ri_id->qp, attr,
-			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
-			    iattr);
-		dprintk("RPC:       %s: %d responder resources"
-			" (%d initiator)\n",
-			__func__, attr->max_dest_rd_atomic,
-			attr->max_rd_atomic);
 		rpcrdma_update_connect_private(xprt, &event->param.conn);
 		goto connected;
 	case RDMA_CM_EVENT_CONNECT_ERROR:
@@ -298,11 +290,9 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 		connstate = -ENETDOWN;
 		goto connected;
 	case RDMA_CM_EVENT_REJECTED:
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-		pr_info("rpcrdma: connection to %pIS:%u on %s rejected: %s\n",
-			sap, rpc_get_port(sap), ia->ri_device->name,
+		dprintk("rpcrdma: connection to %pIS:%u rejected: %s\n",
+			sap, rpc_get_port(sap),
 			rdma_reject_msg(id, event->status));
-#endif
 		connstate = -ECONNREFUSED;
 		if (event->status == IB_CM_REJ_STALE_CONN)
 			connstate = -EAGAIN;
@@ -310,37 +300,19 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 	case RDMA_CM_EVENT_DISCONNECTED:
 		connstate = -ECONNABORTED;
 connected:
-		dprintk("RPC:       %s: %sconnected\n",
-					__func__, connstate > 0 ? "" : "dis");
 		atomic_set(&xprt->rx_buf.rb_credits, 1);
 		ep->rep_connected = connstate;
 		rpcrdma_conn_func(ep);
 		wake_up_all(&ep->rep_connect_wait);
 		/*FALLTHROUGH*/
 	default:
-		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
-			__func__, sap, rpc_get_port(sap), ep,
-			rdma_event_msg(event->event));
+		dprintk("RPC:       %s: %pIS:%u on %s/%s (ep 0x%p): %s\n",
+			__func__, sap, rpc_get_port(sap),
+			ia->ri_device->name, ia->ri_ops->ro_displayname,
+			ep, rdma_event_msg(event->event));
 		break;
 	}
 
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-	if (connstate == 1) {
-		int ird = attr->max_dest_rd_atomic;
-		int tird = ep->rep_remote_cma.responder_resources;
-
-		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
-			sap, rpc_get_port(sap),
-			ia->ri_device->name,
-			ia->ri_ops->ro_displayname,
-			xprt->rx_buf.rb_max_requests,
-			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
-	} else if (connstate < 0) {
-		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
-			sap, rpc_get_port(sap), connstate);
-	}
-#endif
-
 	return 0;
 }
 
@@ -971,7 +943,6 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 	if (req == NULL)
 		return ERR_PTR(-ENOMEM);
 
-	INIT_LIST_HEAD(&req->rl_free);
 	spin_lock(&buffer->rb_reqslock);
 	list_add(&req->rl_all, &buffer->rb_allreqs);
 	spin_unlock(&buffer->rb_reqslock);
@@ -1033,6 +1004,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	spin_lock_init(&buf->rb_recovery_lock);
 	INIT_LIST_HEAD(&buf->rb_mws);
 	INIT_LIST_HEAD(&buf->rb_all);
+	INIT_LIST_HEAD(&buf->rb_pending);
 	INIT_LIST_HEAD(&buf->rb_stale_mrs);
 	INIT_DELAYED_WORK(&buf->rb_refresh_worker,
 			  rpcrdma_mr_refresh_worker);
@@ -1055,7 +1027,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 			goto out;
 		}
 		req->rl_backchannel = false;
-		list_add(&req->rl_free, &buf->rb_send_bufs);
+		list_add(&req->rl_list, &buf->rb_send_bufs);
 	}
 
 	INIT_LIST_HEAD(&buf->rb_recv_bufs);
@@ -1084,8 +1056,8 @@ rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
 	struct rpcrdma_req *req;
 
 	req = list_first_entry(&buf->rb_send_bufs,
-			       struct rpcrdma_req, rl_free);
-	list_del(&req->rl_free);
+			       struct rpcrdma_req, rl_list);
+	list_del_init(&req->rl_list);
 	return req;
 }
 
@@ -1187,6 +1159,7 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
 
 	if (!mw)
 		goto out_nomws;
+	mw->mw_flags = 0;
 	return mw;
 
 out_nomws:
@@ -1267,7 +1240,7 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
 
 	spin_lock(&buffers->rb_lock);
 	buffers->rb_send_count--;
-	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+	list_add_tail(&req->rl_list, &buffers->rb_send_bufs);
 	if (rep) {
 		buffers->rb_recv_count--;
 		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 1d66acf1a723..b282d3f8cdd8 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -271,6 +271,7 @@ struct rpcrdma_mw {
 	struct scatterlist	*mw_sg;
 	int			mw_nents;
 	enum dma_data_direction	mw_dir;
+	unsigned long		mw_flags;
 	union {
 		struct rpcrdma_fmr	fmr;
 		struct rpcrdma_frmr	frmr;
@@ -282,6 +283,11 @@ struct rpcrdma_mw {
 	struct list_head	mw_all;
 };
 
+/* mw_flags */
+enum {
+	RPCRDMA_MW_F_RI		= 1,
+};
+
 /*
  * struct rpcrdma_req -- structure central to the request/reply sequence.
  *
@@ -334,7 +340,8 @@ enum {
 
 struct rpcrdma_buffer;
 struct rpcrdma_req {
-	struct list_head	rl_free;
+	struct list_head	rl_list;
+	__be32			rl_xid;
 	unsigned int		rl_mapped_sges;
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
@@ -396,6 +403,7 @@ struct rpcrdma_buffer {
 	int			rb_send_count, rb_recv_count;
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
+	struct list_head	rb_pending;
 	u32			rb_max_requests;
 	atomic_t		rb_credits;	/* most recent credit grant */
 
@@ -461,7 +469,7 @@ struct rpcrdma_memreg_ops {
 				  struct rpcrdma_mr_seg *, int, bool,
 				  struct rpcrdma_mw **);
 	void		(*ro_unmap_sync)(struct rpcrdma_xprt *,
-					 struct rpcrdma_req *);
+					 struct list_head *);
 	void		(*ro_unmap_safe)(struct rpcrdma_xprt *,
 					 struct rpcrdma_req *, bool);
 	void		(*ro_recover_mr)(struct rpcrdma_mw *);
@@ -544,6 +552,34 @@ void rpcrdma_destroy_req(struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
+static inline void
+rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+	spin_lock(&buffers->rb_lock);
+	if (list_empty(&req->rl_list))
+		list_add_tail(&req->rl_list, &buffers->rb_pending);
+	spin_unlock(&buffers->rb_lock);
+}
+
+static inline struct rpcrdma_req *
+rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
+{
+	struct rpcrdma_req *pos;
+
+	list_for_each_entry(pos, &buffers->rb_pending, rl_list)
+		if (pos->rl_xid == xid)
+			return pos;
+	return NULL;
+}
+
+static inline void
+rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+	spin_lock(&buffers->rb_lock);
+	list_del(&req->rl_list);
+	spin_unlock(&buffers->rb_lock);
+}
+
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);