summary refs log tree commit diff
path: root/net/sunrpc
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2016-05-26 10:33:33 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2016-05-26 10:33:33 -0700
commitea8ea737c46cffa5d0ee74309f81e55a7e5e9c2a (patch)
treeae159b2c5968fa3c2a5a4ab7176584bc9a17b889 /net/sunrpc
parent0b9210c9c86e46a7a62bbc7b69b84001315072ff (diff)
parentc7d73af2d249f0323f5cdb171a59497ce80011fb (diff)
downloadlinux-ea8ea737c46cffa5d0ee74309f81e55a7e5e9c2a.tar.gz
Merge tag 'nfs-for-4.7-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
Pull NFS client updates from Anna Schumaker:
 "Highlights include:

  Features:
   - Add support for the NFS v4.2 COPY operation
   - Add support for NFS/RDMA over IPv6

  Bugfixes and cleanups:
   - Avoid race that crashes nfs_init_commit()
   - Fix oops in callback path
   - Fix LOCK/OPEN race when unlinking an open file
   - Choose correct stateids when using delegations in setattr, read and
     write
   - Don't send empty SETATTR after OPEN_CREATE
   - xprtrdma: Prevent server from writing a reply into memory client
     has released
   - xprtrdma: Support using Read list and Reply chunk in one RPC call"

* tag 'nfs-for-4.7-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (61 commits)
  pnfs: pnfs_update_layout needs to consider if strict iomode checking is on
  nfs/flexfiles: Use the layout segment for reading unless it a IOMODE_RW and reading is disabled
  nfs/flexfiles: Helper function to detect FF_FLAGS_NO_READ_IO
  nfs: avoid race that crashes nfs_init_commit
  NFS: checking for NULL instead of IS_ERR() in nfs_commit_file()
  pnfs: make pnfs_layout_process more robust
  pnfs: rework LAYOUTGET retry handling
  pnfs: lift retry logic from send_layoutget to pnfs_update_layout
  pnfs: fix bad error handling in send_layoutget
  flexfiles: add kerneldoc header to nfs4_ff_layout_prepare_ds
  flexfiles: remove pointless setting of NFS_LAYOUT_RETURN_REQUESTED
  pnfs: only tear down lsegs that precede seqid in LAYOUTRETURN args
  pnfs: keep track of the return sequence number in pnfs_layout_hdr
  pnfs: record sequence in pnfs_layout_segment when it's created
  pnfs: don't merge new ff lsegs with ones that have LAYOUTRETURN bit set
  pNFS/flexfiles: When initing reads or writes, we might have to retry connecting to DSes
  pNFS/flexfiles: When checking for available DSes, conditionally check for MDS io
  pNFS/flexfile: Fix erroneous fall back to read/write through the MDS
  NFS: Reclaim writes via writepage are opportunistic
  NFSv4: Use the right stateid for delegations in setattr, read and write
  ...
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/auth.c9
-rw-r--r--net/sunrpc/auth_generic.c13
-rw-r--r--net/sunrpc/auth_gss/auth_gss.c6
-rw-r--r--net/sunrpc/auth_unix.c6
-rw-r--r--net/sunrpc/clnt.c17
-rw-r--r--net/sunrpc/xdr.c2
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c16
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c134
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c214
-rw-r--r--net/sunrpc/xprtrdma/physical_ops.c39
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c517
-rw-r--r--net/sunrpc/xprtrdma/transport.c16
-rw-r--r--net/sunrpc/xprtrdma/verbs.c78
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h47
-rw-r--r--net/sunrpc/xprtsock.c6
15 files changed, 676 insertions, 444 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 02f53674dc39..040ff627c18a 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -543,7 +543,7 @@ rpcauth_cache_enforce_limit(void)
  */
 struct rpc_cred *
 rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
