summary refs log tree commit diff
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2016-10-13 21:28:20 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2016-10-13 21:28:20 -0700
commitc4a86165d1ec70c8e592fa7b7cc7070971533021 (patch)
tree704b2071d0a90df92a9e08d3e6a219196c3f0116 /net
parent2778556474b1996aa68ae61619386b8802733bd8 (diff)
parent3f807e5ae5597bd65a6fff684083e8eaa21f3fa7 (diff)
downloadlinux-c4a86165d1ec70c8e592fa7b7cc7070971533021.tar.gz
Merge tag 'nfs-for-4.9-1' of git://git.linux-nfs.org/projects/anna/linux-nfs
Pull NFS client updates from Anna Schumaker:
 "Highlights include:

  Stable bugfixes:
   - sunrpc: fix writ espace race causing stalls
   - NFS: Fix inode corruption in nfs_prime_dcache()
   - NFSv4: Don't report revoked delegations as valid in nfs_have_delegation()
   - NFSv4: nfs4_copy_delegation_stateid() must fail if the delegation is invalid
   - NFSv4: Open state recovery must account for file permission changes
   - NFSv4.2: Fix a reference leak in nfs42_proc_layoutstats_generic

  Features:
   - Add support for tracking multiple layout types with an ordered list
   - Add support for using multiple backchannel threads on the client
   - Add support for pNFS file layout session trunking
   - Delay xprtrdma use of DMA API (for device driver removal)
   - Add support for xprtrdma remote invalidation
   - Add support for larger xprtrdma inline thresholds
   - Use a scatter/gather list for sending xprtrdma RPC calls
   - Add support for the CB_NOTIFY_LOCK callback
   - Improve hashing sunrpc auth_creds by using both uid and gid

  Bugfixes:
   - Fix xprtrdma use of DMA API
   - Validate filenames before adding to the dcache
   - Fix corruption of xdr->nwords in xdr_copy_to_scratch
   - Fix setting buffer length in xdr_set_next_buffer()
   - Don't deadlock the state manager on the SEQUENCE status flags
   - Various delegation and stateid related fixes
   - Retry operations if an interrupted slot receives EREMOTEIO
   - Make nfs boot time y2038 safe"

* tag 'nfs-for-4.9-1' of git://git.linux-nfs.org/projects/anna/linux-nfs: (100 commits)
  NFSv4.2: Fix a reference leak in nfs42_proc_layoutstats_generic
  fs: nfs: Make nfs boot time y2038 safe
  sunrpc: replace generic auth_cred hash with auth-specific function
  sunrpc: add RPCSEC_GSS hash_cred() function
  sunrpc: add auth_unix hash_cred() function
  sunrpc: add generic_auth hash_cred() function
  sunrpc: add hash_cred() function to rpc_authops struct
  Retry operation on EREMOTEIO on an interrupted slot
  pNFS: Fix atime updates on pNFS clients
  sunrpc: queue work on system_power_efficient_wq
  NFSv4.1: Even if the stateid is OK, we may need to recover the open modes
  NFSv4: If recovery failed for a specific open stateid, then don't retry
  NFSv4: Fix retry issues with nfs41_test/free_stateid
  NFSv4: Open state recovery must account for file permission changes
  NFSv4: Mark the lock and open stateids as invalid after freeing them
  NFSv4: Don't test open_stateid unless it is set
  NFSv4: nfs4_do_handle_exception() handle revoke/expiry of a single stateid
  NFS: Always call nfs_inode_find_state_and_recover() when revoking a delegation
  NFSv4: Fix a race when updating an open_stateid
  NFSv4: Fix a race in nfs_inode_reclaim_delegation()
  ...
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/auth.c2
-rw-r--r--net/sunrpc/auth_generic.c9
-rw-r--r--net/sunrpc/auth_gss/auth_gss.c7
-rw-r--r--net/sunrpc/auth_unix.c9
-rw-r--r--net/sunrpc/backchannel_rqst.c8
-rw-r--r--net/sunrpc/cache.c5
-rw-r--r--net/sunrpc/clnt.c132
-rw-r--r--net/sunrpc/sched.c35
-rw-r--r--net/sunrpc/svc.c17
-rw-r--r--net/sunrpc/xdr.c11
-rw-r--r--net/sunrpc/xprt.c2
-rw-r--r--net/sunrpc/xprtmultipath.c24
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c53
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c7
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c28
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c323
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_backchannel.c19
-rw-r--r--net/sunrpc/xprtrdma/transport.c202
-rw-r--r--net/sunrpc/xprtrdma/verbs.c237
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h108
-rw-r--r--net/sunrpc/xprtsock.c34
21 files changed, 794 insertions, 478 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index a7e42f9a405c..2bff63a73cf8 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -551,7 +551,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
 			*entry, *new;
 	unsigned int nr;
 
-	nr = hash_long(from_kuid(&init_user_ns, acred->uid), cache->hashbits);
+	nr = auth->au_ops->hash_cred(acred, cache->hashbits);
 
 	rcu_read_lock();
 	hlist_for_each_entry_rcu(entry, &cache->hashtable[nr], cr_hash) {
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index 83dffeadf20a..f1df9837f1ac 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -78,6 +78,14 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task,
 	return auth->au_ops->lookup_cred(auth, acred, lookupflags);
 }
 
+static int
+generic_hash_cred(struct auth_cred *acred, unsigned int hashbits)
+{
+	return hash_64(from_kgid(&init_user_ns, acred->gid) |
+		((u64)from_kuid(&init_user_ns, acred->uid) <<
+			(sizeof(gid_t) * 8)), hashbits);
+}
+
 /*
  * Lookup generic creds for current process
  */
@@ -258,6 +266,7 @@ generic_key_timeout(struct rpc_auth *auth, struct rpc_cred *cred)
 static const struct rpc_authops generic_auth_ops = {
 	.owner = THIS_MODULE,
 	.au_name = "Generic",
+	.hash_cred = generic_hash_cred,
 	.lookup_cred = generic_lookup_cred,
 	.crcreate = generic_create_cred,
 	.key_timeout = generic_key_timeout,
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 976c7812bbd5..d8bd97a5a7c9 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1298,6 +1298,12 @@ gss_destroy_cred(struct rpc_cred *cred)
 	gss_destroy_nullcred(cred);
 }
 
+static int
+gss_hash_cred(struct auth_cred *acred, unsigned int hashbits)
+{
+	return hash_64(from_kuid(&init_user_ns, acred->uid), hashbits);
+}
+
 /*
  * Lookup RPCSEC_GSS cred for the current process
  */
@@ -1982,6 +1988,7 @@ static const struct rpc_authops authgss_ops = {
 	.au_name	= "RPCSEC_GSS",
 	.create		= gss_create,
 	.destroy	= gss_destroy,
+	.hash_cred	= gss_hash_cred,
 	.lookup_cred	= gss_lookup_cred,
 	.crcreate	= gss_create_cred,
 	.list_pseudoflavors = gss_mech_list_pseudoflavors,
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index a1d768a973f5..306fc0f54596 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -46,6 +46,14 @@ unx_destroy(struct rpc_auth *auth)
 	rpcauth_clear_credcache(auth->au_credcache);
 }
 
+static int
+unx_hash_cred(struct auth_cred *acred, unsigned int hashbits)
+{
+	return hash_64(from_kgid(&init_user_ns, acred->gid) |
+		((u64)from_kuid(&init_user_ns, acred->uid) <<
+			(sizeof(gid_t) * 8)), hashbits);
+}
+
 /*
  * Lookup AUTH_UNIX creds for current process
  */
@@ -220,6 +228,7 @@ const struct rpc_authops authunix_ops = {
 	.au_name	= "UNIX",
 	.create		= unx_create,
 	.destroy	= unx_destroy,
+	.hash_cred	= unx_hash_cred,
 	.lookup_cred	= unx_lookup_cred,
 	.crcreate	= unx_create_cred,
 };
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 229956bf8457..ac701c28f44f 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -76,13 +76,7 @@ static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
 	page = alloc_page(gfp_flags);
 	if (page == NULL)
 		return -ENOMEM;
-	buf->head[0].iov_base = page_address(page);
-	buf->head[0].iov_len = PAGE_SIZE;
-	buf->tail[0].iov_base = NULL;
-	buf->tail[0].iov_len = 0;
-	buf->page_len = 0;
-	buf->len = 0;
-	buf->buflen = PAGE_SIZE;
+	xdr_buf_init(buf, page_address(page), PAGE_SIZE);
 	return 0;
 }
 
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 4d8e11f94a35..8aabe12201f8 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -353,7 +353,7 @@ void sunrpc_init_cache_detail(struct cache_detail *cd)
 	spin_unlock(&cache_list_lock);
 
 	/* start the cleaning process */
-	schedule_delayed_work(&cache_cleaner, 0);
+	queue_delayed_work(system_power_efficient_wq, &cache_cleaner, 0);
 }
 EXPORT_SYMBOL_GPL(sunrpc_init_cache_detail);
 
@@ -476,7 +476,8 @@ static void do_cache_clean(struct work_struct *work)
 		delay = 0;
 
 	if (delay)
-		schedule_delayed_work(&cache_cleaner, delay);
+		queue_delayed_work(system_power_efficient_wq,
+				   &cache_cleaner, delay);
 }
 
 
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 66f23b376fa0..34dd7b26ee5f 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -184,7 +184,6 @@ static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 				   struct super_block *sb)
 {
 	struct dentry *dentry;
-	int err = 0;
 
 	switch (event) {
 	case RPC_PIPEFS_MOUNT:
@@ -201,7 +200,7 @@ static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
 		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
 		return -ENOTSUPP;
 	}
-	return err;
+	return 0;
 }
 
 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