-		int flags)
+		int flags, gfp_t gfp)
 {
 	LIST_HEAD(free);
 	struct rpc_cred_cache *cache = auth->au_credcache;
@@ -580,7 +580,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
 	if (flags & RPCAUTH_LOOKUP_RCU)
 		return ERR_PTR(-ECHILD);
 
-	new = auth->au_ops->crcreate(auth, acred, flags);
+	new = auth->au_ops->crcreate(auth, acred, flags, gfp);
 	if (IS_ERR(new)) {
 		cred = new;
 		goto out;
@@ -703,8 +703,7 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
 		new = rpcauth_bind_new_cred(task, lookupflags);
 	if (IS_ERR(new))
 		return PTR_ERR(new);
-	if (req->rq_cred != NULL)
-		put_rpccred(req->rq_cred);
+	put_rpccred(req->rq_cred);
 	req->rq_cred = new;
 	return 0;
 }
@@ -712,6 +711,8 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
 void
 put_rpccred(struct rpc_cred *cred)
 {
+	if (cred == NULL)
+		return;
 	/* Fast path for unhashed credentials */
 	if (test_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags) == 0) {
 		if (atomic_dec_and_test(&cred->cr_count))
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index 41248b1820c7..54dd3fdead54 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -38,6 +38,13 @@ struct rpc_cred *rpc_lookup_cred(void)
 }
 EXPORT_SYMBOL_GPL(rpc_lookup_cred);
 
+struct rpc_cred *
+rpc_lookup_generic_cred(struct auth_cred *acred, int flags, gfp_t gfp)
+{
+	return rpcauth_lookup_credcache(&generic_auth, acred, flags, gfp);
+}
+EXPORT_SYMBOL_GPL(rpc_lookup_generic_cred);
+
 struct rpc_cred *rpc_lookup_cred_nonblock(void)
 {
 	return rpcauth_lookupcred(&generic_auth, RPCAUTH_LOOKUP_RCU);
@@ -77,15 +84,15 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task,
 static struct rpc_cred *
 generic_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 {
-	return rpcauth_lookup_credcache(&generic_auth, acred, flags);
+	return rpcauth_lookup_credcache(&generic_auth, acred, flags, GFP_KERNEL);
 }
 
 static struct rpc_cred *
-generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
 {
 	struct generic_cred *gcred;
 
-	gcred = kmalloc(sizeof(*gcred), GFP_KERNEL);
+	gcred = kmalloc(sizeof(*gcred), gfp);
 	if (gcred == NULL)
 		return ERR_PTR(-ENOMEM);
 
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 15612ffa8d57..e64ae93d5b4f 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1299,11 +1299,11 @@ gss_destroy_cred(struct rpc_cred *cred)
 static struct rpc_cred *
 gss_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 {
-	return rpcauth_lookup_credcache(auth, acred, flags);
+	return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS);
 }
 
 static struct rpc_cred *
-gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
 {
 	struct gss_auth *gss_auth = container_of(auth, struct gss_auth, rpc_auth);
 	struct gss_cred	*cred = NULL;
@@ -1313,7 +1313,7 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 		__func__, from_kuid(&init_user_ns, acred->uid),
 		auth->au_flavor);
 
-	if (!(cred = kzalloc(sizeof(*cred), GFP_NOFS)))
+	if (!(cred = kzalloc(sizeof(*cred), gfp)))
 		goto out_err;
 
 	rpcauth_init_cred(&cred->gc_base, acred, auth, &gss_credops);
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index 0d3dd364c22f..9f65452b7cbc 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -52,11 +52,11 @@ unx_destroy(struct rpc_auth *auth)
 static struct rpc_cred *
 unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 {
-	return rpcauth_lookup_credcache(auth, acred, flags);
+	return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS);
 }
 
 static struct rpc_cred *
-unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
 {
 	struct unx_cred	*cred;
 	unsigned int groups = 0;
@@ -66,7 +66,7 @@ unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 			from_kuid(&init_user_ns, acred->uid),
 			from_kgid(&init_user_ns, acred->gid));
 
-	if (!(cred = kmalloc(sizeof(*cred), GFP_NOFS)))
+	if (!(cred = kmalloc(sizeof(*cred), gfp)))
 		return ERR_PTR(-ENOMEM);
 
 	rpcauth_init_cred(&cred->uc_base, acred, auth, &unix_credops);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 7e0c9bf22df8..06b4df9faaa1 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1414,6 +1414,23 @@ size_t rpc_max_payload(struct rpc_clnt *clnt)
 EXPORT_SYMBOL_GPL(rpc_max_payload);
 
 /**
+ * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
+ * @clnt: RPC client to query
+ */
+size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
+{
+	struct rpc_xprt *xprt;
+	size_t ret;
+
+	rcu_read_lock();
+	xprt = rcu_dereference(clnt->cl_xprt);
+	ret = xprt->ops->bc_maxpayload(xprt);
+	rcu_read_unlock();
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
+
+/**
  * rpc_get_timeout - Get timeout for transport in units of HZ
  * @clnt: RPC client to query
  */
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 6bdb3865212d..c4f3cc0c0775 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -797,6 +797,8 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
 		xdr_set_iov(xdr, buf->head, buf->len);
 	else if (buf->page_len != 0)
 		xdr_set_page_base(xdr, 0, buf->len);
+	else
+		xdr_set_iov(xdr, buf->head, buf->len);
 	if (p != NULL && p > xdr->p && xdr->end >= p) {
 		xdr->nwords -= p - xdr->p;
 		xdr->p = p;
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 2dcd7640eeb5..87762d976b63 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -192,6 +192,22 @@ int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
 }
 
 /**
+ * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
+ * @xprt: transport
+ *
+ * Returns maximum size, in bytes, of a backchannel message
+ */
+size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+	size_t maxmsg;
+
+	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
+	return maxmsg - RPCRDMA_HDRLEN_MIN;
+}
+
+/**
  * rpcrdma_bc_marshal_reply - Send backwards direction reply
  * @rqst: buffer containing RPC reply data
  *
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index b289e106540b..6326ebe8b595 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -35,10 +35,71 @@
 /* Maximum scatter/gather per FMR */
 #define RPCRDMA_MAX_FMR_SGES	(64)
 
+static struct workqueue_struct *fmr_recovery_wq;
+
+#define FMR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND)
+
+int
+fmr_alloc_recovery_wq(void)
+{
+	fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
+	return !fmr_recovery_wq ? -ENOMEM : 0;
+}
+
+void
+fmr_destroy_recovery_wq(void)
+{
+	struct workqueue_struct *wq;
+
+	if (!fmr_recovery_wq)
+		return;
+
+	wq = fmr_recovery_wq;
+	fmr_recovery_wq = NULL;
+	destroy_workqueue(wq);
+}
+
+static int
+__fmr_unmap(struct rpcrdma_mw *mw)
+{
+	LIST_HEAD(l);
+
+	list_add(&mw->fmr.fmr->list, &l);
+	return ib_unmap_fmr(&l);
+}
+
+/* Deferred reset of a single FMR. Generate a fresh rkey by
+ * replacing the MR. There's no recovery if this fails.
+ */
+static void
+__fmr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
+					    mw_work);
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+
+	__fmr_unmap(mw);
+	rpcrdma_put_mw(r_xprt, mw);
+	return;
+}
+
+/* A broken MR was discovered in a context that can't sleep.
+ * Defer recovery to the recovery worker.
+ */
+static void
+__fmr_queue_recovery(struct rpcrdma_mw *mw)
+{
+	INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
+	queue_work(fmr_recovery_wq, &mw->mw_work);
+}
+
 static int
 fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 	    struct rpcrdma_create_data_internal *cdata)
 {
+	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
+						      RPCRDMA_MAX_DATA_SEGS /
+						      RPCRDMA_MAX_FMR_SGES));
 	return 0;
 }
 
@@ -48,7 +109,7 @@ static size_t
 fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 {
 	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES);
+		     RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES);
 }
 
 static int
@@ -89,6 +150,7 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
 		if (IS_ERR(r->fmr.fmr))
 			goto out_fmr_err;
 
+		r->mw_xprt = r_xprt;
 		list_add(&r->mw_list, &buf->rb_mws);
 		list_add(&r->mw_all, &buf->rb_all);
 	}
@@ -104,15 +166,6 @@ out:
 	return rc;
 }
 
-static int
-__fmr_unmap(struct rpcrdma_mw *r)
-{
-	LIST_HEAD(l);
-
-	list_add(&r->fmr.fmr->list, &l);
-	return ib_unmap_fmr(&l);
-}
-
 /* Use the ib_map_phys_fmr() verb to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
@@ -183,15 +236,10 @@ static void
 __fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
 {
 	struct ib_device *device = r_xprt->rx_ia.ri_device;
-	struct rpcrdma_mw *mw = seg->rl_mw;
 	int nsegs = seg->mr_nsegs;
 
-	seg->rl_mw = NULL;
-
 	while (nsegs--)
 		rpcrdma_unmap_one(device, seg++);
-
-	rpcrdma_put_mw(r_xprt, mw);
 }
 
 /* Invalidate all memory regions that were registered for "req".
@@ -234,42 +282,50 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 		seg = &req->rl_segments[i];
 
 		__fmr_dma_unmap(r_xprt, seg);
+		rpcrdma_put_mw(r_xprt, seg->rl_mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
+		seg->rl_mw = NULL;
 	}
 
 	req->rl_nchunks = 0;
 }
 
-/* Use the ib_unmap_fmr() verb to prevent further remote
- * access via RDMA READ or RDMA WRITE.
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ *
+ * In the asynchronous case, DMA unmapping occurs first here
+ * because the rpcrdma_mr_seg is released immediately after this
+ * call. It's contents won't be available in __fmr_dma_unmap later.
+ * FIXME.
  */
-static int
-fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
+static void
+fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		  bool sync)
 {
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_mr_seg *seg1 = seg;
-	struct rpcrdma_mw *mw = seg1->rl_mw;
-	int rc, nsegs = seg->mr_nsegs;
+	struct rpcrdma_mr_seg *seg;
+	struct rpcrdma_mw *mw;
+	unsigned int i;
 
-	dprintk("RPC:       %s: FMR %p\n", __func__, mw);
+	for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
+		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
 
-	seg1->rl_mw = NULL;
-	while (seg1->mr_nsegs--)
-		rpcrdma_unmap_one(ia->ri_device, seg++);
-	rc = __fmr_unmap(mw);
-	if (rc)
-		goto out_err;
-	rpcrdma_put_mw(r_xprt, mw);
-	return nsegs;
+		if (sync) {
+			/* ORDER */
+			__fmr_unmap(mw);
+			__fmr_dma_unmap(r_xprt, seg);
+			rpcrdma_put_mw(r_xprt, mw);
+		} else {
+			__fmr_dma_unmap(r_xprt, seg);
+			__fmr_queue_recovery(mw);
+		}
 
-out_err:
-	/* The FMR is abandoned, but remains in rb_all. fmr_op_destroy
-	 * will attempt to release it when the transport is destroyed.
-	 */
-	dprintk("RPC:       %s: ib_unmap_fmr status %i\n", __func__, rc);
-	return nsegs;
+		i += seg->mr_nsegs;
+		seg->mr_nsegs = 0;
+		seg->rl_mw = NULL;
+	}
 }
 
 static void
@@ -295,7 +351,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
 const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
 	.ro_map				= fmr_op_map,
 	.ro_unmap_sync			= fmr_op_unmap_sync,
-	.ro_unmap			= fmr_op_unmap,
+	.ro_unmap_safe			= fmr_op_unmap_safe,
 	.ro_open			= fmr_op_open,
 	.ro_maxpages			= fmr_op_maxpages,
 	.ro_init			= fmr_op_init,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 94c3fa910b85..c0947544babe 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -98,6 +98,47 @@ frwr_destroy_recovery_wq(void)
 	destroy_workqueue(wq);
 }
 
+static int
+__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
+{
+	struct rpcrdma_frmr *f = &r->frmr;
+	int rc;
+
+	rc = ib_dereg_mr(f->fr_mr);
+	if (rc) {
+		pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
+			rc, r);
+		return rc;
+	}
+
+	f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG,
+			       ia->ri_max_frmr_depth);
+	if (IS_ERR(f->fr_mr)) {
+		pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
+			PTR_ERR(f->fr_mr), r);
+		return PTR_ERR(f->fr_mr);
+	}
+
+	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
+	f->fr_state = FRMR_IS_INVALID;
+	return 0;
+}
+
+static void
+__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_frmr *f = &mw->frmr;
+	int rc;
+
+	rc = __frwr_reset_mr(ia, mw);
+	ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir);
+	if (rc)
+		return;
+
+	rpcrdma_put_mw(r_xprt, mw);
+}
+
 /* Deferred reset of a single FRMR. Generate a fresh rkey by
  * replacing the MR.
  *
@@ -109,26 +150,10 @@ static void
 __frwr_recovery_worker(struct work_struct *work)
 {
 	struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-					    frmr.fr_work);
-	struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
-	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
-	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
-
-	if (ib_dereg_mr(r->frmr.fr_mr))
-		goto out_fail;
+					    mw_work);
 
-	r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
-	if (IS_ERR(r->frmr.fr_mr))
-		goto out_fail;
-
-	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
-	r->frmr.fr_state = FRMR_IS_INVALID;
-	rpcrdma_put_mw(r_xprt, r);
+	__frwr_reset_and_unmap(r->mw_xprt, r);
 	return;
-
-out_fail:
-	pr_warn("RPC:       %s: FRMR %p unrecovered\n",
-		__func__, r);
 }
 
 /* A broken MR was discovered in a context that can't sleep.
@@ -137,8 +162,8 @@ out_fail:
 static void
 __frwr_queue_recovery(struct rpcrdma_mw *r)
 {
-	INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
-	queue_work(frwr_recovery_wq, &r->frmr.fr_work);
+	INIT_WORK(&r->mw_work, __frwr_recovery_worker);
+	queue_work(frwr_recovery_wq, &r->mw_work);
 }
 
 static int
@@ -152,11 +177,11 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
 	if (IS_ERR(f->fr_mr))
 		goto out_mr_err;
 
-	f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL);
-	if (!f->sg)
+	f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL);
+	if (!f->fr_sg)
 		goto out_list_err;
 
-	sg_init_table(f->sg, depth);
+	sg_init_table(f->fr_sg, depth);
 
 	init_completion(&f->fr_linv_done);
 
@@ -185,7 +210,7 @@ __frwr_release(struct rpcrdma_mw *r)
 	if (rc)
 		dprintk("RPC:       %s: ib_dereg_mr status %i\n",
 			__func__, rc);
-	kfree(r->frmr.sg);
+	kfree(r->frmr.fr_sg);
 }
 
 static int
@@ -231,6 +256,9 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 					       depth;
 	}
 
+	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
+						      RPCRDMA_MAX_DATA_SEGS /
+						      ia->ri_max_frmr_depth));
 	return 0;
 }
 
@@ -243,7 +271,7 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
 	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
+		     RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth);
 }
 
 static void
@@ -350,9 +378,9 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
 			return rc;
 		}
 
+		r->mw_xprt = r_xprt;
 		list_add(&r->mw_list, &buf->rb_mws);
 		list_add(&r->mw_all, &buf->rb_all);
-		r->frmr.fr_xprt = r_xprt;
 	}
 
 	return 0;
@@ -396,12 +424,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
 	for (i = 0; i < nsegs;) {
 		if (seg->mr_page)
-			sg_set_page(&frmr->sg[i],
+			sg_set_page(&frmr->fr_sg[i],
 				    seg->mr_page,
 				    seg->mr_len,
 				    offset_in_page(seg->mr_offset));
 		else
-			sg_set_buf(&frmr->sg[i], seg->mr_offset,
+			sg_set_buf(&frmr->fr_sg[i], seg->mr_offset,
 				   seg->mr_len);
 
 		++seg;
@@ -412,25 +440,26 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 			break;
 	}
-	frmr->sg_nents = i;
+	frmr->fr_nents = i;
+	frmr->fr_dir = direction;
 
-	dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction);
+	dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction);
 	if (!dma_nents) {
 		pr_err("RPC:       %s: failed to dma map sg %p sg_nents %u\n",
-		       __func__, frmr->sg, frmr->sg_nents);
+		       __func__, frmr->fr_sg, frmr->fr_nents);
 		return -ENOMEM;
 	}
 
-	n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE);
-	if (unlikely(n != frmr->sg_nents)) {
+	n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE);
+	if (unlikely(n != frmr->fr_nents)) {
 		pr_err("RPC:       %s: failed to map mr %p (%u/%u)\n",
-		       __func__, frmr->fr_mr, n, frmr->sg_nents);
+		       __func__, frmr->fr_mr, n, frmr->fr_nents);
 		rc = n < 0 ? n : -EINVAL;
 		goto out_senderr;
 	}
 
 	dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
-		__func__, mw, frmr->sg_nents, mr->length);
+		__func__, mw, frmr->fr_nents, mr->length);
 
 	key = (u8)(mr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(mr, ++key);
@@ -452,18 +481,16 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	if (rc)
 		goto out_senderr;
 
-	seg1->mr_dir = direction;
 	seg1->rl_mw = mw;
 	seg1->mr_rkey = mr->rkey;
 	seg1->mr_base = mr->iova;
-	seg1->mr_nsegs = frmr->sg_nents;
+	seg1->mr_nsegs = frmr->fr_nents;
 	seg1->mr_len = mr->length;
 
-	return frmr->sg_nents;
+	return frmr->fr_nents;
 
 out_senderr:
 	dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
-	ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction);
 	__frwr_queue_recovery(mw);
 	return rc;
 }
@@ -487,24 +514,6 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
 	return invalidate_wr;
 }
 
-static void
-__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-		 int rc)
-{
-	struct ib_device *device = r_xprt->rx_ia.ri_device;
-	struct rpcrdma_mw *mw = seg->rl_mw;
-	struct rpcrdma_frmr *f = &mw->frmr;
-
-	seg->rl_mw = NULL;
-
-	ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir);
-
-	if (!rc)
-		rpcrdma_put_mw(r_xprt, mw);
-	else
-		__frwr_queue_recovery(mw);
-}
-
 /* Invalidate all memory regions that were registered for "req".
  *
  * Sleeps until it is safe for the host CPU to access the
@@ -518,6 +527,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	struct rpcrdma_mr_seg *seg;
 	unsigned int i, nchunks;
 	struct rpcrdma_frmr *f;
+	struct rpcrdma_mw *mw;
 	int rc;
 
 	dprintk("RPC:       %s: req %p\n", __func__, req);
@@ -558,11 +568,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * unless ri_id->qp is a valid pointer.
 	 */
 	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