@@ -988,7 +987,6 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
 {
 
 	if (clnt != NULL) {
-		rpc_task_release_client(task);
 		if (task->tk_xprt == NULL)
 			task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi);
 		task->tk_client = clnt;
@@ -1693,6 +1691,7 @@ call_allocate(struct rpc_task *task)
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
+	int status;
 
 	dprint_status(task);
 
@@ -1718,11 +1717,14 @@ call_allocate(struct rpc_task *task)
 	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
 	req->rq_rcvsize <<= 2;
 
-	req->rq_buffer = xprt->ops->buf_alloc(task,
-					req->rq_callsize + req->rq_rcvsize);
-	if (req->rq_buffer != NULL)
-		return;
+	status = xprt->ops->buf_alloc(task);
 	xprt_inject_disconnect(xprt);
+	if (status == 0)
+		return;
+	if (status != -ENOMEM) {
+		rpc_exit(task, status);
+		return;
+	}
 
 	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
 
@@ -1748,18 +1750,6 @@ rpc_task_force_reencode(struct rpc_task *task)
 	task->tk_rqstp->rq_bytes_sent = 0;
 }
 
-static inline void
-rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
-{
-	buf->head[0].iov_base = start;
-	buf->head[0].iov_len = len;
-	buf->tail[0].iov_len = 0;
-	buf->page_len = 0;
-	buf->flags = 0;
-	buf->len = 0;
-	buf->buflen = len;
-}
-
 /*
  * 3.	Encode arguments of an RPC call
  */
@@ -1772,12 +1762,12 @@ rpc_xdr_encode(struct rpc_task *task)
 
 	dprint_status(task);
 
-	rpc_xdr_buf_init(&req->rq_snd_buf,
-			 req->rq_buffer,
-			 req->rq_callsize);
-	rpc_xdr_buf_init(&req->rq_rcv_buf,
-			 (char *)req->rq_buffer + req->rq_callsize,
-			 req->rq_rcvsize);
+	xdr_buf_init(&req->rq_snd_buf,
+		     req->rq_buffer,
+		     req->rq_callsize);
+	xdr_buf_init(&req->rq_rcv_buf,
+		     req->rq_rbuffer,
+		     req->rq_rcvsize);
 
 	p = rpc_encode_header(task);
 	if (p == NULL) {
@@ -2616,6 +2606,70 @@ int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
 
 /**
+ * rpc_clnt_setup_test_and_add_xprt()
+ *
+ * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
+ *   1) caller of the test function must dereference the rpc_xprt_switch
+ *   and the rpc_xprt.
+ *   2) test function must call rpc_xprt_switch_add_xprt, usually in
+ *   the rpc_call_done routine.
+ *
+ * Upon success (return of 1), the test function adds the new
+ * transport to the rpc_clnt xprt switch
+ *
+ * @clnt: struct rpc_clnt to get the new transport
+ * @xps:  the rpc_xprt_switch to hold the new transport
+ * @xprt: the rpc_xprt to test
+ * @data: a struct rpc_add_xprt_test pointer that holds the test function
+ *        and test function call data
+ */
+int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
+				     struct rpc_xprt_switch *xps,
+				     struct rpc_xprt *xprt,
+				     void *data)
+{
+	struct rpc_cred *cred;
+	struct rpc_task *task;
+	struct rpc_add_xprt_test *xtest = (struct rpc_add_xprt_test *)data;
+	int status = -EADDRINUSE;
+
+	xprt = xprt_get(xprt);
+	xprt_switch_get(xps);
+
+	if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
+		goto out_err;
+
+	/* Test the connection */
+	cred = authnull_ops.lookup_cred(NULL, NULL, 0);
+	task = rpc_call_null_helper(clnt, xprt, cred,
+				    RPC_TASK_SOFT | RPC_TASK_SOFTCONN,
+				    NULL, NULL);
+	put_rpccred(cred);
+	if (IS_ERR(task)) {
+		status = PTR_ERR(task);
+		goto out_err;
+	}
+	status = task->tk_status;
+	rpc_put_task(task);
+
+	if (status < 0)
+		goto out_err;
+
+	/* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
+	xtest->add_xprt_test(clnt, xprt, xtest->data);
+
+	/* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
+	return 1;
+out_err:
+	xprt_put(xprt);
+	xprt_switch_put(xps);
+	pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not added\n",
+		status, xprt->address_strings[RPC_DISPLAY_ADDR]);
+	return status;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
+
+/**
  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
  * @clnt: pointer to struct rpc_clnt
  * @xprtargs: pointer to struct xprt_create
@@ -2697,6 +2751,34 @@ rpc_cap_max_reconnect_timeout(struct rpc_clnt *clnt, unsigned long timeo)
 }
 EXPORT_SYMBOL_GPL(rpc_cap_max_reconnect_timeout);
 
+void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
+{
+	xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
+
+void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
+{
+	rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
+				 xprt);
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
+
+bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
+				   const struct sockaddr *sap)
+{
+	struct rpc_xprt_switch *xps;
+	bool ret;
+
+	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
+
+	rcu_read_lock();
+	ret = rpc_xprt_switch_has_addr(xps, sap);
+	rcu_read_unlock();
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
+
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 static void rpc_show_header(void)
 {
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 9ae588511aaf..5db68b371db2 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -849,14 +849,17 @@ static void rpc_async_schedule(struct work_struct *work)
 }
 
 /**
- * rpc_malloc - allocate an RPC buffer
- * @task: RPC task that will use this buffer
- * @size: requested byte size
+ * rpc_malloc - allocate RPC buffer resources
+ * @task: RPC task
+ *
+ * A single memory region is allocated, which is split between the
+ * RPC call and RPC reply that this task is being used for. When
+ * this RPC is retired, the memory is released by calling rpc_free.
  *
  * To prevent rpciod from hanging, this allocator never sleeps,
- * returning NULL and suppressing warning if the request cannot be serviced
- * immediately.
- * The caller can arrange to sleep in a way that is safe for rpciod.
+ * returning -ENOMEM and suppressing warning if the request cannot
+ * be serviced immediately. The caller can arrange to sleep in a
+ * way that is safe for rpciod.
  *
  * Most requests are 'small' (under 2KiB) and can be serviced from a
  * mempool, ensuring that NFS reads and writes can always proceed,
@@ -865,8 +868,10 @@ static void rpc_async_schedule(struct work_struct *work)
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we avoid using GFP_KERNEL.
  */
-void *rpc_malloc(struct rpc_task *task, size_t size)
+int rpc_malloc(struct rpc_task *task)
 {
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
 	struct rpc_buffer *buf;
 	gfp_t gfp = GFP_NOIO | __GFP_NOWARN;
 
@@ -880,28 +885,28 @@ void *rpc_malloc(struct rpc_task *task, size_t size)
 		buf = kmalloc(size, gfp);
 
 	if (!buf)
-		return NULL;
+		return -ENOMEM;
 
 	buf->len = size;
 	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
 			task->tk_pid, size, buf);
-	return &buf->data;
+	rqst->rq_buffer = buf->data;
+	rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
+	return 0;
 }
 EXPORT_SYMBOL_GPL(rpc_malloc);
 
 /**
- * rpc_free - free buffer allocated via rpc_malloc
- * @buffer: buffer to free
+ * rpc_free - free RPC buffer resources allocated via rpc_malloc
+ * @task: RPC task
  *
  */
-void rpc_free(void *buffer)
+void rpc_free(struct rpc_task *task)
 {
+	void *buffer = task->tk_rqstp->rq_buffer;
 	size_t size;
 	struct rpc_buffer *buf;
 
-	if (!buffer)
-		return;
-
 	buf = container_of(buffer, struct rpc_buffer, data);
 	size = buf->len;
 
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index c5b0cb4f4056..7c8070ec93c8 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -401,6 +401,21 @@ int svc_bind(struct svc_serv *serv, struct net *net)
 }
 EXPORT_SYMBOL_GPL(svc_bind);
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static void
+__svc_init_bc(struct svc_serv *serv)
+{
+	INIT_LIST_HEAD(&serv->sv_cb_list);
+	spin_lock_init(&serv->sv_cb_lock);
+	init_waitqueue_head(&serv->sv_cb_waitq);
+}
+#else
+static void
+__svc_init_bc(struct svc_serv *serv)
+{
+}
+#endif
+
 /*
  * Create an RPC service
  */
@@ -443,6 +458,8 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 	init_timer(&serv->sv_temptimer);
 	spin_lock_init(&serv->sv_lock);
 
+	__svc_init_bc(serv);
+
 	serv->sv_nrpools = npools;
 	serv->sv_pools =
 		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index c4f3cc0c0775..7f1071e103ca 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -767,7 +767,7 @@ static void xdr_set_next_page(struct xdr_stream *xdr)
 	newbase -= xdr->buf->page_base;
 
 	if (xdr_set_page_base(xdr, newbase, PAGE_SIZE) < 0)
-		xdr_set_iov(xdr, xdr->buf->tail, xdr->buf->len);
+		xdr_set_iov(xdr, xdr->buf->tail, xdr->nwords << 2);
 }
 
 static bool xdr_set_next_buffer(struct xdr_stream *xdr)