-	if (rc) {
-		pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
-		rdma_disconnect(ia->ri_id);
-		goto unmap;
-	}
+	if (rc)
+		goto reset_mrs;
 
 	wait_for_completion(&f->fr_linv_done);
 
@@ -572,56 +579,65 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 unmap:
 	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
 		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
+		seg->rl_mw = NULL;
 
-		__frwr_dma_unmap(r_xprt, seg, rc);
+		ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents,
+				f->fr_dir);
+		rpcrdma_put_mw(r_xprt, mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
 	}
 
 	req->rl_nchunks = 0;
-}
+	return;
 
-/* Post a LOCAL_INV Work Request to prevent further remote access
- * via RDMA READ or RDMA WRITE.
- */
-static int
-frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
-{
-	struct rpcrdma_mr_seg *seg1 = seg;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_mw *mw = seg1->rl_mw;
-	struct rpcrdma_frmr *frmr = &mw->frmr;
-	struct ib_send_wr *invalidate_wr, *bad_wr;
-	int rc, nsegs = seg->mr_nsegs;
+reset_mrs:
+	pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
 
-	dprintk("RPC:       %s: FRMR %p\n", __func__, mw);
+	/* Find and reset the MRs in the LOCAL_INV WRs that did not
+	 * get posted. This is synchronous, and slow.
+	 */
+	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
+		f = &mw->frmr;
 
-	seg1->rl_mw = NULL;
-	frmr->fr_state = FRMR_IS_INVALID;
-	invalidate_wr = &mw->frmr.fr_invwr;
+		if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
+			__frwr_reset_mr(ia, mw);
+			bad_wr = bad_wr->next;
+		}
 
-	memset(invalidate_wr, 0, sizeof(*invalidate_wr));
-	frmr->fr_cqe.done = frwr_wc_localinv;
-	invalidate_wr->wr_cqe = &frmr->fr_cqe;
-	invalidate_wr->opcode = IB_WR_LOCAL_INV;
-	invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
-	DECR_CQCOUNT(&r_xprt->rx_ep);
+		i += seg->mr_nsegs;
+	}
+	goto unmap;
+}
 
-	ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir);
-	read_lock(&ia->ri_qplock);
-	rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr);
-	read_unlock(&ia->ri_qplock);
-	if (rc)
-		goto out_err;
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ */
+static void
+frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		   bool sync)
+{
+	struct rpcrdma_mr_seg *seg;
+	struct rpcrdma_mw *mw;
+	unsigned int i;
 
-	rpcrdma_put_mw(r_xprt, mw);
-	return nsegs;
+	for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
+		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
 
-out_err:
-	dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
-	__frwr_queue_recovery(mw);
-	return nsegs;
+		if (sync)
+			__frwr_reset_and_unmap(r_xprt, mw);
+		else
+			__frwr_queue_recovery(mw);
+
+		i += seg->mr_nsegs;
+		seg->mr_nsegs = 0;
+		seg->rl_mw = NULL;
+	}
 }
 
 static void
@@ -643,7 +659,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
 const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
 	.ro_map				= frwr_op_map,
 	.ro_unmap_sync			= frwr_op_unmap_sync,
-	.ro_unmap			= frwr_op_unmap,
+	.ro_unmap_safe			= frwr_op_unmap_safe,
 	.ro_open			= frwr_op_open,
 	.ro_maxpages			= frwr_op_maxpages,
 	.ro_init			= frwr_op_init,
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index 481b9b6f4a15..3750596cc432 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -36,8 +36,11 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 		       __func__, PTR_ERR(mr));
 		return -ENOMEM;
 	}
-
 	ia->ri_dma_mr = mr;
+
+	rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int,
+						      RPCRDMA_MAX_DATA_SEGS,
+						      RPCRDMA_MAX_HDR_SEGS));
 	return 0;
 }
 
@@ -47,7 +50,7 @@ static size_t
 physical_op_maxpages(struct rpcrdma_xprt *r_xprt)
 {
 	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     rpcrdma_max_segments(r_xprt));
+		     RPCRDMA_MAX_HDR_SEGS);
 }
 
 static int
@@ -71,17 +74,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	return 1;
 }
 
-/* Unmap a memory region, but leave it registered.
- */
-static int
-physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
-{
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
-	rpcrdma_unmap_one(ia->ri_device, seg);
-	return 1;
-}
-
 /* DMA unmap all memory regions that were mapped for "req".
  */
 static void
@@ -94,6 +86,25 @@ physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 		rpcrdma_unmap_one(device, &req->rl_segments[i++]);
 }
 
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ *
+ * For physical memory registration, there is no good way to
+ * fence a single MR that has been advertised to the server. The
+ * client has already handed the server an R_key that cannot be
+ * invalidated and is shared by all MRs on this connection.
+ * Tearing down the PD might be the only safe choice, but it's
+ * not clear that a freshly acquired DMA R_key would be different
+ * than the one used by the PD that was just destroyed.
+ * FIXME.
+ */
+static void
+physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		       bool sync)
+{
+	physical_op_unmap_sync(r_xprt, req);
+}
+
 static void
 physical_op_destroy(struct rpcrdma_buffer *buf)
 {
@@ -102,7 +113,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf)
 const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
 	.ro_map				= physical_op_map,
 	.ro_unmap_sync			= physical_op_unmap_sync,
-	.ro_unmap			= physical_op_unmap,
+	.ro_unmap_safe			= physical_op_unmap_safe,
 	.ro_open			= physical_op_open,
 	.ro_maxpages			= physical_op_maxpages,
 	.ro_init			= physical_op_init,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 888823bb6dae..35a81096e83d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -61,26 +61,84 @@ enum rpcrdma_chunktype {
 	rpcrdma_replych
 };
 
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 static const char transfertypes[][12] = {
-	"pure inline",	/* no chunks */
-	" read chunk",	/* some argument via rdma read */
-	"*read chunk",	/* entire request via rdma read */
-	"write chunk",	/* some result via rdma write */
+	"inline",	/* no chunks */
+	"read list",	/* some argument via rdma read */
+	"*read list",	/* entire request via rdma read */
+	"write list",	/* some result via rdma write */
 	"reply chunk"	/* entire reply via rdma write */
 };