@@ -776,7 +776,7 @@ static bool xdr_set_next_buffer(struct xdr_stream *xdr)
 		xdr_set_next_page(xdr);
 	else if (xdr->iov == xdr->buf->head) {
 		if (xdr_set_page_base(xdr, 0, PAGE_SIZE) < 0)
-			xdr_set_iov(xdr, xdr->buf->tail, xdr->buf->len);
+			xdr_set_iov(xdr, xdr->buf->tail, xdr->nwords << 2);
 	}
 	return xdr->p != xdr->end;
 }
@@ -859,12 +859,15 @@ EXPORT_SYMBOL_GPL(xdr_set_scratch_buffer);
 static __be32 *xdr_copy_to_scratch(struct xdr_stream *xdr, size_t nbytes)
 {
 	__be32 *p;
-	void *cpdest = xdr->scratch.iov_base;
+	char *cpdest = xdr->scratch.iov_base;
 	size_t cplen = (char *)xdr->end - (char *)xdr->p;
 
 	if (nbytes > xdr->scratch.iov_len)
 		return NULL;
-	memcpy(cpdest, xdr->p, cplen);
+	p = __xdr_inline_decode(xdr, cplen);
+	if (p == NULL)
+		return NULL;
+	memcpy(cpdest, p, cplen);
 	cpdest += cplen;
 	nbytes -= cplen;
 	if (!xdr_set_next_buffer(xdr))
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ea244b29138b..685e6d225414 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1295,7 +1295,7 @@ void xprt_release(struct rpc_task *task)
 	xprt_schedule_autodisconnect(xprt);
 	spin_unlock_bh(&xprt->transport_lock);
 	if (req->rq_buffer)
-		xprt->ops->buf_free(req->rq_buffer);
+		xprt->ops->buf_free(task);
 	xprt_inject_disconnect(xprt);
 	if (req->rq_cred != NULL)
 		put_rpccred(req->rq_cred);
diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c
index 66c9d63f4797..ae92a9e9ba52 100644
--- a/net/sunrpc/xprtmultipath.c
+++ b/net/sunrpc/xprtmultipath.c
@@ -15,6 +15,7 @@
 #include <asm/cmpxchg.h>
 #include <linux/spinlock.h>
 #include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/xprtmultipath.h>
 
 typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head,
@@ -49,7 +50,8 @@ void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
 	if (xprt == NULL)
 		return;
 	spin_lock(&xps->xps_lock);
-	if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
+	if ((xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) &&
+	    !rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
 		xprt_switch_add_xprt_locked(xps, xprt);
 	spin_unlock(&xps->xps_lock);
 }
@@ -232,6 +234,26 @@ struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
 	return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
 }
 
+bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps,
+			      const struct sockaddr *sap)
+{
+	struct list_head *head;
+	struct rpc_xprt *pos;
+
+	if (xps == NULL || sap == NULL)
+		return false;
+
+	head = &xps->xps_xprt_list;
+	list_for_each_entry_rcu(pos, head, xprt_switch) {
+		if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) {
+			pr_info("RPC:   addr %s already in xprt switch\n",
+				pos->address_strings[RPC_DISPLAY_ADDR]);
+			return true;
+		}
+	}
+	return false;
+}
+
 static
 struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
 		const struct rpc_xprt *cur)
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 87762d976b63..2c472e1b4827 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -27,7 +27,7 @@ static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
 	list_del(&req->rl_all);
 	spin_unlock(&buf->rb_reqslock);
 
-	rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+	rpcrdma_destroy_req(req);
 
 	kfree(rqst);
 }
@@ -35,10 +35,8 @@ static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
 static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
 				 struct rpc_rqst *rqst)
 {
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_regbuf *rb;
 	struct rpcrdma_req *req;
-	struct xdr_buf *buf;
 	size_t size;
 
 	req = rpcrdma_create_req(r_xprt);
@@ -46,30 +44,19 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
 		return PTR_ERR(req);
 	req->rl_backchannel = true;
 
-	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
-	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
+				  DMA_TO_DEVICE, GFP_KERNEL);
 	if (IS_ERR(rb))
 		goto out_fail;
 	req->rl_rdmabuf = rb;
 
-	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
-	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	size = r_xprt->rx_data.inline_rsize;
+	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
 	if (IS_ERR(rb))
 		goto out_fail;
-	rb->rg_owner = req;
 	req->rl_sendbuf = rb;
-	/* so that rpcr_to_rdmar works when receiving a request */
-	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
-
-	buf = &rqst->rq_snd_buf;
-	buf->head[0].iov_base = rqst->rq_buffer;
-	buf->head[0].iov_len = 0;
-	buf->tail[0].iov_base = NULL;
-	buf->tail[0].iov_len = 0;
-	buf->page_len = 0;
-	buf->len = 0;
-	buf->buflen = size;
-
+	xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base, size);
+	rpcrdma_set_xprtdata(rqst, req);
 	return 0;
 
 out_fail:
@@ -219,7 +206,6 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_msg *headerp;
-	size_t rpclen;
 
 	headerp = rdmab_to_msg(req->rl_rdmabuf);
 	headerp->rm_xid = rqst->rq_xid;
@@ -231,26 +217,9 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 	headerp->rm_body.rm_chunks[1] = xdr_zero;
 	headerp->rm_body.rm_chunks[2] = xdr_zero;
 
-	rpclen = rqst->rq_svec[0].iov_len;
-
-#ifdef RPCRDMA_BACKCHANNEL_DEBUG
-	pr_info("RPC:       %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
-		__func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
-	pr_info("RPC:       %s: RPC/RDMA: %*ph\n",
-		__func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
-	pr_info("RPC:       %s:      RPC: %*ph\n",
-		__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
-#endif
-
-	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
-	req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
-	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
-	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
-	req->rl_send_iov[1].length = rpclen;
-	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-
-	req->rl_niovs = 2;
+	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
+				       &rqst->rq_snd_buf, rpcrdma_noch))
+		return -EIO;
 	return 0;
 }
 
@@ -402,7 +371,7 @@ out_overflow:
 out_short:
 	pr_warn("RPC/RDMA short backward direction call\n");
 
-	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
 		xprt_disconnect_done(xprt);
 	else
 		pr_warn("RPC:       %s: reposting rep %p\n",
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 21cb3b150b37..1ebb09e1ac4f 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -160,9 +160,8 @@ static int
 fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 	    struct rpcrdma_create_data_internal *cdata)
 {
-	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
-						      RPCRDMA_MAX_DATA_SEGS /
-						      RPCRDMA_MAX_FMR_SGES));
+	ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
+				RPCRDMA_MAX_FMR_SGES);
 	return 0;
 }
 
@@ -274,6 +273,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 */
 	list_for_each_entry(mw, &req->rl_registered, mw_list)
 		list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
+	r_xprt->rx_stats.local_inv_needed++;
 	rc = ib_unmap_fmr(&unmap_list);
 	if (rc)
 		goto out_reset;
@@ -331,4 +331,5 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
 	.ro_init_mr			= fmr_op_init_mr,
 	.ro_release_mr			= fmr_op_release_mr,
 	.ro_displayname			= "fmr",
+	.ro_send_w_inv_ok		= 0,
 };
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 892b5e1d9b09..210949562786 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -67,6 +67,8 @@
  * pending send queue WRs before the transport is reconnected.
  */
 
+#include <linux/sunrpc/rpc_rdma.h>
+
 #include "xprt_rdma.h"
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -161,7 +163,7 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 		return PTR_ERR(f->fr_mr);
 	}
 
-	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
+	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, f);
 	f->fr_state = FRMR_IS_INVALID;
 	return 0;
 }
@@ -242,9 +244,8 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 					       depth;
 	}
 
-	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
-						      RPCRDMA_MAX_DATA_SEGS /
-						      ia->ri_max_frmr_depth));
+	ia->ri_max_segs = max_t(unsigned int, 1, RPCRDMA_MAX_DATA_SEGS /
+				ia->ri_max_frmr_depth);
 	return 0;
 }
 
@@ -329,7 +330,7 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 	frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
 	if (wc->status != IB_WC_SUCCESS)
 		__frwr_sendcompletion_flush(wc, frmr, "localinv");
-	complete_all(&frmr->fr_linv_done);
+	complete(&frmr->fr_linv_done);
 }
 
 /* Post a REG_MR Work Request to register a memory region
@@ -396,7 +397,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 		goto out_mapmr_err;
 
 	dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
-		__func__, mw, mw->mw_nents, mr->length);
+		__func__, frmr, mw->mw_nents, mr->length);
 
 	key = (u8)(mr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(mr, ++key);
@@ -449,6 +450,8 @@ __frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
 	struct rpcrdma_frmr *f = &mw->frmr;
 	struct ib_send_wr *invalidate_wr;
 
+	dprintk("RPC:       %s: invalidating frmr %p\n", __func__, f);
+
 	f->fr_state = FRMR_IS_INVALID;
 	invalidate_wr = &f->fr_invwr;
 
@@ -472,6 +475,7 @@ static void
 frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
 	struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
+	struct rpcrdma_rep *rep = req->rl_reply;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_mw *mw, *tmp;
 	struct rpcrdma_frmr *f;
@@ -487,6 +491,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	f = NULL;
 	invalidate_wrs = pos = prev = NULL;
 	list_for_each_entry(mw, &req->rl_registered, mw_list) {
+		if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
+		    (mw->mw_handle == rep->rr_inv_rkey)) {
+			mw->frmr.fr_state = FRMR_IS_INVALID;
+			continue;
+		}
+
 		pos = __frwr_prepare_linv_wr(mw);
 
 		if (!invalidate_wrs)
@@ -496,6 +506,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 		prev = pos;
 		f = &mw->frmr;
 	}
+	if (!f)
+		goto unmap;
 
 	/* Strong send queue ordering guarantees that when the
 	 * last WR in the chain completes, all WRs in the chain
@@ -510,6 +522,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * replaces the QP. The RPC reply handler won't call us
 	 * unless ri_id->qp is a valid pointer.
 	 */
+	r_xprt->rx_stats.local_inv_needed++;
 	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
 	if (rc)
 		goto reset_mrs;
@@ -521,6 +534,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 */
 unmap:
 	list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
+		dprintk("RPC:       %s: unmapping frmr %p\n",
+			__func__, &mw->frmr);
 		list_del_init(&mw->mw_list);
 		ib_dma_unmap_sg(ia->ri_device,
 				mw->mw_sg, mw->mw_nents, mw->mw_dir);
@@ -576,4 +591,5 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
 	.ro_init_mr			= frwr_op_init_mr,
 	.ro_release_mr			= frwr_op_release_mr,
 	.ro_displayname			= "frwr",
+	.ro_send_w_inv_ok		= RPCRDMA_CMP_F_SND_W_INV_OK,
 };
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index a47f170b20ef..d987c2d3dd6e 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -53,14 +53,6 @@
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-enum rpcrdma_chunktype {
-	rpcrdma_noch = 0,
-	rpcrdma_readch,
-	rpcrdma_areadch,
-	rpcrdma_writech,
-	rpcrdma_replych
-};
-
 static const char transfertypes[][12] = {
 	"inline",	/* no chunks */
 	"read list",	/* some argument via rdma read */
@@ -118,10 +110,12 @@ static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
 	return size;
 }
 
-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
-				  struct rpcrdma_create_data_internal *cdata,
-				  unsigned int maxsegs)
+void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
 {
+	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	unsigned int maxsegs = ia->ri_max_segs;
+
 	ia->ri_max_inline_write = cdata->inline_wsize -
 				  rpcrdma_max_call_header_size(maxsegs);
 	ia->ri_max_inline_read = cdata->inline_rsize -
@@ -155,42 +149,6 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
-static int
-rpcrdma_tail_pullup(struct xdr_buf *buf)
-{
-	size_t tlen = buf->tail[0].iov_len;
-	size_t skip = tlen & 3;
-
-	/* Do not include the tail if it is only an XDR pad */
-	if (tlen < 4)
-		return 0;
-
-	/* xdr_write_pages() adds a pad at the beginning of the tail
-	 * if the content in "buf->pages" is unaligned. Force the
-	 * tail's actual content to land at the next XDR position
-	 * after the head instead.
-	 */
-	if (skip) {
-		unsigned char *src, *dst;
-		unsigned int count;
-
-		src = buf->tail[0].iov_base;
-		dst = buf->head[0].iov_base;
-		dst += buf->head[0].iov_len;
-
-		src += skip;
-		tlen -= skip;
-
-		dprintk("RPC:       %s: skip=%zu, memmove(%p, %p, %zu)\n",
-			__func__, skip, dst, src, tlen);
-
-		for (count = tlen; count; count--)
-			*dst++ = *src++;
-	}
-
-	return tlen;
-}
-
 /* Split "vec" on page boundaries into segments. FMR registers pages,
  * not a byte range. Other modes coalesce these segments into a single
  * MR when they can.
@@ -229,7 +187,8 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
 
 static int
 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
-	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
+	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg,
+	bool reminv_expected)
 {
 	int len, n, p, page_base;
 	struct page **ppages;
@@ -271,6 +230,13 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 	if (type == rpcrdma_readch)
 		return n;
 
+	/* When encoding the Write list, some servers need to see an extra
+	 * segment for odd-length Write chunks. The upper layer provides
+	 * space in the tail iovec for this purpose.
+	 */
+	if (type == rpcrdma_writech && reminv_expected)
+		return n;
+
 	if (xdrbuf->tail[0].iov_len) {
 		/* the rpcrdma protocol allows us to omit any trailing
 		 * xdr pad bytes, saving the server an RDMA operation. */
@@ -327,7 +293,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
 	if (rtype == rpcrdma_areadch)
 		pos = 0;
 	seg = req->rl_segments;
-	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, false);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
@@ -391,7 +357,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
 	seg = req->rl_segments;
 	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
 				     rqst->rq_rcv_buf.head[0].iov_len,
-				     wtype, seg);
+				     wtype, seg,
+				     r_xprt->rx_ia.ri_reminv_expected);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
@@ -456,7 +423,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 	}
 
 	seg = req->rl_segments;
-	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+				     r_xprt->rx_ia.ri_reminv_expected);
 	if (nsegs < 0)
 		return ERR_PTR(nsegs);
 
@@ -491,74 +459,184 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
 	return iptr;
 }
 
-/*
- * Copy write data inline.
- * This function is used for "small" requests. Data which is passed
- * to RPC via iovecs (or page list) is copied directly into the
- * pre-registered memory buffer for this request. For small amounts
- * of data, this is efficient. The cutoff value is tunable.
+/* Prepare the RPC-over-RDMA header SGE.
  */
-static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
+static bool
+rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+			u32 len)
 {
-	int i, npages, curlen;
-	int copy_len;
-	unsigned char *srcp, *destp;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-	int page_base;
-	struct page **ppages;
+	struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
+	struct ib_sge *sge = &req->rl_send_sge[0];
+
+	if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
+		if (!__rpcrdma_dma_map_regbuf(ia, rb))
+			return false;
+		sge->addr = rdmab_addr(rb);
+		sge->lkey = rdmab_lkey(rb);
+	}
+	sge->length = len;
+
+	ib_dma_sync_single_for_device(ia->ri_device, sge->addr,
+				      sge->length, DMA_TO_DEVICE);
+	req->rl_send_wr.num_sge++;
+	return true;
+}
 
-	destp = rqst->rq_svec[0].iov_base;
-	curlen = rqst->rq_svec[0].iov_len;
-	destp += curlen;
+/* Prepare the Send SGEs. The head and tail iovec, and each entry
+ * in the page list, gets its own SGE.
+ */
+static bool
+rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+			 struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+{
+	unsigned int sge_no, page_base, len, remaining;
+	struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+	struct ib_device *device = ia->ri_device;
+	struct ib_sge *sge = req->rl_send_sge;
+	u32 lkey = ia->ri_pd->local_dma_lkey;
+	struct page *page, **ppages;
+
+	/* The head iovec is straightforward, as it is already
+	 * DMA-mapped. Sync the content that has changed.
+	 */
+	if (!rpcrdma_dma_map_regbuf(ia, rb))
+		return false;
+	sge_no = 1;
+	sge[sge_no].addr = rdmab_addr(rb);
+	sge[sge_no].length = xdr->head[0].iov_len;
+	sge[sge_no].lkey = rdmab_lkey(rb);
+	ib_dma_sync_single_for_device(device, sge[sge_no].addr,
+				      sge[sge_no].length, DMA_TO_DEVICE);
+
+	/* If there is a Read chunk, the page list is being handled
+	 * via explicit RDMA, and thus is skipped here. However, the
+	 * tail iovec may include an XDR pad for the page list, as
+	 * well as additional content, and may not reside in the
+	 * same page as the head iovec.
+	 */
+	if (rtype == rpcrdma_readch) {
+		len = xdr->tail[0].iov_len;
 
-	dprintk("RPC:       %s: destp 0x%p len %d hdrlen %d\n",
-		__func__, destp, rqst->rq_slen, curlen);
+		/* Do not include the tail if it is only an XDR pad */
+		if (len < 4)
+			goto out;
 
-	copy_len = rqst->rq_snd_buf.page_len;
+		page = virt_to_page(xdr->tail[0].iov_base);
+		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
 
-	if (rqst->rq_snd_buf.tail[0].iov_len) {
-		curlen = rqst->rq_snd_buf.tail[0].iov_len;
-		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
-			memmove(destp + copy_len,
-				rqst->rq_snd_buf.tail[0].iov_base, curlen);
-			r_xprt->rx_stats.pullup_copy_count += curlen;
+		/* If the content in the page list is an odd length,
+		 * xdr_write_pages() has added a pad at the beginning
+		 * of the tail iovec. Force the tail's non-pad content
+		 * to land at the next XDR position in the Send message.
+		 */
+		page_base += len & 3;
+		len -= len & 3;
+		goto map_tail;
+	}
+
+	/* If there is a page list present, temporarily DMA map
+	 * and prepare an SGE for each page to be sent.
+	 */
+	if (xdr->page_len) {
+		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+		page_base = xdr->page_base & ~PAGE_MASK;
+		remaining = xdr->page_len;
+		while (remaining) {
+			sge_no++;
+			if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
+				goto out_mapping_overflow;
+
+			len = min_t(u32, PAGE_SIZE - page_base, remaining);
+			sge[sge_no].addr = ib_dma_map_page(device, *ppages,
+							   page_base, len,
+							   DMA_TO_DEVICE);
+			if (ib_dma_mapping_error(device, sge[sge_no].addr))
+				goto out_mapping_err;
+			sge[sge_no].length = len;
+			sge[sge_no].lkey = lkey;
+
+			req->rl_mapped_sges++;
+			ppages++;
+			remaining -= len;
+			page_base = 0;
 		}
-		dprintk("RPC:       %s: tail destp 0x%p len %d\n",
-			__func__, destp + copy_len, curlen);
-		rqst->rq_svec[0].iov_len += curlen;
 	}
-	r_xprt->rx_stats.pullup_copy_count += copy_len;
 
-	page_base = rqst->rq_snd_buf.page_base;
-	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
-	page_base &= ~PAGE_MASK;
-	npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
-	for (i = 0; copy_len && i < npages; i++) {
-		curlen = PAGE_SIZE - page_base;
-		if (curlen > copy_len)
-			curlen = copy_len;
-		dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
-			__func__, i, destp, copy_len, curlen);
-		srcp = kmap_atomic(ppages[i]);
-		memcpy(destp, srcp+page_base, curlen);
-		kunmap_atomic(srcp);
-		rqst->rq_svec[0].iov_len += curlen;
-		destp += curlen;
-		copy_len -= curlen;
-		page_base = 0;
+	/* The tail iovec is not always constructed in the same
+	 * page where the head iovec resides (see, for example,
+	 * gss_wrap_req_priv). To neatly accommodate that case,
+	 * DMA map it separately.
+	 */
+	if (xdr->tail[0].iov_len) {
+		page = virt_to_page(xdr->tail[0].iov_base);
+		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+		len = xdr->tail[0].iov_len;
+
+map_tail:
+		sge_no++;
+		sge[sge_no].addr = ib_dma_map_page(device, page,
+						   page_base, len,
+						   DMA_TO_DEVICE);
+		if (ib_dma_mapping_error(device, sge[sge_no].addr))
+			goto out_mapping_err;
+		sge[sge_no].length = len;
+		sge[sge_no].lkey = lkey;
+		req->rl_mapped_sges++;
 	}
-	/* header now contains entire send message */
+
+out:
+	req->rl_send_wr.num_sge = sge_no + 1;
+	return true;
+
+out_mapping_overflow:
+	pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
+	return false;
+
+out_mapping_err:
+	pr_err("rpcrdma: Send mapping error\n");
+	return false;
+}
+
+bool
+rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+			  u32 hdrlen, struct xdr_buf *xdr,
+			  enum rpcrdma_chunktype rtype)
+{
+	req->rl_send_wr.num_sge = 0;
+	req->rl_mapped_sges = 0;
+
+	if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
+		goto out_map;
+
+	if (rtype != rpcrdma_areadch)
+		if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
+			goto out_map;
+
+	return true;
+
+out_map:
+	pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+	return false;
+}
+
+void
+rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+	struct ib_device *device = ia->ri_device;
+	struct ib_sge *sge;
+	int count;
+
+	sge = &req->rl_send_sge[2];
+	for (count = req->rl_mapped_sges; count--; sge++)
+		ib_dma_unmap_page(device, sge->addr, sge->length,
+				  DMA_TO_DEVICE);
+	req->rl_mapped_sges = 0;
 }
 
 /*
  * Marshal a request: the primary job of this routine is to choose
  * the transfer modes. See comments below.
  *
- * Prepares up to two IOVs per Call message:
- *
- *  [0] -- RPC RDMA header
- *  [1] -- the RPC header/data
- *
  * Returns zero on success, otherwise a negative errno.
  */
 