-#endif
+
+/* Returns size of largest RPC-over-RDMA header in a Call message
+ *
+ * The largest Call header contains a full-size Read list and a
+ * minimal Reply chunk.
+ */
+static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
+{
+	unsigned int size;
+
+	/* Fixed header fields and list discriminators */
+	size = RPCRDMA_HDRLEN_MIN;
+
+	/* Maximum Read list size */
+	maxsegs += 2;	/* segment for head and tail buffers */
+	size = maxsegs * sizeof(struct rpcrdma_read_chunk);
+
+	/* Minimal Read chunk size */
+	size += sizeof(__be32);	/* segment count */
+	size += sizeof(struct rpcrdma_segment);
+	size += sizeof(__be32);	/* list discriminator */
+
+	dprintk("RPC:       %s: max call header size = %u\n",
+		__func__, size);
+	return size;
+}
+
+/* Returns size of largest RPC-over-RDMA header in a Reply message
+ *
+ * There is only one Write list or one Reply chunk per Reply
+ * message.  The larger list is the Write list.
+ */
+static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
+{
+	unsigned int size;
+
+	/* Fixed header fields and list discriminators */
+	size = RPCRDMA_HDRLEN_MIN;
+
+	/* Maximum Write list size */
+	maxsegs += 2;	/* segment for head and tail buffers */
+	size = sizeof(__be32);		/* segment count */
+	size += maxsegs * sizeof(struct rpcrdma_segment);
+	size += sizeof(__be32);	/* list discriminator */
+
+	dprintk("RPC:       %s: max reply header size = %u\n",
+		__func__, size);
+	return size;
+}
+
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
+				  struct rpcrdma_create_data_internal *cdata,
+				  unsigned int maxsegs)
+{
+	ia->ri_max_inline_write = cdata->inline_wsize -
+				  rpcrdma_max_call_header_size(maxsegs);
+	ia->ri_max_inline_read = cdata->inline_rsize -
+				 rpcrdma_max_reply_header_size(maxsegs);
+}
 
 /* The client can send a request inline as long as the RPCRDMA header
  * plus the RPC call fit under the transport's inline limit. If the
  * combined call message size exceeds that limit, the client must use
  * the read chunk list for this operation.
  */
-static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
+static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
+				struct rpc_rqst *rqst)
 {
-	unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
 }
 
 /* The client can't know how large the actual reply will be. Thus it
@@ -89,11 +147,12 @@ static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
  * limit, the client must provide a write list or a reply chunk for
  * this request.
  */
-static bool rpcrdma_results_inline(struct rpc_rqst *rqst)
+static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
+				   struct rpc_rqst *rqst)
 {
-	unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
 static int
@@ -226,23 +285,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 	return n;
 }
 
-/*
- * Create read/write chunk lists, and reply chunks, for RDMA
- *
- *   Assume check against THRESHOLD has been done, and chunks are required.
- *   Assume only encoding one list entry for read|write chunks. The NFSv3
- *     protocol is simple enough to allow this as it only has a single "bulk
- *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
- *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
- *
- * When used for a single reply chunk (which is a special write
- * chunk used for the entire reply, rather than just the data), it
- * is used primarily for READDIR and READLINK which would otherwise
- * be severely size-limited by a small rdma inline read max. The server
- * response will come back as an RDMA Write, followed by a message
- * of type RDMA_NOMSG carrying the xid and length. As a result, reply
- * chunks do not provide data alignment, however they do not require
- * "fixup" (moving the response to the upper layer buffer) either.
+static inline __be32 *
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+{
+	*iptr++ = cpu_to_be32(seg->mr_rkey);
+	*iptr++ = cpu_to_be32(seg->mr_len);
+	return xdr_encode_hyper(iptr, seg->mr_base);
+}
+
+/* XDR-encode the Read list. Supports encoding a list of read
+ * segments that belong to a single read chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  *
@@ -250,131 +302,190 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
  *   N elements, position P (same P for all chunks of same arg!):
  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
  *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Read list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
+			 __be32 *iptr, enum rpcrdma_chunktype rtype)
+{
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	unsigned int pos;
+	int n, nsegs;
+
+	if (rtype == rpcrdma_noch) {
+		*iptr++ = xdr_zero;	/* item not present */
+		return iptr;
+	}
+
+	pos = rqst->rq_snd_buf.head[0].iov_len;
+	if (rtype == rpcrdma_areadch)
+		pos = 0;
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	if (nsegs < 0)
+		return ERR_PTR(nsegs);
+
+	do {
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+		if (n <= 0)
+			return ERR_PTR(n);
+
+		*iptr++ = xdr_one;	/* item present */
+
+		/* All read segments in this chunk
+		 * have the same "position".
+		 */
+		*iptr++ = cpu_to_be32(pos);
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: read segment pos %u "
+			"%d@0x%016llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__, pos,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.read_chunk_count++;
+		req->rl_nchunks++;
+		seg += n;
+		nsegs -= n;
+	} while (nsegs);
+	req->rl_nextseg = seg;
+
+	/* Finish Read list */
+	*iptr++ = xdr_zero;	/* Next item not present */
+	return iptr;
+}
+
+/* XDR-encode the Write list. Supports encoding a list containing
+ * one array of plain segments that belong to a single write chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
  *  Write chunklist (a list of (one) counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO - 0
  *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Write list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+			  struct rpc_rqst *rqst, __be32 *iptr,
+			  enum rpcrdma_chunktype wtype)
+{
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	int n, nsegs, nchunks;
+	__be32 *segcount;
+
+	if (wtype != rpcrdma_writech) {
+		*iptr++ = xdr_zero;	/* no Write list present */
+		return iptr;
+	}
+
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
+				     rqst->rq_rcv_buf.head[0].iov_len,
+				     wtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	if (nsegs < 0)
+		return ERR_PTR(nsegs);
+
+	*iptr++ = xdr_one;	/* Write list present */
+	segcount = iptr++;	/* save location of segment count */
+
+	nchunks = 0;
+	do {
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+		if (n <= 0)
+			return ERR_PTR(n);
+
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: write segment "
+			"%d@0x016%llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.write_chunk_count++;
+		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+		req->rl_nchunks++;
+		nchunks++;
+		seg   += n;
+		nsegs -= n;
+	} while (nsegs);
+	req->rl_nextseg = seg;
+
+	/* Update count of segments in this Write chunk */
+	*segcount = cpu_to_be32(nchunks);
+
+	/* Finish Write list */
+	*iptr++ = xdr_zero;	/* Next item not present */
+	return iptr;
+}
+
+/* XDR-encode the Reply chunk. Supports encoding an array of plain
+ * segments that belong to a single write (reply) chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
  *  Reply chunk (a counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
  *
- * Returns positive RPC/RDMA header size, or negative errno.
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Reply chunk, or an error pointer.
  */
-
-static ssize_t
-rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
-		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
+static __be32 *
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
+			   __be32 *iptr, enum rpcrdma_chunktype wtype)
 {
-	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-	int n, nsegs, nchunks = 0;
-	unsigned int pos;
-	struct rpcrdma_mr_seg *seg = req->rl_segments;
-	struct rpcrdma_read_chunk *cur_rchunk = NULL;
-	struct rpcrdma_write_array *warray = NULL;
-	struct rpcrdma_write_chunk *cur_wchunk = NULL;
-	__be32 *iptr = headerp->rm_body.rm_chunks;
-	int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool);
-
-	if (type == rpcrdma_readch || type == rpcrdma_areadch) {
-		/* a read chunk - server will RDMA Read our memory */
-		cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
-	} else {
-		/* a write or reply chunk - server will RDMA Write our memory */
-		*iptr++ = xdr_zero;	/* encode a NULL read chunk list */
-		if (type == rpcrdma_replych)
-			*iptr++ = xdr_zero;	/* a NULL write chunk list */
-		warray = (struct rpcrdma_write_array *) iptr;
-		cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
-	}
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	int n, nsegs, nchunks;
+	__be32 *segcount;
 
-	if (type == rpcrdma_replych || type == rpcrdma_areadch)
-		pos = 0;
-	else
-		pos = target->head[0].iov_len;
+	if (wtype != rpcrdma_replych) {
+		*iptr++ = xdr_zero;	/* no Reply chunk present */
+		return iptr;
+	}
 
-	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
 	if (nsegs < 0)
-		return nsegs;
+		return ERR_PTR(nsegs);
 
-	map = r_xprt->rx_ia.ri_ops->ro_map;
+	*iptr++ = xdr_one;	/* Reply chunk present */
+	segcount = iptr++;	/* save location of segment count */
+
+	nchunks = 0;
 	do {
-		n = map(r_xprt, seg, nsegs, cur_wchunk != NULL);
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
 		if (n <= 0)
-			goto out;
-		if (cur_rchunk) {	/* read */
-			cur_rchunk->rc_discrim = xdr_one;
-			/* all read chunks have the same "position" */
-			cur_rchunk->rc_position = cpu_to_be32(pos);
-			cur_rchunk->rc_target.rs_handle =
-						cpu_to_be32(seg->mr_rkey);
-			cur_rchunk->rc_target.rs_length =
-						cpu_to_be32(seg->mr_len);
-			xdr_encode_hyper(
-					(__be32 *)&cur_rchunk->rc_target.rs_offset,
-					seg->mr_base);
-			dprintk("RPC:       %s: read chunk "
-				"elem %d@0x%llx:0x%x pos %u (%s)\n", __func__,
-				seg->mr_len, (unsigned long long)seg->mr_base,
-				seg->mr_rkey, pos, n < nsegs ? "more" : "last");
-			cur_rchunk++;
-			r_xprt->rx_stats.read_chunk_count++;
-		} else {		/* write/reply */
-			cur_wchunk->wc_target.rs_handle =
-						cpu_to_be32(seg->mr_rkey);
-			cur_wchunk->wc_target.rs_length =
-						cpu_to_be32(seg->mr_len);
-			xdr_encode_hyper(
-					(__be32 *)&cur_wchunk->wc_target.rs_offset,
-					seg->mr_base);
-			dprintk("RPC:       %s: %s chunk "
-				"elem %d@0x%llx:0x%x (%s)\n", __func__,
-				(type == rpcrdma_replych) ? "reply" : "write",
-				seg->mr_len, (unsigned long long)seg->mr_base,
-				seg->mr_rkey, n < nsegs ? "more" : "last");
-			cur_wchunk++;
-			if (type == rpcrdma_replych)
-				r_xprt->rx_stats.reply_chunk_count++;
-			else
-				r_xprt->rx_stats.write_chunk_count++;
-			r_xprt->rx_stats.total_rdma_request += seg->mr_len;
-		}
+			return ERR_PTR(n);
+
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: reply segment "
+			"%d@0x%016llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.reply_chunk_count++;
+		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+		req->rl_nchunks++;
 		nchunks++;
 		seg   += n;
 		nsegs -= n;
 	} while (nsegs);
+	req->rl_nextseg = seg;
 
-	/* success. all failures return above */
-	req->rl_nchunks = nchunks;
-
-	/*
-	 * finish off header. If write, marshal discrim and nchunks.
-	 */
-	if (cur_rchunk) {
-		iptr = (__be32 *) cur_rchunk;
-		*iptr++ = xdr_zero;	/* finish the read chunk list */
-		*iptr++ = xdr_zero;	/* encode a NULL write chunk list */
-		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
-	} else {
-		warray->wc_discrim = xdr_one;
-		warray->wc_nchunks = cpu_to_be32(nchunks);
-		iptr = (__be32 *) cur_wchunk;
-		if (type == rpcrdma_writech) {
-			*iptr++ = xdr_zero; /* finish the write chunk list */
-			*iptr++ = xdr_zero; /* encode a NULL reply chunk */
-		}
-	}
-
-	/*
-	 * Return header size.
-	 */
-	return (unsigned char *)iptr - (unsigned char *)headerp;
+	/* Update count of segments in the Reply chunk */
+	*segcount = cpu_to_be32(nchunks);
 
-out:
-	for (pos = 0; nchunks--;)
-		pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
-						      &req->rl_segments[pos]);
-	return n;
+	return iptr;
 }
 
 /*
@@ -440,13 +551,10 @@ static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
  * Marshal a request: the primary job of this routine is to choose
  * the transfer modes. See comments below.
  *
- * Uses multiple RDMA IOVs for a request:
- *  [0] -- RPC RDMA header, which uses memory from the *start* of the
- *         preregistered buffer that already holds the RPC data in
- *         its middle.
- *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
- *  [2] -- optional padding.
- *  [3] -- if padded, header only in [1] and data here.
+ * Prepares up to two IOVs per Call message:
+ *
+ *  [0] -- RPC RDMA header
+ *  [1] -- the RPC header/data
  *
  * Returns zero on success, otherwise a negative errno.
  */
@@ -457,24 +565,17 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-	char *base;
-	size_t rpclen;
-	ssize_t hdrlen;
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
+	ssize_t hdrlen;
+	size_t rpclen;
+	__be32 *iptr;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
 		return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-	/*
-	 * rpclen gets amount of data in first buffer, which is the
-	 * pre-registered buffer.
-	 */
-	base = rqst->rq_svec[0].iov_base;
-	rpclen = rqst->rq_svec[0].iov_len;
-
 	headerp = rdmab_to_msg(req->rl_rdmabuf);
 	/* don't byte-swap XID, it's already done in request */
 	headerp->rm_xid = rqst->rq_xid;
@@ -485,15 +586,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	/*
 	 * Chunks needed for results?
 	 *
-	 * o Read ops return data as write chunk(s), header as inline.
 	 * o If the expected result is under the inline threshold, all ops
 	 *   return as inline.
+	 * o Large read ops return data as write chunk(s), header as
+	 *   inline.
 	 * o Large non-read ops return as a single reply chunk.
 	 */
-	if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
-		wtype = rpcrdma_writech;
-	else if (rpcrdma_results_inline(rqst))
+	if (rpcrdma_results_inline(r_xprt, rqst))
 		wtype = rpcrdma_noch;
+	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+		wtype = rpcrdma_writech;
 	else
 		wtype = rpcrdma_replych;
 
@@ -511,10 +613,14 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * that both has a data payload, and whose non-data arguments
 	 * by themselves are larger than the inline threshold.
 	 */
-	if (rpcrdma_args_inline(rqst)) {
+	if (rpcrdma_args_inline(r_xprt, rqst)) {
 		rtype = rpcrdma_noch;
+		rpcrdma_inline_pullup(rqst);
+		rpclen = rqst->rq_svec[0].iov_len;
 	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
 		rtype = rpcrdma_readch;
+		rpclen = rqst->rq_svec[0].iov_len;
+		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
 	} else {
 		r_xprt->rx_stats.nomsg_call_count++;
 		headerp->rm_type = htonl(RDMA_NOMSG);
@@ -522,57 +628,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		rpclen = 0;
 	}
 
-	/* The following simplification is not true forever */
-	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
-		wtype = rpcrdma_noch;
-	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
-		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
-			__func__);
-		return -EIO;
-	}
-
-	hdrlen = RPCRDMA_HDRLEN_MIN;
-
-	/*
-	 * Pull up any extra send data into the preregistered buffer.
-	 * When padding is in use and applies to the transfer, insert
-	 * it and change the message type.
+	/* This implementation supports the following combinations
+	 * of chunk lists in one RPC-over-RDMA Call message:
+	 *
+	 *   - Read list
+	 *   - Write list
+	 *   - Reply chunk
+	 *   - Read list + Reply chunk
+	 *
+	 * It might not yet support the following combinations:
+	 *
+	 *   - Read list + Write list
+	 *
+	 * It does not support the following combinations:
+	 *
+	 *   - Write list + Reply chunk
+	 *   - Read list + Write list + Reply chunk
+	 *
+	 * This implementation supports only a single chunk in each
+	 * Read or Write list. Thus for example the client cannot
+	 * send a Call message with a Position Zero Read chunk and a
+	 * regular Read chunk at the same time.
 	 */