@@ -626,12 +704,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 */
 	if (rpcrdma_args_inline(r_xprt, rqst)) {
 		rtype = rpcrdma_noch;
-		rpcrdma_inline_pullup(rqst);
-		rpclen = rqst->rq_svec[0].iov_len;
+		rpclen = rqst->rq_snd_buf.len;
 	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
 		rtype = rpcrdma_readch;
-		rpclen = rqst->rq_svec[0].iov_len;
-		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
+		rpclen = rqst->rq_snd_buf.head[0].iov_len +
+			 rqst->rq_snd_buf.tail[0].iov_len;
 	} else {
 		r_xprt->rx_stats.nomsg_call_count++;
 		headerp->rm_type = htonl(RDMA_NOMSG);
@@ -673,34 +750,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		goto out_unmap;
 	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
-	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
-		goto out_overflow;
-
 	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
 		rqst->rq_task->tk_pid, __func__,
 		transfertypes[rtype], transfertypes[wtype],
 		hdrlen, rpclen);
 
-	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
-	req->rl_send_iov[0].length = hdrlen;
-	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
-	req->rl_niovs = 1;
-	if (rtype == rpcrdma_areadch)
-		return 0;
-
-	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
-	req->rl_send_iov[1].length = rpclen;
-	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-
-	req->rl_niovs = 2;
+	if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
+				       &rqst->rq_snd_buf, rtype)) {
+		iptr = ERR_PTR(-EIO);
+		goto out_unmap;
+	}
 	return 0;
 
-out_overflow:
-	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
-		hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
-	iptr = ERR_PTR(-EIO);
-
 out_unmap:
 	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
 	return PTR_ERR(iptr);
@@ -916,8 +977,10 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
  * allowed to timeout, to discover the errors at that time.
  */
 void
-rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+rpcrdma_reply_handler(struct work_struct *work)
 {
+	struct rpcrdma_rep *rep =
+			container_of(work, struct rpcrdma_rep, rr_work);
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
@@ -1132,6 +1195,6 @@ out_duplicate:
 
 repost:
 	r_xprt->rx_stats.bad_reply_count++;
-	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
 		rpcrdma_recv_buffer_put(rep);
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index cd0c5581498c..2d8545c34095 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -159,33 +159,34 @@ out_unmap:
 /* Server-side transport endpoint wants a whole page for its send
  * buffer. The client RPC code constructs the RPC header in this
  * buffer before it invokes ->send_request.
- *
- * Returns NULL if there was a temporary allocation failure.
  */
-static void *
-xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+static int
+xprt_rdma_bc_allocate(struct rpc_task *task)
 {
 	struct rpc_rqst *rqst = task->tk_rqstp;
 	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+	size_t size = rqst->rq_callsize;
 	struct svcxprt_rdma *rdma;
 	struct page *page;
 
 	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
 
-	/* Prevent an infinite loop: try to make this case work */
-	if (size > PAGE_SIZE)
+	if (size > PAGE_SIZE) {
 		WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
 			  size);
+		return -EINVAL;
+	}
 
 	page = alloc_page(RPCRDMA_DEF_GFP);
 	if (!page)
-		return NULL;
+		return -ENOMEM;
 
-	return page_address(page);
+	rqst->rq_buffer = page_address(page);
+	return 0;
 }
 
 static void
-xprt_rdma_bc_free(void *buffer)
+xprt_rdma_bc_free(struct rpc_task *task)
 {
 	/* No-op: ctxt and page have already been freed. */
 }
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 81f0e879f019..ed5e285fd2ea 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -97,7 +97,7 @@ static struct ctl_table xr_tunables_table[] = {
 		.data		= &xprt_rdma_max_inline_read,
 		.maxlen		= sizeof(unsigned int),
 		.mode		= 0644,
-		.proc_handler	= proc_dointvec,
+		.proc_handler	= proc_dointvec_minmax,
 		.extra1		= &min_inline_size,
 		.extra2		= &max_inline_size,
 	},
@@ -106,7 +106,7 @@ static struct ctl_table xr_tunables_table[] = {
 		.data		= &xprt_rdma_max_inline_write,
 		.maxlen		= sizeof(unsigned int),
 		.mode		= 0644,
-		.proc_handler	= proc_dointvec,
+		.proc_handler	= proc_dointvec_minmax,
 		.extra1		= &min_inline_size,
 		.extra2		= &max_inline_size,
 	},
@@ -477,115 +477,152 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 	}
 }
 
-/*
- * The RDMA allocate/free functions need the task structure as a place
- * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence.
+/* Allocate a fixed-size buffer in which to construct and send the
+ * RPC-over-RDMA header for this request.
+ */
+static bool
+rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		    gfp_t flags)
+{
+	size_t size = RPCRDMA_HDRBUF_SIZE;
+	struct rpcrdma_regbuf *rb;
+
+	if (req->rl_rdmabuf)
+		return true;
+
+	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
+	if (IS_ERR(rb))
+		return false;
+
+	r_xprt->rx_stats.hardway_register_count += size;
+	req->rl_rdmabuf = rb;
+	return true;
+}
+
+static bool
+rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		    size_t size, gfp_t flags)
+{
+	struct rpcrdma_regbuf *rb;
+
+	if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
+		return true;
+
+	rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
+	if (IS_ERR(rb))
+		return false;
+
+	rpcrdma_free_regbuf(req->rl_sendbuf);
+	r_xprt->rx_stats.hardway_register_count += size;
+	req->rl_sendbuf = rb;
+	return true;
+}
+
+/* The rq_rcv_buf is used only if a Reply chunk is necessary.
+ * The decision to use a Reply chunk is made later in
+ * rpcrdma_marshal_req. This buffer is registered at that time.
  *
- * The RPC layer allocates both send and receive buffers in the same call
- * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
- * We may register rq_rcv_buf when using reply chunks.
+ * Otherwise, the associated RPC Reply arrives in a separate
+ * Receive buffer, arbitrarily chosen by the HCA. The buffer
+ * allocated here for the RPC Reply is not utilized in that
+ * case. See rpcrdma_inline_fixup.
+ *
+ * A regbuf is used here to remember the buffer size.
  */