-	if (rtype == rpcrdma_noch) {
-
-		rpcrdma_inline_pullup(rqst);
-
-		headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
-		headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
-		headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
-		/* new length after pullup */
-		rpclen = rqst->rq_svec[0].iov_len;
-	} else if (rtype == rpcrdma_readch)
-		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
-	if (rtype != rpcrdma_noch) {
-		hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
-					       headerp, rtype);
-		wtype = rtype;	/* simplify dprintk */
-
-	} else if (wtype != rpcrdma_noch) {
-		hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
-					       headerp, wtype);
-	}
-	if (hdrlen < 0)
-		return hdrlen;
+	req->rl_nchunks = 0;
+	req->rl_nextseg = req->rl_segments;
+	iptr = headerp->rm_body.rm_chunks;
+	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
+
+	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+		goto out_overflow;
+
+	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+		rqst->rq_task->tk_pid, __func__,
+		transfertypes[rtype], transfertypes[wtype],
+		hdrlen, rpclen);
 
-	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd"
-		" headerp 0x%p base 0x%p lkey 0x%x\n",
-		__func__, transfertypes[wtype], hdrlen, rpclen,
-		headerp, base, rdmab_lkey(req->rl_rdmabuf));
-
-	/*
-	 * initialize send_iov's - normally only two: rdma chunk header and
-	 * single preregistered RPC header buffer, but if padding is present,
-	 * then use a preregistered (and zeroed) pad buffer between the RPC
-	 * header and any write data. In all non-rdma cases, any following
-	 * data has been copied into the RPC header buffer.
-	 */
 	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
 	req->rl_send_iov[0].length = hdrlen;
 	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
@@ -587,6 +686,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 
 	req->rl_niovs = 2;
 	return 0;
+
+out_overflow:
+	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
+		hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
+	/* Terminate this RPC. Chunks registered above will be
+	 * released by xprt_release -> xprt_rmda_free .
+	 */
+	return -EIO;
+
+out_unmap:
+	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
+	return PTR_ERR(iptr);
 }
 
 /*
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index b1b009f10ea3..99d2e5b72726 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -73,6 +73,8 @@ static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
 
 static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
 static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
+static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
+static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
 static unsigned int zero;
 static unsigned int max_padding = PAGE_SIZE;
 static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
@@ -96,6 +98,8 @@ static struct ctl_table xr_tunables_table[] = {
 		.maxlen		= sizeof(unsigned int),
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec,
+		.extra1		= &min_inline_size,
+		.extra2		= &max_inline_size,
 	},
 	{
 		.procname	= "rdma_max_inline_write",
@@ -103,6 +107,8 @@ static struct ctl_table xr_tunables_table[] = {
 		.maxlen		= sizeof(unsigned int),
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec,
+		.extra1		= &min_inline_size,
+		.extra2		= &max_inline_size,
 	},
 	{
 		.procname	= "rdma_inline_write_padding",
@@ -508,6 +514,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 out:
 	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
 	req->rl_connect_cookie = 0;	/* our reserved value */
+	req->rl_task = task;
 	return req->rl_sendbuf->rg_base;
 
 out_rdmabuf:
@@ -564,7 +571,6 @@ xprt_rdma_free(void *buffer)
 	struct rpcrdma_req *req;
 	struct rpcrdma_xprt *r_xprt;
 	struct rpcrdma_regbuf *rb;
-	int i;
 
 	if (buffer == NULL)
 		return;
@@ -578,11 +584,8 @@ xprt_rdma_free(void *buffer)
 
 	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	for (i = 0; req->rl_nchunks;) {
-		--req->rl_nchunks;
-		i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
-						    &req->rl_segments[i]);
-	}
+	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
+					    !RPC_IS_ASYNC(req->rl_task));
 
 	rpcrdma_buffer_put(req);
 }
@@ -707,6 +710,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	.bc_setup		= xprt_rdma_bc_setup,
 	.bc_up			= xprt_rdma_bc_up,
+	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
 	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
 	.bc_destroy		= xprt_rdma_bc_destroy,
 #endif
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f5ed9f982cd7..b044d98a1370 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -203,15 +203,6 @@ out_fail:
 	goto out_schedule;
 }
 
-static void
-rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
-{
-	struct ib_wc wc;
-
-	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_receive_wc(NULL, &wc);
-}
-
 static int
 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 {
@@ -374,23 +365,6 @@ out:
 }
 
 /*
- * Drain any cq, prior to teardown.
- */
-static void
-rpcrdma_clean_cq(struct ib_cq *cq)
-{
-	struct ib_wc wc;
-	int count = 0;
-
-	while (1 == ib_poll_cq(cq, 1, &wc))
-		++count;
-
-	if (count)
-		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
-			__func__, count, wc.opcode);
-}
-
-/*
  * Exported functions.
  */
 
@@ -459,7 +433,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
 		__func__, ia->ri_ops->ro_displayname);
 
-	rwlock_init(&ia->ri_qplock);
 	return 0;
 
 out3:
@@ -515,7 +488,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 			__func__);
 		return -ENOMEM;
 	}
-	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS;
+	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;
 
 	/* check provider's send/recv wr limits */
 	if (cdata->max_requests > max_qp_wr)
@@ -526,11 +499,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.srq = NULL;
 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
+	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
 	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 	if (rc)
 		return rc;
 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
+	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
 	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
 	ep->rep_attr.cap.max_recv_sge = 1;
 	ep->rep_attr.cap.max_inline_data = 0;
@@ -578,6 +553,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.recv_cq = recvcq;
 
 	/* Initialize cma parameters */
+	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
 
 	/* RPC/RDMA does not use private data */
 	ep->rep_remote_cma.private_data = NULL;
@@ -591,7 +567,16 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		ep->rep_remote_cma.responder_resources =
 						ia->ri_device->attrs.max_qp_rd_atom;
 
-	ep->rep_remote_cma.retry_count = 7;
+	/* Limit transport retries so client can detect server
+	 * GID changes quickly. RPC layer handles re-establishing
+	 * transport connection and retransmission.
+	 */
+	ep->rep_remote_cma.retry_count = 6;
+
+	/* RPC-over-RDMA handles its own flow control. In addition,
+	 * make all RNR NAKs visible so we know that RPC-over-RDMA
+	 * flow control is working correctly (no NAKs should be seen).
+	 */
 	ep->rep_remote_cma.flow_control = 0;
 	ep->rep_remote_cma.rnr_retry_count = 0;
 
@@ -622,13 +607,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
 	cancel_delayed_work_sync(&ep->rep_connect_worker);
 
-	if (ia->ri_id->qp)
-		rpcrdma_ep_disconnect(ep, ia);
-
-	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
-	rpcrdma_clean_cq(ep->rep_attr.send_cq);
-
 	if (ia->ri_id->qp) {
+		rpcrdma_ep_disconnect(ep, ia);
 		rdma_destroy_qp(ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
@@ -659,7 +639,6 @@ retry:
 		dprintk("RPC:       %s: reconnecting...\n", __func__);
 
 		rpcrdma_ep_disconnect(ep, ia);
-		rpcrdma_flush_cqs(ep);
 
 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 		id = rpcrdma_create_id(xprt, ia,
@@ -692,10 +671,8 @@ retry:
 			goto out;
 		}
 
-		write_lock(&ia->ri_qplock);
 		old = ia->ri_id;
 		ia->ri_id = id;
-		write_unlock(&ia->ri_qplock);
 
 		rdma_destroy_qp(old);
 		rpcrdma_destroy_id(old);
@@ -785,7 +762,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
 	int rc;
 
-	rpcrdma_flush_cqs(ep);
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc) {
 		/* returns without wait if not connected */
@@ -797,6 +773,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
 		ep->rep_connected = rc;
 	}
+
+	ib_drain_qp(ia->ri_id->qp);
 }
 
 struct rpcrdma_req *
@@ -1271,25 +1249,3 @@ out_rc:
 	rpcrdma_recv_buffer_put(rep);
 	return rc;
 }
-
-/* How many chunk list items fit within our inline buffers?
- */
-unsigned int
-rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
-{
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	int bytes, segments;
-
-	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
-	bytes -= RPCRDMA_HDRLEN_MIN;
-	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
-		pr_warn("RPC:       %s: inline threshold too small\n",
-			__func__);
-		return 0;
-	}
-
-	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
-	dprintk("RPC:       %s: max chunk list size = %d segments\n",
-		__func__, segments);
-	return segments;
-}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2ebc743cb96f..95cdc66225ee 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -65,7 +65,6 @@
  */
 struct rpcrdma_ia {
 	const struct rpcrdma_memreg_ops	*ri_ops;
-	rwlock_t		ri_qplock;
 	struct ib_device	*ri_device;
 	struct rdma_cm_id 	*ri_id;
 	struct ib_pd		*ri_pd;
@@ -73,6 +72,8 @@ struct rpcrdma_ia {
 	struct completion	ri_done;
 	int			ri_async_rc;
 	unsigned int		ri_max_frmr_depth;
+	unsigned int		ri_max_inline_write;
+	unsigned int		ri_max_inline_read;
 	struct ib_qp_attr	ri_qp_attr;
 	struct ib_qp_init_attr	ri_qp_init_attr;
 };
@@ -144,6 +145,26 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
 
 #define RPCRDMA_DEF_GFP		(GFP_NOIO | __GFP_NOWARN)
 
+/* To ensure a transport can always make forward progress,
+ * the number of RDMA segments allowed in header chunk lists
+ * is capped at 8. This prevents less-capable devices and
+ * memory registrations from overrunning the Send buffer
+ * while building chunk lists.
+ *
+ * Elements of the Read list take up more room than the
+ * Write list or Reply chunk. 8 read segments means the Read
+ * list (or Write list or Reply chunk) cannot consume more
+ * than
+ *
+ * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes.
+ *
+ * And the fixed part of the header is another 24 bytes.
+ *
+ * The smallest inline threshold is 1024 bytes, ensuring that
+ * at least 750 bytes are available for RPC messages.
+ */
+#define RPCRDMA_MAX_HDR_SEGS	(8)
+
 /*
  * struct rpcrdma_rep -- this structure encapsulates state required to recv
  * and complete a reply, asychronously. It needs several pieces of
@@ -162,7 +183,9 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
  */
 
 #define RPCRDMA_MAX_DATA_SEGS	((1 * 1024 * 1024) / PAGE_SIZE)
-#define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
+
+/* data segments + head/tail for Call + head/tail for Reply */
+#define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 4)
 
 struct rpcrdma_buffer;
 
@@ -198,14 +221,13 @@ enum rpcrdma_frmr_state {
 };
 
 struct rpcrdma_frmr {
-	struct scatterlist		*sg;
-	int				sg_nents;
+	struct scatterlist		*fr_sg;
+	int				fr_nents;
+	enum dma_data_direction		fr_dir;
 	struct ib_mr			*fr_mr;
 	struct ib_cqe			fr_cqe;
 	enum rpcrdma_frmr_state		fr_state;
 	struct completion		fr_linv_done;
-	struct work_struct		fr_work;
-	struct rpcrdma_xprt		*fr_xprt;
 	union {
 		struct ib_reg_wr	fr_regwr;
 		struct ib_send_wr	fr_invwr;
@@ -222,6 +244,8 @@ struct rpcrdma_mw {
 		struct rpcrdma_fmr	fmr;
 		struct rpcrdma_frmr	frmr;
 	};
+	struct work_struct	mw_work;
+	struct rpcrdma_xprt	*mw_xprt;
 	struct list_head	mw_list;
 	struct list_head	mw_all;
 };
@@ -270,12 +294,14 @@ struct rpcrdma_req {
 	unsigned int		rl_niovs;
 	unsigned int		rl_nchunks;
 	unsigned int		rl_connect_cookie;
+	struct rpc_task		*rl_task;
 	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;/* holder for reply buffer */
 	struct ib_sge		rl_send_iov[RPCRDMA_MAX_IOVS];
 	struct rpcrdma_regbuf	*rl_rdmabuf;
 	struct rpcrdma_regbuf	*rl_sendbuf;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
+	struct rpcrdma_mr_seg	*rl_nextseg;
 
 	struct ib_cqe		rl_cqe;
 	struct list_head	rl_all;
@@ -372,8 +398,8 @@ struct rpcrdma_memreg_ops {
 				  struct rpcrdma_mr_seg *, int, bool);
 	void		(*ro_unmap_sync)(struct rpcrdma_xprt *,
 					 struct rpcrdma_req *);
-	int		(*ro_unmap)(struct rpcrdma_xprt *,
-				    struct rpcrdma_mr_seg *);
+	void		(*ro_unmap_safe)(struct rpcrdma_xprt *,
+					 struct rpcrdma_req *, bool);
 	int		(*ro_open)(struct rpcrdma_ia *,
 				   struct rpcrdma_ep *,
 				   struct rpcrdma_create_data_internal *);
@@ -456,7 +482,6 @@ struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
 void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 			 struct rpcrdma_regbuf *);
 
-unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
 int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
 int frwr_alloc_recovery_wq(void);
@@ -519,6 +544,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
  */
 int rpcrdma_marshal_req(struct rpc_rqst *);
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
+				  struct rpcrdma_create_data_internal *,
+				  unsigned int);
 
 /* RPC/RDMA module init - xprtrdma/transport.c
  */
@@ -534,6 +562,7 @@ void xprt_rdma_cleanup(void);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
 int xprt_rdma_bc_up(struct svc_serv *, struct net *);
+size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
 int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
 void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
 int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index b90c5397b5e1..2d3e0c42361e 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1364,6 +1364,11 @@ static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
 		return ret;
 	return 0;
 }
+
+static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
+{
+	return PAGE_SIZE;
+}
 #else
 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
 					struct xdr_skb_reader *desc)
@@ -2661,6 +2666,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
 #ifdef CONFIG_SUNRPC_BACKCHANNEL
 	.bc_setup		= xprt_setup_bc,
 	.bc_up			= xs_tcp_bc_up,
+	.bc_maxpayload		= xs_tcp_bc_maxpayload,
 	.bc_free_rqst		= xprt_free_bc_rqst,
 	.bc_destroy		= xprt_destroy_bc,
 #endif