-static void *
-xprt_rdma_allocate(struct rpc_task *task, size_t size)
+static bool
+rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		    size_t size, gfp_t flags)
 {
-	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_regbuf *rb;
+
+	if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
+		return true;
+
+	rb = rpcrdma_alloc_regbuf(size, DMA_NONE, flags);
+	if (IS_ERR(rb))
+		return false;
+
+	rpcrdma_free_regbuf(req->rl_recvbuf);
+	r_xprt->rx_stats.hardway_register_count += size;
+	req->rl_recvbuf = rb;
+	return true;
+}
+
+/**
+ * xprt_rdma_allocate - allocate transport resources for an RPC
+ * @task: RPC task
+ *
+ * Return values:
+ *        0:	Success; rq_buffer points to RPC buffer to use
+ *   ENOMEM:	Out of memory, call again later
+ *      EIO:	A permanent error occurred, do not retry
+ *
+ * The RDMA allocate/free functions need the task structure as a place
+ * to hide the struct rpcrdma_req, which is necessary for the actual
+ * send/recv sequence.
+ *
+ * xprt_rdma_allocate provides buffers that are already mapped for
+ * DMA, and a local DMA lkey is provided for each.
+ */
+static int
+xprt_rdma_allocate(struct rpc_task *task)
+{
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_req *req;
-	size_t min_size;
 	gfp_t flags;
 
 	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 	if (req == NULL)
-		return NULL;
+		return -ENOMEM;
 
 	flags = RPCRDMA_DEF_GFP;
 	if (RPC_IS_SWAPPER(task))
 		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 
-	if (req->rl_rdmabuf == NULL)
-		goto out_rdmabuf;
-	if (req->rl_sendbuf == NULL)
-		goto out_sendbuf;
-	if (size > req->rl_sendbuf->rg_size)
-		goto out_sendbuf;
-
-out:
-	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
-	req->rl_connect_cookie = 0;	/* our reserved value */
-	req->rl_task = task;
-	return req->rl_sendbuf->rg_base;
-
-out_rdmabuf:
-	min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
-	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
-	if (IS_ERR(rb))
+	if (!rpcrdma_get_rdmabuf(r_xprt, req, flags))
 		goto out_fail;
-	req->rl_rdmabuf = rb;
-
-out_sendbuf:
-	/* XDR encoding and RPC/RDMA marshaling of this request has not
-	 * yet occurred. Thus a lower bound is needed to prevent buffer
-	 * overrun during marshaling.
-	 *
-	 * RPC/RDMA marshaling may choose to send payload bearing ops
-	 * inline, if the result is smaller than the inline threshold.
-	 * The value of the "size" argument accounts for header
-	 * requirements but not for the payload in these cases.
-	 *
-	 * Likewise, allocate enough space to receive a reply up to the
-	 * size of the inline threshold.
-	 *
-	 * It's unlikely that both the send header and the received
-	 * reply will be large, but slush is provided here to allow
-	 * flexibility when marshaling.
-	 */
-	min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
-	min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
-	if (size < min_size)
-		size = min_size;
-
-	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
-	if (IS_ERR(rb))
+	if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
+		goto out_fail;
+	if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
 		goto out_fail;
-	rb->rg_owner = req;
 
-	r_xprt->rx_stats.hardway_register_count += size;
-	rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
-	req->rl_sendbuf = rb;
-	goto out;
+	dprintk("RPC: %5u %s: send size = %zd, recv size = %zd, req = %p\n",
+		task->tk_pid, __func__, rqst->rq_callsize,
+		rqst->rq_rcvsize, req);
+
+	req->rl_connect_cookie = 0;	/* our reserved value */
+	rpcrdma_set_xprtdata(rqst, req);
+	rqst->rq_buffer = req->rl_sendbuf->rg_base;
+	rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
+	return 0;
 
 out_fail:
 	rpcrdma_buffer_put(req);
-	return NULL;
+	return -ENOMEM;
 }
 
-/*
- * This function returns all RDMA resources to the pool.
+/**
+ * xprt_rdma_free - release resources allocated by xprt_rdma_allocate
+ * @task: RPC task
+ *
+ * Caller guarantees rqst->rq_buffer is non-NULL.
  */
 static void
-xprt_rdma_free(void *buffer)
+xprt_rdma_free(struct rpc_task *task)
 {
-	struct rpcrdma_req *req;
-	struct rpcrdma_xprt *r_xprt;
-	struct rpcrdma_regbuf *rb;
-
-	if (buffer == NULL)
-		return;
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
-	req = rb->rg_owner;
 	if (req->rl_backchannel)
 		return;
 
-	r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
-
 	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
-					    !RPC_IS_ASYNC(req->rl_task));
-
+	ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
+	rpcrdma_unmap_sges(ia, req);
 	rpcrdma_buffer_put(req);
 }
 
@@ -685,10 +722,11 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 		   r_xprt->rx_stats.failed_marshal_count,
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
-	seq_printf(seq, "%lu %lu %lu\n",
+	seq_printf(seq, "%lu %lu %lu %lu\n",
 		   r_xprt->rx_stats.mrs_recovered,
 		   r_xprt->rx_stats.mrs_orphaned,
-		   r_xprt->rx_stats.mrs_allocated);
+		   r_xprt->rx_stats.mrs_allocated,
+		   r_xprt->rx_stats.local_inv_needed);
 }
 
 static int
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index be3178e5e2d2..ec74289af7ec 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -129,15 +129,6 @@ rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 		       wc->status, wc->vendor_err);
 }
 
-static void
-rpcrdma_receive_worker(struct work_struct *work)
-{
-	struct rpcrdma_rep *rep =
-			container_of(work, struct rpcrdma_rep, rr_work);
-
-	rpcrdma_reply_handler(rep);
-}
-
 /* Perform basic sanity checking to avoid using garbage
  * to update the credit grant value.
  */
@@ -161,13 +152,13 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
 }
 
 /**
- * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
+ * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
  * @cq:	completion queue (ignored)
  * @wc:	completed WR
  *
  */
 static void
-rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
+rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
 {
 	struct ib_cqe *cqe = wc->wr_cqe;
 	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
@@ -185,6 +176,9 @@ rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
 		__func__, rep, wc->byte_len);
 
 	rep->rr_len = wc->byte_len;
+	rep->rr_wc_flags = wc->wc_flags;
+	rep->rr_inv_rkey = wc->ex.invalidate_rkey;
+
 	ib_dma_sync_single_for_cpu(rep->rr_device,
 				   rdmab_addr(rep->rr_rdmabuf),
 				   rep->rr_len, DMA_FROM_DEVICE);
@@ -204,6 +198,36 @@ out_fail:
 	goto out_schedule;
 }
 
+static void
+rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
+			       struct rdma_conn_param *param)
+{
+	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+	const struct rpcrdma_connect_private *pmsg = param->private_data;
+	unsigned int rsize, wsize;
+
+	/* Default settings for RPC-over-RDMA Version One */
+	r_xprt->rx_ia.ri_reminv_expected = false;
+	rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
+	wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
+
+	if (pmsg &&
+	    pmsg->cp_magic == rpcrdma_cmp_magic &&
+	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
+		r_xprt->rx_ia.ri_reminv_expected = true;
+		rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
+		wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
+	}
+
+	if (rsize < cdata->inline_rsize)
+		cdata->inline_rsize = rsize;
+	if (wsize < cdata->inline_wsize)
+		cdata->inline_wsize = wsize;
+	pr_info("rpcrdma: max send %u, max recv %u\n",
+		cdata->inline_wsize, cdata->inline_rsize);
+	rpcrdma_set_max_header_sizes(r_xprt);
+}
+
 static int
 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 {
@@ -244,6 +268,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 			" (%d initiator)\n",
 			__func__, attr->max_dest_rd_atomic,
 			attr->max_rd_atomic);
+		rpcrdma_update_connect_private(xprt, &event->param.conn);
 		goto connected;
 	case RDMA_CM_EVENT_CONNECT_ERROR:
 		connstate = -ENOTCONN;
@@ -454,11 +479,12 @@ int
 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 				struct rpcrdma_create_data_internal *cdata)
 {
+	struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
 	struct ib_cq *sendcq, *recvcq;
 	unsigned int max_qp_wr;
 	int rc;
 
-	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
+	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_SEND_SGES) {
 		dprintk("RPC:       %s: insufficient sge's available\n",
 			__func__);
 		return -ENOMEM;
@@ -487,7 +513,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
-	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
+	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_SEND_SGES;
 	ep->rep_attr.cap.max_recv_sge = 1;
 	ep->rep_attr.cap.max_inline_data = 0;
 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
@@ -536,9 +562,14 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	/* Initialize cma parameters */
 	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
 
-	/* RPC/RDMA does not use private data */
-	ep->rep_remote_cma.private_data = NULL;
-	ep->rep_remote_cma.private_data_len = 0;
+	/* Prepare RDMA-CM private message */
+	pmsg->cp_magic = rpcrdma_cmp_magic;
+	pmsg->cp_version = RPCRDMA_CMP_VERSION;
+	pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
+	pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
+	pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
+	ep->rep_remote_cma.private_data = pmsg;
+	ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
 
 	/* Client offers RDMA Read but does not initiate */
 	ep->rep_remote_cma.initiator_depth = 0;
@@ -849,6 +880,10 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 	req->rl_cqe.done = rpcrdma_wc_send;
 	req->rl_buffer = &r_xprt->rx_buf;
 	INIT_LIST_HEAD(&req->rl_registered);
+	req->rl_send_wr.next = NULL;
+	req->rl_send_wr.wr_cqe = &req->rl_cqe;
+	req->rl_send_wr.sg_list = req->rl_send_sge;
+	req->rl_send_wr.opcode = IB_WR_SEND;
 	return req;
 }
 
@@ -865,17 +900,21 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 	if (rep == NULL)
 		goto out;
 
-	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
-					       GFP_KERNEL);
+	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
+					       DMA_FROM_DEVICE, GFP_KERNEL);
 	if (IS_ERR(rep->rr_rdmabuf)) {
 		rc = PTR_ERR(rep->rr_rdmabuf);
 		goto out_free;
 	}
 
 	rep->rr_device = ia->ri_device;
-	rep->rr_cqe.done = rpcrdma_receive_wc;
+	rep->rr_cqe.done = rpcrdma_wc_receive;
 	rep->rr_rxprt = r_xprt;
-	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
+	INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
+	rep->rr_recv_wr.next = NULL;
+	rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
+	rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
+	rep->rr_recv_wr.num_sge = 1;
 	return rep;
 
 out_free:
@@ -966,17 +1005,18 @@ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
 }
 
 static void
-rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
+rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
 {
-	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
+	rpcrdma_free_regbuf(rep->rr_rdmabuf);
 	kfree(rep);
 }
 
 void
-rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+rpcrdma_destroy_req(struct rpcrdma_req *req)
 {
-	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
-	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
+	rpcrdma_free_regbuf(req->rl_recvbuf);
+	rpcrdma_free_regbuf(req->rl_sendbuf);
+	rpcrdma_free_regbuf(req->rl_rdmabuf);
 	kfree(req);
 }
 
@@ -1009,15 +1049,13 @@ rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
 void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
-	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-
 	cancel_delayed_work_sync(&buf->rb_recovery_worker);
 
 	while (!list_empty(&buf->rb_recv_bufs)) {
 		struct rpcrdma_rep *rep;
 
 		rep = rpcrdma_buffer_get_rep_locked(buf);
-		rpcrdma_destroy_rep(ia, rep);
+		rpcrdma_destroy_rep(rep);
 	}
 	buf->rb_send_count = 0;
 
@@ -1030,7 +1068,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 		list_del(&req->rl_all);
 
 		spin_unlock(&buf->rb_reqslock);
-		rpcrdma_destroy_req(ia, req);
+		rpcrdma_destroy_req(req);
 		spin_lock(&buf->rb_reqslock);
 	}
 	spin_unlock(&buf->rb_reqslock);
@@ -1129,7 +1167,7 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
 	struct rpcrdma_rep *rep = req->rl_reply;
 
-	req->rl_niovs = 0;
+	req->rl_send_wr.num_sge = 0;
 	req->rl_reply = NULL;
 
 	spin_lock(&buffers->rb_lock);
@@ -1171,70 +1209,81 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 	spin_unlock(&buffers->rb_lock);
 }
 
-/*
- * Wrappers for internal-use kmalloc memory registration, used by buffer code.
- */
-
 /**
- * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
- * @ia: controlling rpcrdma_ia
+ * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
  * @size: size of buffer to be allocated, in bytes
+ * @direction: direction of data movement
  * @flags: GFP flags
  *
- * Returns pointer to private header of an area of internally
- * registered memory, or an ERR_PTR. The registered buffer follows
- * the end of the private header.
+ * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
+ * can be persistently DMA-mapped for I/O.
  *
  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
- * receiving the payload of RDMA RECV operations. regbufs are not
- * used for RDMA READ/WRITE operations, thus are registered only for
- * LOCAL access.
+ * receiving the payload of RDMA RECV operations. During Long Calls
+ * or Replies they may be registered externally via ro_map.
  */
 struct rpcrdma_regbuf *
-rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
+rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
+		     gfp_t flags)
 {
 	struct rpcrdma_regbuf *rb;
-	struct ib_sge *iov;
 
 	rb = kmalloc(sizeof(*rb) + size, flags);
 	if (rb == NULL)
-		goto out;
+		return ERR_PTR(-ENOMEM);
 
-	iov = &rb->rg_iov;
-	iov->addr = ib_dma_map_single(ia->ri_device,
-				      (void *)rb->rg_base, size,
-				      DMA_BIDIRECTIONAL);
-	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
-		goto out_free;
+	rb->rg_device = NULL;
+	rb->rg_direction = direction;
+	rb->rg_iov.length = size;
 
-	iov->length = size;
-	iov->lkey = ia->ri_pd->local_dma_lkey;
-	rb->rg_size = size;
-	rb->rg_owner = NULL;
 	return rb;
+}
 
-out_free:
-	kfree(rb);
-out:
-	return ERR_PTR(-ENOMEM);
+/**
+ * __rpcrdma_map_regbuf - DMA-map a regbuf
+ * @ia: controlling rpcrdma_ia
+ * @rb: regbuf to be mapped
+ */
+bool
+__rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+	if (rb->rg_direction == DMA_NONE)
+		return false;
+
+	rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
+					    (void *)rb->rg_base,
+					    rdmab_length(rb),
+					    rb->rg_direction);
+	if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
+		return false;
+
+	rb->rg_device = ia->ri_device;
+	rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
+	return true;
+}
+
+static void
+rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
+{
+	if (!rpcrdma_regbuf_is_mapped(rb))
+		return;
+
+	ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
+			    rdmab_length(rb), rb->rg_direction);
+	rb->rg_device = NULL;
 }
 
 /**
  * rpcrdma_free_regbuf - deregister and free registered buffer
- * @ia: controlling rpcrdma_ia
  * @rb: regbuf to be deregistered and freed
  */
 void
-rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
 {
-	struct ib_sge *iov;
-
 	if (!rb)
 		return;
 
-	iov = &rb->rg_iov;
-	ib_dma_unmap_single(ia->ri_device,
-			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
+	rpcrdma_dma_unmap_regbuf(rb);
 	kfree(rb);
 }
 
@@ -1248,39 +1297,28 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 		struct rpcrdma_ep *ep,
 		struct rpcrdma_req *req)
 {
-	struct ib_device *device = ia->ri_device;
-	struct ib_send_wr send_wr, *send_wr_fail;
-	struct rpcrdma_rep *rep = req->rl_reply;
-	struct ib_sge *iov = req->rl_send_iov;
-	int i, rc;
+	struct ib_send_wr *send_wr = &req->rl_send_wr;
+	struct ib_send_wr *send_wr_fail;
+	int rc;
 
-	if (rep) {
-		rc = rpcrdma_ep_post_recv(ia, ep, rep);
+	if (req->rl_reply) {
+		rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
 		if (rc)
 			return rc;
 		req->rl_reply = NULL;
 	}
 
-	send_wr.next = NULL;
-	send_wr.wr_cqe = &req->rl_cqe;
-	send_wr.sg_list = iov;
-	send_wr.num_sge = req->rl_niovs;
-	send_wr.opcode = IB_WR_SEND;
-
-	for (i = 0; i < send_wr.num_sge; i++)
-		ib_dma_sync_single_for_device(device, iov[i].addr,
-					      iov[i].length, DMA_TO_DEVICE);
 	dprintk("RPC:       %s: posting %d s/g entries\n",
-		__func__, send_wr.num_sge);
+		__func__, send_wr->num_sge);
 
 	if (DECR_CQCOUNT(ep) > 0)
-		send_wr.send_flags = 0;
+		send_wr->send_flags = 0;
 	else { /* Provider must take a send completion every now and then */
 		INIT_CQCOUNT(ep);
-		send_wr.send_flags = IB_SEND_SIGNALED;
+		send_wr->send_flags = IB_SEND_SIGNALED;
 	}
 
-	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
+	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
 	if (rc)
 		goto out_postsend_err;
 	return 0;
@@ -1290,32 +1328,24 @@ out_postsend_err:
 	return -ENOTCONN;
 }
 
-/*
- * (Re)post a receive buffer.
- */
 int
 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
-		     struct rpcrdma_ep *ep,
 		     struct rpcrdma_rep *rep)
 {
-	struct ib_recv_wr recv_wr, *recv_wr_fail;
+	struct ib_recv_wr *recv_wr_fail;
 	int rc;
 
-	recv_wr.next = NULL;
-	recv_wr.wr_cqe = &rep->rr_cqe;
-	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
-	recv_wr.num_sge = 1;
-
-	ib_dma_sync_single_for_cpu(ia->ri_device,
-				   rdmab_addr(rep->rr_rdmabuf),
-				   rdmab_length(rep->rr_rdmabuf),
-				   DMA_BIDIRECTIONAL);
-
-	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
+	if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
+		goto out_map;
+	rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
 	if (rc)
 		goto out_postrecv;
 	return 0;
 
+out_map:
+	pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
+	return -EIO;
+
 out_postrecv:
 	pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
 	return -ENOTCONN;
@@ -1333,7 +1363,6 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
 {
 	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 	struct rpcrdma_rep *rep;
 	int rc;
 
@@ -1344,7 +1373,7 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
 		rep = rpcrdma_buffer_get_rep_locked(buffers);
 		spin_unlock(&buffers->rb_lock);
 
-		rc = rpcrdma_ep_post_recv(ia, ep, rep);
+		rc = rpcrdma_ep_post_recv(ia, rep);
 		if (rc)
 			goto out_rc;
 	}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index a71b0f5897d8..0d35b761c883 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -70,9 +70,11 @@ struct rpcrdma_ia {
 	struct ib_pd		*ri_pd;
 	struct completion	ri_done;
 	int			ri_async_rc;
+	unsigned int		ri_max_segs;
 	unsigned int		ri_max_frmr_depth;
 	unsigned int		ri_max_inline_write;
 	unsigned int		ri_max_inline_read;
+	bool			ri_reminv_expected;
 	struct ib_qp_attr	ri_qp_attr;
 	struct ib_qp_init_attr	ri_qp_init_attr;
 };
@@ -87,6 +89,7 @@ struct rpcrdma_ep {
 	int			rep_connected;
 	struct ib_qp_init_attr	rep_attr;
 	wait_queue_head_t 	rep_connect_wait;
+	struct rpcrdma_connect_private	rep_cm_private;
 	struct rdma_conn_param	rep_remote_cma;
 	struct sockaddr_storage	rep_remote_addr;
 	struct delayed_work	rep_connect_worker;
@@ -112,9 +115,9 @@ struct rpcrdma_ep {
  */
 
 struct rpcrdma_regbuf {
-	size_t			rg_size;
-	struct rpcrdma_req	*rg_owner;
 	struct ib_sge		rg_iov;
+	struct ib_device	*rg_device;
+	enum dma_data_direction	rg_direction;
 	__be32			rg_base[0] __attribute__ ((aligned(256)));
 };
 
@@ -162,7 +165,10 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
  * The smallest inline threshold is 1024 bytes, ensuring that
  * at least 750 bytes are available for RPC messages.
  */
-#define RPCRDMA_MAX_HDR_SEGS	(8)
+enum {
+	RPCRDMA_MAX_HDR_SEGS = 8,
+	RPCRDMA_HDRBUF_SIZE = 256,
+};
 
 /*
  * struct rpcrdma_rep -- this structure encapsulates state required to recv
@@ -182,10 +188,13 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
 struct rpcrdma_rep {
 	struct ib_cqe		rr_cqe;
 	unsigned int		rr_len;
+	int			rr_wc_flags;
+	u32			rr_inv_rkey;
 	struct ib_device	*rr_device;
 	struct rpcrdma_xprt	*rr_rxprt;
 	struct work_struct	rr_work;
 	struct list_head	rr_list;
+	struct ib_recv_wr	rr_recv_wr;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 };
 
@@ -276,19 +285,30 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */
 	char		*mr_offset;	/* kva if no page, else offset */
 };
 
-#define RPCRDMA_MAX_IOVS	(2)
+/* Reserve enough Send SGEs to send a maximum size inline request:
+ * - RPC-over-RDMA header
+ * - xdr_buf head iovec
+ * - RPCRDMA_MAX_INLINE bytes, possibly unaligned, in pages
+ * - xdr_buf tail iovec
+ */
+enum {
+	RPCRDMA_MAX_SEND_PAGES = PAGE_SIZE + RPCRDMA_MAX_INLINE - 1,
+	RPCRDMA_MAX_PAGE_SGES = (RPCRDMA_MAX_SEND_PAGES >> PAGE_SHIFT) + 1,
+	RPCRDMA_MAX_SEND_SGES = 1 + 1 + RPCRDMA_MAX_PAGE_SGES + 1,
+};
 
 struct rpcrdma_buffer;
 struct rpcrdma_req {
 	struct list_head	rl_free;
-	unsigned int		rl_niovs;
+	unsigned int		rl_mapped_sges;
 	unsigned int		rl_connect_cookie;
-	struct rpc_task		*rl_task;
 	struct rpcrdma_buffer	*rl_buffer;
-	struct rpcrdma_rep	*rl_reply;/* holder for reply buffer */
-	struct ib_sge		rl_send_iov[RPCRDMA_MAX_IOVS];
-	struct rpcrdma_regbuf	*rl_rdmabuf;
-	struct rpcrdma_regbuf	*rl_sendbuf;
+	struct rpcrdma_rep	*rl_reply;
+	struct ib_send_wr	rl_send_wr;
+	struct ib_sge		rl_send_sge[RPCRDMA_MAX_SEND_SGES];
+	struct rpcrdma_regbuf	*rl_rdmabuf;	/* xprt header */
+	struct rpcrdma_regbuf	*rl_sendbuf;	/* rq_snd_buf */
+	struct rpcrdma_regbuf	*rl_recvbuf;	/* rq_rcv_buf */
 
 	struct ib_cqe		rl_cqe;
 	struct list_head	rl_all;
@@ -298,14 +318,16 @@ struct rpcrdma_req {
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
 };
 
+static inline void
+rpcrdma_set_xprtdata(struct rpc_rqst *rqst, struct rpcrdma_req *req)
+{
+	rqst->rq_xprtdata = req;
+}
+
 static inline struct rpcrdma_req *
 rpcr_to_rdmar(struct rpc_rqst *rqst)
 {
-	void *buffer = rqst->rq_buffer;
-	struct rpcrdma_regbuf *rb;
-
-	rb = container_of(buffer, struct rpcrdma_regbuf, rg_base);
-	return rb->rg_owner;
+	return rqst->rq_xprtdata;
 }
 
 /*
@@ -356,15 +378,6 @@ struct rpcrdma_create_data_internal {
 	unsigned int	padding;	/* non-rdma write header padding */
 };
 
-#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \
-	(rpcx_to_rdmad(rq->rq_xprt).inline_rsize)
-
-#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\
-	(rpcx_to_rdmad(rq->rq_xprt).inline_wsize)
-
-#define RPCRDMA_INLINE_PAD_VALUE(rq)\
-	rpcx_to_rdmad(rq->rq_xprt).padding
-
 /*
  * Statistics for RPCRDMA
  */
@@ -386,6 +399,7 @@ struct rpcrdma_stats {
 	unsigned long		mrs_recovered;
 	unsigned long		mrs_orphaned;
 	unsigned long		mrs_allocated;
+	unsigned long		local_inv_needed;
 };
 
 /*
@@ -409,6 +423,7 @@ struct rpcrdma_memreg_ops {
 				      struct rpcrdma_mw *);
 	void		(*ro_release_mr)(struct rpcrdma_mw *);
 	const char	*ro_displayname;
+	const int	ro_send_w_inv_ok;
 };
 
 extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;
@@ -461,15 +476,14 @@ void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
 
 int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
 				struct rpcrdma_req *);
-int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
-				struct rpcrdma_rep *);
+int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_rep *);
 
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
 struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
 struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
-void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
+void rpcrdma_destroy_req(struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
@@ -482,10 +496,24 @@ void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
 void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
 
-struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
-					    size_t, gfp_t);
-void rpcrdma_free_regbuf(struct rpcrdma_ia *,
-			 struct rpcrdma_regbuf *);
+struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(size_t, enum dma_data_direction,
+					    gfp_t);
+bool __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *);
+void rpcrdma_free_regbuf(struct rpcrdma_regbuf *);
+
+static inline bool
+rpcrdma_regbuf_is_mapped(struct rpcrdma_regbuf *rb)
+{
+	return rb->rg_device != NULL;
+}
+
+static inline bool
+rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+	if (likely(rpcrdma_regbuf_is_mapped(rb)))
+		return true;
+	return __rpcrdma_dma_map_regbuf(ia, rb);
+}
 
 int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
@@ -507,15 +535,25 @@ rpcrdma_data_dir(bool writing)
  */
 void rpcrdma_connect_worker(struct work_struct *);
 void rpcrdma_conn_func(struct rpcrdma_ep *);
-void rpcrdma_reply_handler(struct rpcrdma_rep *);
+void rpcrdma_reply_handler(struct work_struct *);
 
 /*
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
  */
+
+enum rpcrdma_chunktype {
+	rpcrdma_noch = 0,
+	rpcrdma_readch,
+	rpcrdma_areadch,
+	rpcrdma_writech,
+	rpcrdma_replych
+};
+
+bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
+			       u32, struct xdr_buf *, enum rpcrdma_chunktype);
+void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_marshal_req(struct rpc_rqst *);
-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
-				  struct rpcrdma_create_data_internal *,
-				  unsigned int);
+void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 
 /* RPC/RDMA module init - xprtrdma/transport.c
  */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index bf168838a029..0137af1c0916 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -473,7 +473,16 @@ static int xs_nospace(struct rpc_task *task)
 	spin_unlock_bh(&xprt->transport_lock);
 
 	/* Race breaker in case memory is freed before above code is called */
-	sk->sk_write_space(sk);
+	if (ret == -EAGAIN) {
+		struct socket_wq *wq;
+
+		rcu_read_lock();
+		wq = rcu_dereference(sk->sk_wq);
+		set_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags);
+		rcu_read_unlock();
+
+		sk->sk_write_space(sk);
+	}
 	return ret;
 }
 
@@ -2533,35 +2542,38 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
  * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
  * to use the server side send routines.
  */
-static void *bc_malloc(struct rpc_task *task, size_t size)
+static int bc_malloc(struct rpc_task *task)
 {
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	size_t size = rqst->rq_callsize;
 	struct page *page;
 	struct rpc_buffer *buf;
 
-	WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
-	if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
-		return NULL;
+	if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) {
+		WARN_ONCE(1, "xprtsock: large bc buffer request (size %zu)\n",
+			  size);
+		return -EINVAL;
+	}
 
 	page = alloc_page(GFP_KERNEL);
 	if (!page)
-		return NULL;
+		return -ENOMEM;
 
 	buf = page_address(page);
 	buf->len = PAGE_SIZE;
 
-	return buf->data;
+	rqst->rq_buffer = buf->data;
+	return 0;
 }
 
 /*
  * Free the space allocated in the bc_alloc routine
  */
-static void bc_free(void *buffer)
+static void bc_free(struct rpc_task *task)
 {
+	void *buffer = task->tk_rqstp->rq_buffer;
 	struct rpc_buffer *buf;
 
-	if (!buffer)
-		return;
-
 	buf = container_of(buffer, struct rpc_buffer, data);
 	free_page((unsigned long)buf);
 }