author     Linus Torvalds <torvalds@linux-foundation.org>   2019-03-12 14:50:42 -0700
committer  Linus Torvalds <torvalds@linux-foundation.org>   2019-03-12 14:50:42 -0700
commit     1fbf3e48123d701584bc75ccac67ef2fe412ac4c (patch)
tree       caca042e8d753d4147219820db8497a766b873bd /net
parent     f88c5942cfaf7d55e46d395136cccaca65b2e3bf (diff)
parent     4d6c671ace569d4b0d3f8d92ab3aef18a5d166bc (diff)
download   linux-1fbf3e48123d701584bc75ccac67ef2fe412ac4c.tar.gz
Merge tag 'nfs-for-5.1-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Stable fixes:
   - Fixes for NFS I/O request leakages
   - Fix error handling paths in the NFS I/O recoalescing code
   - Reinitialise NFSv4.1 sequence results before retransmitting a
     request
   - Fix a soft lockup in the delegation recovery code
   - Bulk destroy of layouts needs to be safe w.r.t. umount
   - Prevent thundering herd issues when the SUNRPC socket is not
     connected
   - Respect RPC call timeouts when retrying transmission

  Features:
   - Convert rpc auth layer to use xdr_streams
   - Config option to disable insecure RPCSEC_GSS crypto types
   - Reduce size of RPC receive buffers
   - Readdirplus optimization by cache mechanism
   - Convert SUNRPC socket send code to use iov_iter()
   - SUNRPC micro-optimisations to avoid indirect calls
   - Add support for the pNFS LAYOUTERROR operation and use it with the
     pNFS/flexfiles driver
   - Add trace events to report non-zero NFS status codes
   - Various removals of unnecessary dprintks

  Bugfixes and cleanups:
   - Fix a number of sparse warnings and documentation format warnings
   - Fix nfs_parse_devname to not modify its argument
   - Fix potential corruption of page being written through pNFS/blocks
   - Fix xfstest generic/099 failures on NFSv3
   - Avoid NFSv4.1 "false retries" when RPC calls are interrupted
   - Abort I/O early if the pNFS/flexfiles layout segment was
     invalidated
   - Avoid unnecessary pNFS/flexfiles layout invalidations"

* tag 'nfs-for-5.1-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (90 commits)
  SUNRPC: Take the transport send lock before binding+connecting
  SUNRPC: Micro-optimise when the task is known not to be sleeping
  SUNRPC: Check whether the task was transmitted before rebind/reconnect
  SUNRPC: Remove redundant calls to RPC_IS_QUEUED()
  SUNRPC: Clean up
  SUNRPC: Respect RPC call timeouts when retrying transmission
  SUNRPC: Fix up RPC back channel transmission
  SUNRPC: Prevent thundering herd when the socket is not connected
  SUNRPC: Allow dynamic allocation of back channel slots
  NFSv4.1: Bump the default callback session slot count to 16
  SUNRPC: Convert remaining GFP_NOIO, and GFP_NOWAIT sites in sunrpc
  NFS/flexfiles: Clean up mirror DS initialisation
  NFS/flexfiles: Remove dead code in ff_layout_mirror_valid()
  NFS/flexfile: Simplify nfs4_ff_layout_select_ds_stateid()
  NFS/flexfile: Simplify nfs4_ff_layout_ds_version()
  NFS/flexfiles: Simplify ff_layout_get_ds_cred()
  NFS/flexfiles: Simplify nfs4_ff_find_or_create_ds_client()
  NFS/flexfiles: Simplify nfs4_ff_layout_select_ds_fh()
  NFS/flexfiles: Speed up read failover when DSes are down
  NFS/flexfiles: Don't invalidate DS deviceids for being unresponsive
  ...
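
The headline item in this pull is the conversion of the RPC auth layer to xdr_streams: every rpc_credops hook now takes a struct xdr_stream and returns zero or a negative errno, instead of taking and returning raw __be32 pointers. Below is a minimal sketch of the new hook shape, modelled on the nul_marshal() conversion in the auth_null.c hunks further down; it is illustrative only, and the function name is hypothetical rather than part of the commit.

/* Hypothetical ->crmarshal() implementation under the new interface. */
static int example_crmarshal(struct rpc_task *task, struct xdr_stream *xdr)
{
	__be32 *p;

	/* Buffer exhaustion now surfaces as an errno, not a NULL pointer. */
	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
	if (!p)
		return -EMSGSIZE;
	*p++ = rpc_auth_null;	/* credential flavor */
	*p++ = xdr_zero;	/* credential length */
	*p++ = rpc_auth_null;	/* verifier flavor */
	*p   = xdr_zero;	/* verifier length */
	return 0;
}
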
Diffstat (limited to 'net')
-rw-r--r--  net/sunrpc/Kconfig                            16
-rw-r--r--  net/sunrpc/auth.c                            136
-rw-r--r--  net/sunrpc/auth_gss/Makefile                   2
-rw-r--r--  net/sunrpc/auth_gss/auth_gss.c               553
-rw-r--r--  net/sunrpc/auth_gss/gss_krb5_mech.c           29
-rw-r--r--  net/sunrpc/auth_gss/gss_krb5_wrap.c            8
-rw-r--r--  net/sunrpc/auth_gss/gss_mech_switch.c         27
-rw-r--r--  net/sunrpc/auth_gss/gss_rpc_upcall.c          15
-rw-r--r--  net/sunrpc/auth_gss/gss_rpc_upcall.h          16
-rw-r--r--  net/sunrpc/auth_gss/gss_rpc_xdr.c             15
-rw-r--r--  net/sunrpc/auth_gss/gss_rpc_xdr.h             17
-rw-r--r--  net/sunrpc/auth_gss/svcauth_gss.c              3
-rw-r--r--  net/sunrpc/auth_gss/trace.c                   11
-rw-r--r--  net/sunrpc/auth_null.c                        56
-rw-r--r--  net/sunrpc/auth_unix.c                       122
-rw-r--r--  net/sunrpc/backchannel_rqst.c                 42
-rw-r--r--  net/sunrpc/clnt.c                            660
-rw-r--r--  net/sunrpc/sched.c                            17
-rw-r--r--  net/sunrpc/svc.c                              19
-rw-r--r--  net/sunrpc/xdr.c                             121
-rw-r--r--  net/sunrpc/xprt.c                             23
-rw-r--r--  net/sunrpc/xprtrdma/backchannel.c              3
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c                 4
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c                22
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_backchannel.c     1
-rw-r--r--  net/sunrpc/xprtrdma/transport.c                2
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c                    2
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h               12
-rw-r--r--  net/sunrpc/xprtsock.c                        317
29 files changed, 1203 insertions, 1068 deletions
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig
index ac09ca803296..83f5617bae07 100644
--- a/net/sunrpc/Kconfig
+++ b/net/sunrpc/Kconfig
@@ -34,6 +34,22 @@ config RPCSEC_GSS_KRB5
 
 	  If unsure, say Y.
 
+config SUNRPC_DISABLE_INSECURE_ENCTYPES
+	bool "Secure RPC: Disable insecure Kerberos encryption types"
+	depends on RPCSEC_GSS_KRB5
+	default n
+	help
+	  Choose Y here to disable the use of deprecated encryption types
+	  with the Kerberos version 5 GSS-API mechanism (RFC 1964). The
+	  deprecated encryption types include DES-CBC-MD5, DES-CBC-CRC,
+	  and DES-CBC-MD4. These types were deprecated by RFC 6649 because
+	  they were found to be insecure.
+
+	  N is the default because many sites have deployed KDCs and
+	  keytabs that contain only these deprecated encryption types.
+	  Choosing Y prevents the use of known-insecure encryption types
+	  but might result in compatibility problems.
+
 config SUNRPC_DEBUG
 	bool "RPC: Enable dprintk debugging"
 	depends on SUNRPC && SYSCTL
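
The new option compiles the single-DES enctypes out of the Kerberos mechanism's supported table (see the gss_krb5_mech.c hunk further down). A site that wants the hardened behaviour opts in at build time; a minimal, illustrative .config fragment, assuming the Kerberos mechanism itself is enabled:

# illustrative .config fragment
CONFIG_RPCSEC_GSS_KRB5=m
CONFIG_SUNRPC_DISABLE_INSECURE_ENCTYPES=y
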
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index f3023bbc0b7f..e7861026b9e5 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -17,9 +17,7 @@
 #include <linux/sunrpc/gss_api.h>
 #include <linux/spinlock.h>
 
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-# define RPCDBG_FACILITY	RPCDBG_AUTH
-#endif
+#include <trace/events/sunrpc.h>
 
 #define RPC_CREDCACHE_DEFAULT_HASHBITS	(4)
 struct rpc_cred_cache {
@@ -267,8 +265,6 @@ rpcauth_list_flavors(rpc_authflavor_t *array, int size)
 		}
 	}
 	rcu_read_unlock();
-
-	dprintk("RPC:       %s returns %d\n", __func__, result);
 	return result;
 }
 EXPORT_SYMBOL_GPL(rpcauth_list_flavors);
@@ -636,9 +632,6 @@ rpcauth_lookupcred(struct rpc_auth *auth, int flags)
 	struct rpc_cred *ret;
 	const struct cred *cred = current_cred();
 
-	dprintk("RPC:       looking up %s cred\n",
-		auth->au_ops->au_name);
-
 	memset(&acred, 0, sizeof(acred));
 	acred.cred = cred;
 	ret = auth->au_ops->lookup_cred(auth, &acred, flags);
@@ -670,8 +663,6 @@ rpcauth_bind_root_cred(struct rpc_task *task, int lookupflags)
 	};
 	struct rpc_cred *ret;
 
-	dprintk("RPC: %5u looking up %s cred\n",
-		task->tk_pid, task->tk_client->cl_auth->au_ops->au_name);
 	ret = auth->au_ops->lookup_cred(auth, &acred, lookupflags);
 	put_cred(acred.cred);
 	return ret;
@@ -688,8 +679,6 @@ rpcauth_bind_machine_cred(struct rpc_task *task, int lookupflags)
 
 	if (!acred.principal)
 		return NULL;
-	dprintk("RPC: %5u looking up %s machine cred\n",
-		task->tk_pid, task->tk_client->cl_auth->au_ops->au_name);
 	return auth->au_ops->lookup_cred(auth, &acred, lookupflags);
 }
 
@@ -698,8 +687,6 @@ rpcauth_bind_new_cred(struct rpc_task *task, int lookupflags)
 {
 	struct rpc_auth *auth = task->tk_client->cl_auth;
 
-	dprintk("RPC: %5u looking up %s cred\n",
-		task->tk_pid, auth->au_ops->au_name);
 	return rpcauth_lookupcred(auth, lookupflags);
 }
 
@@ -771,75 +758,102 @@ destroy:
 }
 EXPORT_SYMBOL_GPL(put_rpccred);
 
-__be32 *
-rpcauth_marshcred(struct rpc_task *task, __be32 *p)
+/**
+ * rpcauth_marshcred - Append RPC credential to end of @xdr
+ * @task: controlling RPC task
+ * @xdr: xdr_stream containing initial portion of RPC Call header
+ *
+ * On success, an appropriate verifier is added to @xdr, @xdr is
+ * updated to point past the verifier, and zero is returned.
+ * Otherwise, @xdr is in an undefined state and a negative errno
+ * is returned.
+ */
+int rpcauth_marshcred(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	struct rpc_cred	*cred = task->tk_rqstp->rq_cred;
+	const struct rpc_credops *ops = task->tk_rqstp->rq_cred->cr_ops;
 
-	dprintk("RPC: %5u marshaling %s cred %p\n",
-		task->tk_pid, cred->cr_auth->au_ops->au_name, cred);
-
-	return cred->cr_ops->crmarshal(task, p);
+	return ops->crmarshal(task, xdr);
 }
 
-__be32 *
-rpcauth_checkverf(struct rpc_task *task, __be32 *p)
+/**
+ * rpcauth_wrap_req_encode - XDR encode the RPC procedure
+ * @task: controlling RPC task
+ * @xdr: stream where on-the-wire bytes are to be marshalled
+ *
+ * On success, @xdr contains the encoded and wrapped message.
+ * Otherwise, @xdr is in an undefined state.
+ */
+int rpcauth_wrap_req_encode(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	struct rpc_cred	*cred = task->tk_rqstp->rq_cred;
+	kxdreproc_t encode = task->tk_msg.rpc_proc->p_encode;
 
-	dprintk("RPC: %5u validating %s cred %p\n",
-		task->tk_pid, cred->cr_auth->au_ops->au_name, cred);
-
-	return cred->cr_ops->crvalidate(task, p);
+	encode(task->tk_rqstp, xdr, task->tk_msg.rpc_argp);
+	return 0;
 }
+EXPORT_SYMBOL_GPL(rpcauth_wrap_req_encode);
 
-static void rpcauth_wrap_req_encode(kxdreproc_t encode, struct rpc_rqst *rqstp,
-				   __be32 *data, void *obj)
+/**
+ * rpcauth_wrap_req - XDR encode and wrap the RPC procedure
+ * @task: controlling RPC task
+ * @xdr: stream where on-the-wire bytes are to be marshalled
+ *
+ * On success, @xdr contains the encoded and wrapped message,
+ * and zero is returned. Otherwise, @xdr is in an undefined
+ * state and a negative errno is returned.
+ */
+int rpcauth_wrap_req(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	struct xdr_stream xdr;
+	const struct rpc_credops *ops = task->tk_rqstp->rq_cred->cr_ops;
 
-	xdr_init_encode(&xdr, &rqstp->rq_snd_buf, data);
-	encode(rqstp, &xdr, obj);
+	return ops->crwrap_req(task, xdr);
 }
 
+/**
+ * rpcauth_checkverf - Validate verifier in RPC Reply header
+ * @task: controlling RPC task
+ * @xdr: xdr_stream containing RPC Reply header
+ *
+ * On success, @xdr is updated to point past the verifier and
+ * zero is returned. Otherwise, @xdr is in an undefined state
+ * and a negative errno is returned.
+ */
 int
-rpcauth_wrap_req(struct rpc_task *task, kxdreproc_t encode, void *rqstp,
-		__be32 *data, void *obj)
+rpcauth_checkverf(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	struct rpc_cred *cred = task->tk_rqstp->rq_cred;
+	const struct rpc_credops *ops = task->tk_rqstp->rq_cred->cr_ops;
 
-	dprintk("RPC: %5u using %s cred %p to wrap rpc data\n",
-			task->tk_pid, cred->cr_ops->cr_name, cred);
-	if (cred->cr_ops->crwrap_req)
-		return cred->cr_ops->crwrap_req(task, encode, rqstp, data, obj);
-	/* By default, we encode the arguments normally. */
-	rpcauth_wrap_req_encode(encode, rqstp, data, obj);
-	return 0;
+	return ops->crvalidate(task, xdr);
 }
 
-static int
-rpcauth_unwrap_req_decode(kxdrdproc_t decode, struct rpc_rqst *rqstp,
-			  __be32 *data, void *obj)
+/**
+ * rpcauth_unwrap_resp_decode - Invoke XDR decode function
+ * @task: controlling RPC task
+ * @xdr: stream where the Reply message resides
+ *
+ * Returns zero on success; otherwise a negative errno is returned.
+ */
+int
+rpcauth_unwrap_resp_decode(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	struct xdr_stream xdr;
+	kxdrdproc_t decode = task->tk_msg.rpc_proc->p_decode;
 
-	xdr_init_decode(&xdr, &rqstp->rq_rcv_buf, data);
-	return decode(rqstp, &xdr, obj);
+	return decode(task->tk_rqstp, xdr, task->tk_msg.rpc_resp);
 }
+EXPORT_SYMBOL_GPL(rpcauth_unwrap_resp_decode);
 
+/**
+ * rpcauth_unwrap_resp - Invoke unwrap and decode function for the cred
+ * @task: controlling RPC task
+ * @xdr: stream where the Reply message resides
+ *
+ * Returns zero on success; otherwise a negative errno is returned.
+ */
 int
-rpcauth_unwrap_resp(struct rpc_task *task, kxdrdproc_t decode, void *rqstp,
-		__be32 *data, void *obj)
+rpcauth_unwrap_resp(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	struct rpc_cred *cred = task->tk_rqstp->rq_cred;
+	const struct rpc_credops *ops = task->tk_rqstp->rq_cred->cr_ops;
 
-	dprintk("RPC: %5u using %s cred %p to unwrap rpc data\n",
-			task->tk_pid, cred->cr_ops->cr_name, cred);
-	if (cred->cr_ops->crunwrap_resp)
-		return cred->cr_ops->crunwrap_resp(task, decode, rqstp,
-						   data, obj);
-	/* By default, we decode the arguments normally. */
-	return rpcauth_unwrap_req_decode(decode, rqstp, data, obj);
+	return ops->crunwrap_resp(task, xdr);
 }
 
 bool
@@ -865,8 +879,6 @@ rpcauth_refreshcred(struct rpc_task *task)
 			goto out;
 		cred = task->tk_rqstp->rq_cred;
 	}
-	dprintk("RPC: %5u refreshing %s cred %p\n",
-		task->tk_pid, cred->cr_auth->au_ops->au_name, cred);
 
 	err = cred->cr_ops->crrefresh(task);
 out:
@@ -880,8 +892,6 @@ rpcauth_invalcred(struct rpc_task *task)
 {
 	struct rpc_cred *cred = task->tk_rqstp->rq_cred;
 
-	dprintk("RPC: %5u invalidating %s cred %p\n",
-		task->tk_pid, cred->cr_auth->au_ops->au_name, cred);
 	if (cred)
 		clear_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags);
 }
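
The kernel-doc added above documents the generic entry points that now drive credential handling through the caller's xdr_stream. A hedged sketch of how a call site strings them together follows; the corresponding clnt.c hunks are not shown in this view, so the function name and structure here are illustrative, not copied from the commit.

/* Hypothetical caller; the real call sites live in net/sunrpc/clnt.c. */
static int example_encode_call_body(struct rpc_task *task,
				    struct xdr_stream *xdr)
{
	int error;

	/* Append the credential and verifier to the Call header. */
	error = rpcauth_marshcred(task, xdr);
	if (error < 0)
		return error;
	/* Encode the procedure arguments, wrapping them if the
	 * security service requires integrity or privacy. */
	return rpcauth_wrap_req(task, xdr);
}
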
diff --git a/net/sunrpc/auth_gss/Makefile b/net/sunrpc/auth_gss/Makefile
index c374268b008f..4a29f4c5dac4 100644
--- a/net/sunrpc/auth_gss/Makefile
+++ b/net/sunrpc/auth_gss/Makefile
@@ -7,7 +7,7 @@ obj-$(CONFIG_SUNRPC_GSS) += auth_rpcgss.o
 
 auth_rpcgss-y := auth_gss.o gss_generic_token.o \
 	gss_mech_switch.o svcauth_gss.o \
-	gss_rpc_upcall.o gss_rpc_xdr.o
+	gss_rpc_upcall.o gss_rpc_xdr.o trace.o
 
 obj-$(CONFIG_RPCSEC_GSS_KRB5) += rpcsec_gss_krb5.o
 
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 1531b0219344..3fd56c0c90ae 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: BSD-3-Clause
 /*
  * linux/net/sunrpc/auth_gss/auth_gss.c
  *
@@ -8,34 +9,8 @@
  *
  *  Dug Song       <dugsong@monkey.org>
  *  Andy Adamson   <andros@umich.edu>
- *
- *  Redistribution and use in source and binary forms, with or without
- *  modification, are permitted provided that the following conditions
- *  are met:
- *
- *  1. Redistributions of source code must retain the above copyright
- *     notice, this list of conditions and the following disclaimer.
- *  2. Redistributions in binary form must reproduce the above copyright
- *     notice, this list of conditions and the following disclaimer in the
- *     documentation and/or other materials provided with the distribution.
- *  3. Neither the name of the University nor the names of its
- *     contributors may be used to endorse or promote products derived
- *     from this software without specific prior written permission.
- *
- *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
- *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- *  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- *  DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
- *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
- *  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-
 #include <linux/module.h>
 #include <linux/init.h>
 #include <linux/types.h>
@@ -55,6 +30,8 @@
 
 #include "../netns.h"
 
+#include <trace/events/rpcgss.h>
+
 static const struct rpc_authops authgss_ops;
 
 static const struct rpc_credops gss_credops;
@@ -260,6 +237,7 @@ gss_fill_context(const void *p, const void *end, struct gss_cl_ctx *ctx, struct
 	}
 	ret = gss_import_sec_context(p, seclen, gm, &ctx->gc_gss_ctx, NULL, GFP_NOFS);
 	if (ret < 0) {
+		trace_rpcgss_import_ctx(ret);
 		p = ERR_PTR(ret);
 		goto err;
 	}
@@ -275,12 +253,9 @@ gss_fill_context(const void *p, const void *end, struct gss_cl_ctx *ctx, struct
 	if (IS_ERR(p))
 		goto err;
 done:
-	dprintk("RPC:       %s Success. gc_expiry %lu now %lu timeout %u acceptor %.*s\n",
-		__func__, ctx->gc_expiry, now, timeout, ctx->gc_acceptor.len,
-		ctx->gc_acceptor.data);
-	return p;
+	trace_rpcgss_context(ctx->gc_expiry, now, timeout,
+			     ctx->gc_acceptor.len, ctx->gc_acceptor.data);
 err:
-	dprintk("RPC:       %s returns error %ld\n", __func__, -PTR_ERR(p));
 	return p;
 }
 
@@ -354,10 +329,8 @@ __gss_find_upcall(struct rpc_pipe *pipe, kuid_t uid, const struct gss_auth *auth
 		if (auth && pos->auth->service != auth->service)
 			continue;
 		refcount_inc(&pos->count);
-		dprintk("RPC:       %s found msg %p\n", __func__, pos);
 		return pos;
 	}
-	dprintk("RPC:       %s found nothing\n", __func__);
 	return NULL;
 }
 
@@ -456,7 +429,7 @@ static int gss_encode_v1_msg(struct gss_upcall_msg *gss_msg,
 	size_t buflen = sizeof(gss_msg->databuf);
 	int len;
 
-	len = scnprintf(p, buflen, "mech=%s uid=%d ", mech->gm_name,
+	len = scnprintf(p, buflen, "mech=%s uid=%d", mech->gm_name,
 			from_kuid(&init_user_ns, gss_msg->uid));
 	buflen -= len;
 	p += len;
@@ -467,7 +440,7 @@ static int gss_encode_v1_msg(struct gss_upcall_msg *gss_msg,
 	 * identity that we are authenticating to.
 	 */
 	if (target_name) {
-		len = scnprintf(p, buflen, "target=%s ", target_name);
+		len = scnprintf(p, buflen, " target=%s", target_name);
 		buflen -= len;
 		p += len;
 		gss_msg->msg.len += len;
@@ -487,11 +460,11 @@ static int gss_encode_v1_msg(struct gss_upcall_msg *gss_msg,
 		char *c = strchr(service_name, '@');
 
 		if (!c)
-			len = scnprintf(p, buflen, "service=%s ",
+			len = scnprintf(p, buflen, " service=%s",
 					service_name);
 		else
 			len = scnprintf(p, buflen,
-					"service=%.*s srchost=%s ",
+					" service=%.*s srchost=%s",
 					(int)(c - service_name),
 					service_name, c + 1);
 		buflen -= len;
@@ -500,17 +473,17 @@ static int gss_encode_v1_msg(struct gss_upcall_msg *gss_msg,
 	}
 
 	if (mech->gm_upcall_enctypes) {
-		len = scnprintf(p, buflen, "enctypes=%s ",
+		len = scnprintf(p, buflen, " enctypes=%s",
 				mech->gm_upcall_enctypes);
 		buflen -= len;
 		p += len;
 		gss_msg->msg.len += len;
 	}
+	trace_rpcgss_upcall_msg(gss_msg->databuf);
 	len = scnprintf(p, buflen, "\n");
 	if (len == 0)
 		goto out_overflow;
 	gss_msg->msg.len += len;
-
 	gss_msg->msg.data = gss_msg->databuf;
 	return 0;
 out_overflow:
@@ -603,8 +576,6 @@ gss_refresh_upcall(struct rpc_task *task)
 	struct rpc_pipe *pipe;
 	int err = 0;
 
-	dprintk("RPC: %5u %s for uid %u\n",
-		task->tk_pid, __func__, from_kuid(&init_user_ns, cred->cr_cred->fsuid));
 	gss_msg = gss_setup_upcall(gss_auth, cred);
 	if (PTR_ERR(gss_msg) == -EAGAIN) {
 		/* XXX: warning on the first, under the assumption we
@@ -612,7 +583,8 @@ gss_refresh_upcall(struct rpc_task *task)
 		warn_gssd();
 		task->tk_timeout = 15*HZ;
 		rpc_sleep_on(&pipe_version_rpc_waitqueue, task, NULL);
-		return -EAGAIN;
+		err = -EAGAIN;
+		goto out;
 	}
 	if (IS_ERR(gss_msg)) {
 		err = PTR_ERR(gss_msg);
@@ -635,9 +607,8 @@ gss_refresh_upcall(struct rpc_task *task)
 	spin_unlock(&pipe->lock);
 	gss_release_msg(gss_msg);
 out:
-	dprintk("RPC: %5u %s for uid %u result %d\n",
-		task->tk_pid, __func__,
-		from_kuid(&init_user_ns, cred->cr_cred->fsuid),	err);
+	trace_rpcgss_upcall_result(from_kuid(&init_user_ns,
+					     cred->cr_cred->fsuid), err);
 	return err;
 }
 
@@ -652,14 +623,13 @@ gss_create_upcall(struct gss_auth *gss_auth, struct gss_cred *gss_cred)
 	DEFINE_WAIT(wait);
 	int err;
 
-	dprintk("RPC:       %s for uid %u\n",
-		__func__, from_kuid(&init_user_ns, cred->cr_cred->fsuid));
 retry:
 	err = 0;
 	/* if gssd is down, just skip upcalling altogether */
 	if (!gssd_running(net)) {
 		warn_gssd();
-		return -EACCES;
+		err = -EACCES;
+		goto out;
 	}
 	gss_msg = gss_setup_upcall(gss_auth, cred);
 	if (PTR_ERR(gss_msg) == -EAGAIN) {
@@ -700,8 +670,8 @@ out_intr:
 	finish_wait(&gss_msg->waitqueue, &wait);
 	gss_release_msg(gss_msg);
 out:
-	dprintk("RPC:       %s for uid %u result %d\n",
-		__func__, from_kuid(&init_user_ns, cred->cr_cred->fsuid), err);
+	trace_rpcgss_upcall_result(from_kuid(&init_user_ns,
+					     cred->cr_cred->fsuid), err);
 	return err;
 }
 
@@ -794,7 +764,6 @@ err_put_ctx:
 err:
 	kfree(buf);
 out:
-	dprintk("RPC:       %s returning %zd\n", __func__, err);
 	return err;
 }
 
@@ -863,8 +832,6 @@ gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
 	struct gss_upcall_msg *gss_msg = container_of(msg, struct gss_upcall_msg, msg);
 
 	if (msg->errno < 0) {
-		dprintk("RPC:       %s releasing msg %p\n",
-			__func__, gss_msg);
 		refcount_inc(&gss_msg->count);
 		gss_unhash_msg(gss_msg);
 		if (msg->errno == -ETIMEDOUT)
@@ -1024,8 +991,6 @@ gss_create_new(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 	struct rpc_auth * auth;
 	int err = -ENOMEM; /* XXX? */
 
-	dprintk("RPC:       creating GSS authenticator for client %p\n", clnt);
-
 	if (!try_module_get(THIS_MODULE))
 		return ERR_PTR(err);
 	if (!(gss_auth = kmalloc(sizeof(*gss_auth), GFP_KERNEL)))
@@ -1041,10 +1006,8 @@ gss_create_new(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 	gss_auth->net = get_net(rpc_net_ns(clnt));
 	err = -EINVAL;
 	gss_auth->mech = gss_mech_get_by_pseudoflavor(flavor);
-	if (!gss_auth->mech) {
-		dprintk("RPC:       Pseudoflavor %d not found!\n", flavor);
+	if (!gss_auth->mech)
 		goto err_put_net;
-	}
 	gss_auth->service = gss_pseudoflavor_to_service(gss_auth->mech, flavor);
 	if (gss_auth->service == 0)
 		goto err_put_mech;
@@ -1053,6 +1016,8 @@ gss_create_new(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 	auth = &gss_auth->rpc_auth;
 	auth->au_cslack = GSS_CRED_SLACK >> 2;
 	auth->au_rslack = GSS_VERF_SLACK >> 2;
+	auth->au_verfsize = GSS_VERF_SLACK >> 2;
+	auth->au_ralign = GSS_VERF_SLACK >> 2;
 	auth->au_flags = 0;
 	auth->au_ops = &authgss_ops;
 	auth->au_flavor = flavor;
@@ -1099,6 +1064,7 @@ err_free:
 	kfree(gss_auth);
 out_dec:
 	module_put(THIS_MODULE);
+	trace_rpcgss_createauth(flavor, err);
 	return ERR_PTR(err);
 }
 
@@ -1135,9 +1101,6 @@ gss_destroy(struct rpc_auth *auth)
 	struct gss_auth *gss_auth = container_of(auth,
 			struct gss_auth, rpc_auth);
 
-	dprintk("RPC:       destroying GSS authenticator %p flavor %d\n",
-			auth, auth->au_flavor);
-
 	if (hash_hashed(&gss_auth->hash)) {
 		spin_lock(&gss_auth_hash_lock);
 		hash_del(&gss_auth->hash);
@@ -1245,7 +1208,7 @@ gss_dup_cred(struct gss_auth *gss_auth, struct gss_cred *gss_cred)
 	struct gss_cred *new;
 
 	/* Make a copy of the cred so that we can reference count it */
-	new = kzalloc(sizeof(*gss_cred), GFP_NOIO);
+	new = kzalloc(sizeof(*gss_cred), GFP_NOFS);
 	if (new) {
 		struct auth_cred acred = {
 			.cred = gss_cred->gc_base.cr_cred,
@@ -1300,8 +1263,6 @@ gss_send_destroy_context(struct rpc_cred *cred)
 static void
 gss_do_free_ctx(struct gss_cl_ctx *ctx)
 {
-	dprintk("RPC:       %s\n", __func__);
-
 	gss_delete_sec_context(&ctx->gc_gss_ctx);
 	kfree(ctx->gc_wire_ctx.data);
 	kfree(ctx->gc_acceptor.data);
@@ -1324,7 +1285,6 @@ gss_free_ctx(struct gss_cl_ctx *ctx)
 static void
 gss_free_cred(struct gss_cred *gss_cred)
 {
-	dprintk("RPC:       %s cred=%p\n", __func__, gss_cred);
 	kfree(gss_cred);
 }
 
@@ -1381,10 +1341,6 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t
 	struct gss_cred	*cred = NULL;
 	int err = -ENOMEM;
 
-	dprintk("RPC:       %s for uid %d, flavor %d\n",
-		__func__, from_kuid(&init_user_ns, acred->cred->fsuid),
-		auth->au_flavor);
-
 	if (!(cred = kzalloc(sizeof(*cred), gfp)))
 		goto out_err;
 
@@ -1400,7 +1356,6 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t
 	return &cred->gc_base;
 
 out_err:
-	dprintk("RPC:       %s failed with error %d\n", __func__, err);
 	return ERR_PTR(err);
 }
 
@@ -1526,69 +1481,84 @@ out:
 }
 
 /*
-* Marshal credentials.
-* Maybe we should keep a cached credential for performance reasons.
-*/
-static __be32 *
-gss_marshal(struct rpc_task *task, __be32 *p)
+ * Marshal credentials.
+ *
+ * The expensive part is computing the verifier. We can't cache a
+ * pre-computed version of the verifier because the seqno, which
+ * is different every time, is included in the MIC.
+ */
+static int gss_marshal(struct rpc_task *task, struct xdr_stream *xdr)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_cred *cred = req->rq_cred;
 	struct gss_cred	*gss_cred = container_of(cred, struct gss_cred,
 						 gc_base);
 	struct gss_cl_ctx	*ctx = gss_cred_get_ctx(cred);
-	__be32		*cred_len;
+	__be32		*p, *cred_len;
 	u32             maj_stat = 0;
 	struct xdr_netobj mic;
 	struct kvec	iov;
 	struct xdr_buf	verf_buf;
+	int status;
 
-	dprintk("RPC: %5u %s\n", task->tk_pid, __func__);
+	/* Credential */
 
-	*p++ = htonl(RPC_AUTH_GSS);
+	p = xdr_reserve_space(xdr, 7 * sizeof(*p) +
+			      ctx->gc_wire_ctx.len);
+	if (!p)
+		goto marshal_failed;
+	*p++ = rpc_auth_gss;
 	cred_len = p++;
 
 	spin_lock(&ctx->gc_seq_lock);
 	req->rq_seqno = (ctx->gc_seq < MAXSEQ) ? ctx->gc_seq++ : MAXSEQ;
 	spin_unlock(&ctx->gc_seq_lock);
 	if (req->rq_seqno == MAXSEQ)
-		goto out_expired;
+		goto expired;
+	trace_rpcgss_seqno(task);
 
-	*p++ = htonl((u32) RPC_GSS_VERSION);
-	*p++ = htonl((u32) ctx->gc_proc);
-	*p++ = htonl((u32) req->rq_seqno);
-	*p++ = htonl((u32) gss_cred->gc_service);
+	*p++ = cpu_to_be32(RPC_GSS_VERSION);
+	*p++ = cpu_to_be32(ctx->gc_proc);
+	*p++ = cpu_to_be32(req->rq_seqno);
+	*p++ = cpu_to_be32(gss_cred->gc_service);
 	p = xdr_encode_netobj(p, &ctx->gc_wire_ctx);
-	*cred_len = htonl((p - (cred_len + 1)) << 2);
+	*cred_len = cpu_to_be32((p - (cred_len + 1)) << 2);
+
+	/* Verifier */
 
 	/* We compute the checksum for the verifier over the xdr-encoded bytes
 	 * starting with the xid and ending at the end of the credential: */
-	iov.iov_base = xprt_skip_transport_header(req->rq_xprt,
-					req->rq_snd_buf.head[0].iov_base);
+	iov.iov_base = req->rq_snd_buf.head[0].iov_base;
 	iov.iov_len = (u8 *)p - (u8 *)iov.iov_base;
 	xdr_buf_from_iov(&iov, &verf_buf);
 
-	/* set verifier flavor*/
-	*p++ = htonl(RPC_AUTH_GSS);
-
+	p = xdr_reserve_space(xdr, sizeof(*p));
+	if (!p)
+		goto marshal_failed;
+	*p++ = rpc_auth_gss;
 	mic.data = (u8 *)(p + 1);
 	maj_stat = gss_get_mic(ctx->gc_gss_ctx, &verf_buf, &mic);
-	if (maj_stat == GSS_S_CONTEXT_EXPIRED) {
-		goto out_expired;
-	} else if (maj_stat != 0) {
-		pr_warn("gss_marshal: gss_get_mic FAILED (%d)\n", maj_stat);
-		task->tk_status = -EIO;
-		goto out_put_ctx;
-	}
-	p = xdr_encode_opaque(p, NULL, mic.len);
+	if (maj_stat == GSS_S_CONTEXT_EXPIRED)
+		goto expired;
+	else if (maj_stat != 0)
+		goto bad_mic;
+	if (xdr_stream_encode_opaque_inline(xdr, (void **)&p, mic.len) < 0)
+		goto marshal_failed;
+	status = 0;
+out:
 	gss_put_ctx(ctx);
-	return p;
-out_expired:
+	return status;
+expired:
 	clear_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags);
-	task->tk_status = -EKEYEXPIRED;
-out_put_ctx:
-	gss_put_ctx(ctx);
-	return NULL;
+	status = -EKEYEXPIRED;
+	goto out;
+marshal_failed:
+	status = -EMSGSIZE;
+	goto out;
+bad_mic:
+	trace_rpcgss_get_mic(task, maj_stat);
+	status = -EIO;
+	goto out;
 }
 
 static int gss_renew_cred(struct rpc_task *task)
@@ -1662,116 +1632,105 @@ gss_refresh_null(struct rpc_task *task)
 	return 0;
 }
 
-static __be32 *
-gss_validate(struct rpc_task *task, __be32 *p)
+static int
+gss_validate(struct rpc_task *task, struct xdr_stream *xdr)
 {
 	struct rpc_cred *cred = task->tk_rqstp->rq_cred;
 	struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred);
-	__be32		*seq = NULL;
+	__be32		*p, *seq = NULL;
 	struct kvec	iov;
 	struct xdr_buf	verf_buf;
 	struct xdr_netobj mic;
-	u32		flav,len;
-	u32		maj_stat;
-	__be32		*ret = ERR_PTR(-EIO);
+	u32		len, maj_stat;
+	int		status;
 
-	dprintk("RPC: %5u %s\n", task->tk_pid, __func__);
+	p = xdr_inline_decode(xdr, 2 * sizeof(*p));
+	if (!p)
+		goto validate_failed;
+	if (*p++ != rpc_auth_gss)
+		goto validate_failed;
+	len = be32_to_cpup(p);
+	if (len > RPC_MAX_AUTH_SIZE)
+		goto validate_failed;
+	p = xdr_inline_decode(xdr, len);
+	if (!p)
+		goto validate_failed;
 
-	flav = ntohl(*p++);
-	if ((len = ntohl(*p++)) > RPC_MAX_AUTH_SIZE)
-		goto out_bad;
-	if (flav != RPC_AUTH_GSS)
-		goto out_bad;
 	seq = kmalloc(4, GFP_NOFS);
 	if (!seq)
-		goto out_bad;
-	*seq = htonl(task->tk_rqstp->rq_seqno);
+		goto validate_failed;
+	*seq = cpu_to_be32(task->tk_rqstp->rq_seqno);
 	iov.iov_base = seq;
 	iov.iov_len = 4;
 	xdr_buf_from_iov(&iov, &verf_buf);
 	mic.data = (u8 *)p;
 	mic.len = len;
-
-	ret = ERR_PTR(-EACCES);
 	maj_stat = gss_verify_mic(ctx->gc_gss_ctx, &verf_buf, &mic);
 	if (maj_stat == GSS_S_CONTEXT_EXPIRED)
 		clear_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags);
-	if (maj_stat) {
-		dprintk("RPC: %5u %s: gss_verify_mic returned error 0x%08x\n",
-			task->tk_pid, __func__, maj_stat);
-		goto out_bad;
-	}
+	if (maj_stat)
+		goto bad_mic;
+
 	/* We leave it to unwrap to calculate au_rslack. For now we just
 	 * calculate the length of the verifier: */
 	cred->cr_auth->au_verfsize = XDR_QUADLEN(len) + 2;
+	status = 0;
+out:
 	gss_put_ctx(ctx);
-	dprintk("RPC: %5u %s: gss_verify_mic succeeded.\n",
-			task->tk_pid, __func__);
-	kfree(seq);
-	return p + XDR_QUADLEN(len);
-out_bad:
-	gss_put_ctx(ctx);
-	dprintk("RPC: %5u %s failed ret %ld.\n", task->tk_pid, __func__,
-		PTR_ERR(ret));
 	kfree(seq);
-	return ret;
-}
-
-static void gss_wrap_req_encode(kxdreproc_t encode, struct rpc_rqst *rqstp,
-				__be32 *p, void *obj)
-{
-	struct xdr_stream xdr;
+	return status;
 
-	xdr_init_encode(&xdr, &rqstp->rq_snd_buf, p);
-	encode(rqstp, &xdr, obj);
+validate_failed:
+	status = -EIO;
+	goto out;
+bad_mic:
+	trace_rpcgss_verify_mic(task, maj_stat);
+	status = -EACCES;
+	goto out;
 }
 
-static inline int
-gss_wrap_req_integ(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
-		   kxdreproc_t encode, struct rpc_rqst *rqstp,
-		   __be32 *p, void *obj)
+static int gss_wrap_req_integ(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
+			      struct rpc_task *task, struct xdr_stream *xdr)
 {
-	struct xdr_buf	*snd_buf = &rqstp->rq_snd_buf;
-	struct xdr_buf	integ_buf;
-	__be32          *integ_len = NULL;
+	struct rpc_rqst *rqstp = task->tk_rqstp;
+	struct xdr_buf integ_buf, *snd_buf = &rqstp->rq_snd_buf;
 	struct xdr_netobj mic;
-	u32		offset;
-	__be32		*q;
-	struct kvec	*iov;
-	u32             maj_stat = 0;
-	int		status = -EIO;
+	__be32 *p, *integ_len;
+	u32 offset, maj_stat;
 
+	p = xdr_reserve_space(xdr, 2 * sizeof(*p));
+	if (!p)
+		goto wrap_failed;
 	integ_len = p++;
-	offset = (u8 *)p - (u8 *)snd_buf->head[0].iov_base;
-	*p++ = htonl(rqstp->rq_seqno);
+	*p = cpu_to_be32(rqstp->rq_seqno);
 
-	gss_wrap_req_encode(encode, rqstp, p, obj);
+	if (rpcauth_wrap_req_encode(task, xdr))
+		goto wrap_failed;
 
+	offset = (u8 *)p - (u8 *)snd_buf->head[0].iov_base;
 	if (xdr_buf_subsegment(snd_buf, &integ_buf,
 				offset, snd_buf->len - offset))
-		return status;
-	*integ_len = htonl(integ_buf.len);
+		goto wrap_failed;
+	*integ_len = cpu_to_be32(integ_buf.len);
 
-	/* guess whether we're in the head or the tail: */
-	if (snd_buf->page_len || snd_buf->tail[0].iov_len)
-		iov = snd_buf->tail;
-	else
-		iov = snd_buf->head;
-	p = iov->iov_base + iov->iov_len;
+	p = xdr_reserve_space(xdr, 0);
+	if (!p)
+		goto wrap_failed;
 	mic.data = (u8 *)(p + 1);
-
 	maj_stat = gss_get_mic(ctx->gc_gss_ctx, &integ_buf, &mic);
-	status = -EIO; /* XXX? */
 	if (maj_stat == GSS_S_CONTEXT_EXPIRED)
 		clear_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags);
 	else if (maj_stat)
-		return status;
-	q = xdr_encode_opaque(p, NULL, mic.len);
-
-	offset = (u8 *)q - (u8 *)p;
-	iov->iov_len += offset;
-	snd_buf->len += offset;
+		goto bad_mic;
+	/* Check that the trailing MIC fit in the buffer, after the fact */
+	if (xdr_stream_encode_opaque_inline(xdr, (void **)&p, mic.len) < 0)
+		goto wrap_failed;
 	return 0;
+wrap_failed:
+	return -EMSGSIZE;
+bad_mic:
+	trace_rpcgss_get_mic(task, maj_stat);
+	return -EIO;
 }
 
 static void
@@ -1822,61 +1781,62 @@ out:
 	return -EAGAIN;
 }
 
-static inline int
-gss_wrap_req_priv(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
-		  kxdreproc_t encode, struct rpc_rqst *rqstp,
-		  __be32 *p, void *obj)
+static int gss_wrap_req_priv(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
+			     struct rpc_task *task, struct xdr_stream *xdr)
 {
+	struct rpc_rqst *rqstp = task->tk_rqstp;
 	struct xdr_buf	*snd_buf = &rqstp->rq_snd_buf;
-	u32		offset;
-	u32             maj_stat;
+	u32		pad, offset, maj_stat;
 	int		status;
-	__be32		*opaque_len;
+	__be32		*p, *opaque_len;
 	struct page	**inpages;
 	int		first;
-	int		pad;
 	struct kvec	*iov;
-	char		*tmp;
 
+	status = -EIO;
+	p = xdr_reserve_space(xdr, 2 * sizeof(*p));
+	if (!p)
+		goto wrap_failed;
 	opaque_len = p++;
-	offset = (u8 *)p - (u8 *)snd_buf->head[0].iov_base;
-	*p++ = htonl(rqstp->rq_seqno);
+	*p = cpu_to_be32(rqstp->rq_seqno);
 
-	gss_wrap_req_encode(encode, rqstp, p, obj);
+	if (rpcauth_wrap_req_encode(task, xdr))
+		goto wrap_failed;
 
 	status = alloc_enc_pages(rqstp);
-	if (status)
-		return status;
+	if (unlikely(status))
+		goto wrap_failed;
 	first = snd_buf->page_base >> PAGE_SHIFT;
 	inpages = snd_buf->pages + first;
 	snd_buf->pages = rqstp->rq_enc_pages;
 	snd_buf->page_base -= first << PAGE_SHIFT;
 	/*
-	 * Give the tail its own page, in case we need extra space in the
-	 * head when wrapping:
+	 * Move the tail into its own page, in case gss_wrap needs
+	 * more space in the head when wrapping.
 	 *
-	 * call_allocate() allocates twice the slack space required
-	 * by the authentication flavor to rq_callsize.
-	 * For GSS, slack is GSS_CRED_SLACK.
+	 * Still... Why can't gss_wrap just slide the tail down?
 	 */
 	if (snd_buf->page_len || snd_buf->tail[0].iov_len) {
+		char *tmp;
+
 		tmp = page_address(rqstp->rq_enc_pages[rqstp->rq_enc_pages_num - 1]);
 		memcpy(tmp, snd_buf->tail[0].iov_base, snd_buf->tail[0].iov_len);
 		snd_buf->tail[0].iov_base = tmp;
 	}
+	offset = (u8 *)p - (u8 *)snd_buf->head[0].iov_base;
 	maj_stat = gss_wrap(ctx->gc_gss_ctx, offset, snd_buf, inpages);
 	/* slack space should prevent this ever happening: */
-	BUG_ON(snd_buf->len > snd_buf->buflen);
-	status = -EIO;
+	if (unlikely(snd_buf->len > snd_buf->buflen))
+		goto wrap_failed;
 	/* We're assuming that when GSS_S_CONTEXT_EXPIRED, the encryption was
 	 * done anyway, so it's safe to put the request on the wire: */
 	if (maj_stat == GSS_S_CONTEXT_EXPIRED)
 		clear_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags);
 	else if (maj_stat)
-		return status;
+		goto bad_wrap;
 
-	*opaque_len = htonl(snd_buf->len - offset);
-	/* guess whether we're in the head or the tail: */
+	*opaque_len = cpu_to_be32(snd_buf->len - offset);
+	/* guess whether the pad goes into the head or the tail: */
 	if (snd_buf->page_len || snd_buf->tail[0].iov_len)
 		iov = snd_buf->tail;
 	else
@@ -1888,118 +1848,154 @@ gss_wrap_req_priv(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
 	snd_buf->len += pad;
 
 	return 0;
+wrap_failed:
+	return status;
+bad_wrap:
+	trace_rpcgss_wrap(task, maj_stat);
+	return -EIO;
 }
 
-static int
-gss_wrap_req(struct rpc_task *task,
-	     kxdreproc_t encode, void *rqstp, __be32 *p, void *obj)
+static int gss_wrap_req(struct rpc_task *task, struct xdr_stream *xdr)
 {
 	struct rpc_cred *cred = task->tk_rqstp->rq_cred;
 	struct gss_cred	*gss_cred = container_of(cred, struct gss_cred,
 			gc_base);
 	struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred);
-	int             status = -EIO;
+	int status;
 
-	dprintk("RPC: %5u %s\n", task->tk_pid, __func__);
+	status = -EIO;
 	if (ctx->gc_proc != RPC_GSS_PROC_DATA) {
 		/* The spec seems a little ambiguous here, but I think that not
 		 * wrapping context destruction requests makes the most sense.
 		 */
-		gss_wrap_req_encode(encode, rqstp, p, obj);
-		status = 0;
+		status = rpcauth_wrap_req_encode(task, xdr);
 		goto out;
 	}
 	switch (gss_cred->gc_service) {
 	case RPC_GSS_SVC_NONE:
-		gss_wrap_req_encode(encode, rqstp, p, obj);
-		status = 0;
+		status = rpcauth_wrap_req_encode(task, xdr);
 		break;
 	case RPC_GSS_SVC_INTEGRITY:
-		status = gss_wrap_req_integ(cred, ctx, encode, rqstp, p, obj);
+		status = gss_wrap_req_integ(cred, ctx, task, xdr);
 		break;
 	case RPC_GSS_SVC_PRIVACY:
-		status = gss_wrap_req_priv(cred, ctx, encode, rqstp, p, obj);
+		status = gss_wrap_req_priv(cred, ctx, task, xdr);
 		break;
+	default:
+		status = -EIO;
 	}
 out:
 	gss_put_ctx(ctx);
-	dprintk("RPC: %5u %s returning %d\n", task->tk_pid, __func__, status);
 	return status;
 }
 
-static inline int
-gss_unwrap_resp_integ(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
-		struct rpc_rqst *rqstp, __be32 **p)
+static int
+gss_unwrap_resp_auth(struct rpc_cred *cred)
 {
-	struct xdr_buf	*rcv_buf = &rqstp->rq_rcv_buf;
-	struct xdr_buf integ_buf;
+	struct rpc_auth *auth = cred->cr_auth;
+
+	auth->au_rslack = auth->au_verfsize;
+	auth->au_ralign = auth->au_verfsize;
+	return 0;
+}
+
+static int
+gss_unwrap_resp_integ(struct rpc_task *task, struct rpc_cred *cred,
+		      struct gss_cl_ctx *ctx, struct rpc_rqst *rqstp,
+		      struct xdr_stream *xdr)
+{
+	struct xdr_buf integ_buf, *rcv_buf = &rqstp->rq_rcv_buf;
+	u32 data_offset, mic_offset, integ_len, maj_stat;
+	struct rpc_auth *auth = cred->cr_auth;
 	struct xdr_netobj mic;
-	u32 data_offset, mic_offset;
-	u32 integ_len;
-	u32 maj_stat;
-	int status = -EIO;
+	__be32 *p;
 
-	integ_len = ntohl(*(*p)++);
+	p = xdr_inline_decode(xdr, 2 * sizeof(*p));
+	if (unlikely(!p))
+		goto unwrap_failed;
+	integ_len = be32_to_cpup(p++);
 	if (integ_len & 3)
-		return status;
-	data_offset = (u8 *)(*p) - (u8 *)rcv_buf->head[0].iov_base;
+		goto unwrap_failed;
+	data_offset = (u8 *)(p) - (u8 *)rcv_buf->head[0].iov_base;
 	mic_offset = integ_len + data_offset;
 	if (mic_offset > rcv_buf->len)
-		return status;
-	if (ntohl(*(*p)++) != rqstp->rq_seqno)
-		return status;
-
-	if (xdr_buf_subsegment(rcv_buf, &integ_buf, data_offset,
-				mic_offset - data_offset))
-		return status;
+		goto unwrap_failed;
+	if (be32_to_cpup(p) != rqstp->rq_seqno)
+		goto bad_seqno;
 
+	if (xdr_buf_subsegment(rcv_buf, &integ_buf, data_offset, integ_len))
+		goto unwrap_failed;
 	if (xdr_buf_read_netobj(rcv_buf, &mic, mic_offset))
-		return status;
-
+		goto unwrap_failed;
 	maj_stat = gss_verify_mic(ctx->gc_gss_ctx, &integ_buf, &mic);
 	if (maj_stat == GSS_S_CONTEXT_EXPIRED)
 		clear_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags);
 	if (maj_stat != GSS_S_COMPLETE)
-		return status;
+		goto bad_mic;
+
+	auth->au_rslack = auth->au_verfsize + 2 + 1 + XDR_QUADLEN(mic.len);
+	auth->au_ralign = auth->au_verfsize + 2;
 	return 0;
+unwrap_failed:
+	trace_rpcgss_unwrap_failed(task);
+	return -EIO;
+bad_seqno:
+	trace_rpcgss_bad_seqno(task, rqstp->rq_seqno, be32_to_cpup(p));
+	return -EIO;
+bad_mic:
+	trace_rpcgss_verify_mic(task, maj_stat);
+	return -EIO;
 }
 
-static inline int
-gss_unwrap_resp_priv(struct rpc_cred *cred, struct gss_cl_ctx *ctx,
-		struct rpc_rqst *rqstp, __be32 **p)
-{
-	struct xdr_buf  *rcv_buf = &rqstp->rq_rcv_buf;
-	u32 offset;
-	u32 opaque_len;
-	u32 maj_stat;
-	int status = -EIO;
-
-	opaque_len = ntohl(*(*p)++);
-	offset = (u8 *)(*p) - (u8 *)rcv_buf->head[0].iov_base;
+static int
+gss_unwrap_resp_priv(struct rpc_task *task, struct rpc_cred *cred,
+		     struct gss_cl_ctx *ctx, struct rpc_rqst *rqstp,
+		     struct xdr_stream *xdr)
+{
+	struct xdr_buf *rcv_buf = &rqstp->rq_rcv_buf;
+	struct kvec *head = rqstp->rq_rcv_buf.head;
+	struct rpc_auth *auth = cred->cr_auth;
+	unsigned int savedlen = rcv_buf->len;
+	u32 offset, opaque_len, maj_stat;
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, 2 * sizeof(*p));
+	if (unlikely(!p))
+		goto unwrap_failed;
+	opaque_len = be32_to_cpup(p++);
+	offset = (u8 *)(p) - (u8 *)head->iov_base;
 	if (offset + opaque_len > rcv_buf->len)
-		return status;
-	/* remove padding: */
+		goto unwrap_failed;
 	rcv_buf->len = offset + opaque_len;
 
 	maj_stat = gss_unwrap(ctx->gc_gss_ctx, offset, rcv_buf);
 	if (maj_stat == GSS_S_CONTEXT_EXPIRED)
 		clear_bit(RPCAUTH_CRED_UPTODATE, &cred->cr_flags);
 	if (maj_stat != GSS_S_COMPLETE)
-		return status;
-	if (ntohl(*(*p)++) != rqstp->rq_seqno)
-		return status;
+		goto bad_unwrap;
+	/* gss_unwrap decrypted the sequence number */
+	if (be32_to_cpup(p++) != rqstp->rq_seqno)
+		goto bad_seqno;
 
-	return 0;
-}
-
-static int
-gss_unwrap_req_decode(kxdrdproc_t decode, struct rpc_rqst *rqstp,
-		      __be32 *p, void *obj)
-{
-	struct xdr_stream xdr;
+	/* gss_unwrap redacts the opaque blob from the head iovec.
+	 * rcv_buf has changed, thus the stream needs to be reset.
+	 */
+	xdr_init_decode(xdr, rcv_buf, p, rqstp);
 
-	xdr_init_decode(&xdr, &rqstp->rq_rcv_buf, p);
-	return decode(rqstp, &xdr, obj);
+	auth->au_rslack = auth->au_verfsize + 2 +
+			  XDR_QUADLEN(savedlen - rcv_buf->len);
+	auth->au_ralign = auth->au_verfsize + 2 +
+			  XDR_QUADLEN(savedlen - rcv_buf->len);
+	return 0;
+unwrap_failed:
+	trace_rpcgss_unwrap_failed(task);
+	return -EIO;
+bad_seqno:
+	trace_rpcgss_bad_seqno(task, rqstp->rq_seqno, be32_to_cpup(--p));
+	return -EIO;
+bad_unwrap:
+	trace_rpcgss_unwrap(task, maj_stat);
+	return -EIO;
 }
 
 static bool
@@ -2014,14 +2010,14 @@ gss_xmit_need_reencode(struct rpc_task *task)
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_cred *cred = req->rq_cred;
 	struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred);
-	u32 win, seq_xmit;
+	u32 win, seq_xmit = 0;
 	bool ret = true;
 
 	if (!ctx)
-		return true;
+		goto out;
 
 	if (gss_seq_is_newer(req->rq_seqno, READ_ONCE(ctx->gc_seq)))
-		goto out;
+		goto out_ctx;
 
 	seq_xmit = READ_ONCE(ctx->gc_seq_xmit);
 	while (gss_seq_is_newer(req->rq_seqno, seq_xmit)) {
@@ -2030,56 +2026,51 @@ gss_xmit_need_reencode(struct rpc_task *task)
 		seq_xmit = cmpxchg(&ctx->gc_seq_xmit, tmp, req->rq_seqno);
 		if (seq_xmit == tmp) {
 			ret = false;
-			goto out;
+			goto out_ctx;
 		}
 	}
 
 	win = ctx->gc_win;
 	if (win > 0)
 		ret = !gss_seq_is_newer(req->rq_seqno, seq_xmit - win);
-out:
+
+out_ctx:
 	gss_put_ctx(ctx);
+out:
+	trace_rpcgss_need_reencode(task, seq_xmit, ret);
 	return ret;
 }
 
 static int
-gss_unwrap_resp(struct rpc_task *task,
-		kxdrdproc_t decode, void *rqstp, __be32 *p, void *obj)
+gss_unwrap_resp(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	struct rpc_cred *cred = task->tk_rqstp->rq_cred;
+	struct rpc_rqst *rqstp = task->tk_rqstp;
+	struct rpc_cred *cred = rqstp->rq_cred;
 	struct gss_cred *gss_cred = container_of(cred, struct gss_cred,
 			gc_base);
 	struct gss_cl_ctx *ctx = gss_cred_get_ctx(cred);
-	__be32		*savedp = p;
-	struct kvec	*head = ((struct rpc_rqst *)rqstp)->rq_rcv_buf.head;
-	int		savedlen = head->iov_len;
-	int             status = -EIO;
+	int status = -EIO;
 
 	if (ctx->gc_proc != RPC_GSS_PROC_DATA)
 		goto out_decode;
 	switch (gss_cred->gc_service) {
 	case RPC_GSS_SVC_NONE:
+		status = gss_unwrap_resp_auth(cred);
 		break;
 	case RPC_GSS_SVC_INTEGRITY:
-		status = gss_unwrap_resp_integ(cred, ctx, rqstp, &p);
-		if (status)
-			goto out;
+		status = gss_unwrap_resp_integ(task, cred, ctx, rqstp, xdr);
 		break;
 	case RPC_GSS_SVC_PRIVACY:
-		status = gss_unwrap_resp_priv(cred, ctx, rqstp, &p);
-		if (status)
-			goto out;
+		status = gss_unwrap_resp_priv(task, cred, ctx, rqstp, xdr);
 		break;
 	}
-	/* take into account extra slack for integrity and privacy cases: */
-	cred->cr_auth->au_rslack = cred->cr_auth->au_verfsize + (p - savedp)
-						+ (savedlen - head->iov_len);
+	if (status)
+		goto out;
+
 out_decode:
-	status = gss_unwrap_req_decode(decode, rqstp, p, obj);
+	status = rpcauth_unwrap_resp_decode(task, xdr);
 out:
 	gss_put_ctx(ctx);
-	dprintk("RPC: %5u %s returning %d\n",
-		task->tk_pid, __func__, status);
 	return status;
 }
 
diff --git a/net/sunrpc/auth_gss/gss_krb5_mech.c b/net/sunrpc/auth_gss/gss_krb5_mech.c
index eab71fc7af3e..56cc85c5bc06 100644
--- a/net/sunrpc/auth_gss/gss_krb5_mech.c
+++ b/net/sunrpc/auth_gss/gss_krb5_mech.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: BSD-3-Clause
 /*
  *  linux/net/sunrpc/gss_krb5_mech.c
  *
@@ -6,32 +7,6 @@
  *
  *  Andy Adamson <andros@umich.edu>
  *  J. Bruce Fields <bfields@umich.edu>
- *
- *  Redistribution and use in source and binary forms, with or without
- *  modification, are permitted provided that the following conditions
- *  are met:
- *
- *  1. Redistributions of source code must retain the above copyright
- *     notice, this list of conditions and the following disclaimer.
- *  2. Redistributions in binary form must reproduce the above copyright
- *     notice, this list of conditions and the following disclaimer in the
- *     documentation and/or other materials provided with the distribution.
- *  3. Neither the name of the University nor the names of its
- *     contributors may be used to endorse or promote products derived
- *     from this software without specific prior written permission.
- *
- *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
- *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- *  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- *  DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
- *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
- *  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
  */
 
 #include <crypto/hash.h>
@@ -53,6 +28,7 @@
 static struct gss_api_mech gss_kerberos_mech;	/* forward declaration */
 
 static const struct gss_krb5_enctype supported_gss_krb5_enctypes[] = {
+#ifndef CONFIG_SUNRPC_DISABLE_INSECURE_ENCTYPES
 	/*
 	 * DES (All DES enctypes are mapped to the same gss functionality)
 	 */
@@ -74,6 +50,7 @@ static const struct gss_krb5_enctype supported_gss_krb5_enctypes[] = {
 	  .cksumlength = 8,
 	  .keyed_cksum = 0,
 	},
+#endif	/* CONFIG_SUNRPC_DISABLE_INSECURE_ENCTYPES */
 	/*
 	 * RC4-HMAC
 	 */
diff --git a/net/sunrpc/auth_gss/gss_krb5_wrap.c b/net/sunrpc/auth_gss/gss_krb5_wrap.c
index 5cdde6cb703a..14a0aff0cd84 100644
--- a/net/sunrpc/auth_gss/gss_krb5_wrap.c
+++ b/net/sunrpc/auth_gss/gss_krb5_wrap.c
@@ -570,14 +570,16 @@ gss_unwrap_kerberos_v2(struct krb5_ctx *kctx, int offset, struct xdr_buf *buf)
 	 */
 	movelen = min_t(unsigned int, buf->head[0].iov_len, buf->len);
 	movelen -= offset + GSS_KRB5_TOK_HDR_LEN + headskip;
-	BUG_ON(offset + GSS_KRB5_TOK_HDR_LEN + headskip + movelen >
-							buf->head[0].iov_len);
+	if (offset + GSS_KRB5_TOK_HDR_LEN + headskip + movelen >
+	    buf->head[0].iov_len)
+		return GSS_S_FAILURE;
 	memmove(ptr, ptr + GSS_KRB5_TOK_HDR_LEN + headskip, movelen);
 	buf->head[0].iov_len -= GSS_KRB5_TOK_HDR_LEN + headskip;
 	buf->len -= GSS_KRB5_TOK_HDR_LEN + headskip;
 
 	/* Trim off the trailing "extra count" and checksum blob */
-	xdr_buf_trim(buf, ec + GSS_KRB5_TOK_HDR_LEN + tailskip);
+	buf->len -= ec + GSS_KRB5_TOK_HDR_LEN + tailskip;
+
 	return GSS_S_COMPLETE;
 }
 
diff --git a/net/sunrpc/auth_gss/gss_mech_switch.c b/net/sunrpc/auth_gss/gss_mech_switch.c
index 379318dff534..82060099a429 100644
--- a/net/sunrpc/auth_gss/gss_mech_switch.c
+++ b/net/sunrpc/auth_gss/gss_mech_switch.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: BSD-3-Clause
 /*
  *  linux/net/sunrpc/gss_mech_switch.c
  *
@@ -5,32 +6,6 @@
  *  All rights reserved.
  *
  *  J. Bruce Fields   <bfields@umich.edu>
- *
- *  Redistribution and use in source and binary forms, with or without
- *  modification, are permitted provided that the following conditions
- *  are met:
- *
- *  1. Redistributions of source code must retain the above copyright
- *     notice, this list of conditions and the following disclaimer.
- *  2. Redistributions in binary form must reproduce the above copyright
- *     notice, this list of conditions and the following disclaimer in the
- *     documentation and/or other materials provided with the distribution.
- *  3. Neither the name of the University nor the names of its
- *     contributors may be used to endorse or promote products derived
- *     from this software without specific prior written permission.
- *
- *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
- *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- *  MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- *  DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
- *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- *  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- *  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
- *  BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
  */
 
 #include <linux/types.h>
diff --git a/net/sunrpc/auth_gss/gss_rpc_upcall.c b/net/sunrpc/auth_gss/gss_rpc_upcall.c
index 73dcda060335..0349f455a862 100644
--- a/net/sunrpc/auth_gss/gss_rpc_upcall.c
+++ b/net/sunrpc/auth_gss/gss_rpc_upcall.c
@@ -1,21 +1,8 @@
+// SPDX-License-Identifier: GPL-2.0+
 /*
  *  linux/net/sunrpc/gss_rpc_upcall.c
  *
  *  Copyright (C) 2012 Simo Sorce <simo@redhat.com>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
 #include <linux/types.h>
diff --git a/net/sunrpc/auth_gss/gss_rpc_upcall.h b/net/sunrpc/auth_gss/gss_rpc_upcall.h
index 1e542aded90a..31e96344167e 100644
--- a/net/sunrpc/auth_gss/gss_rpc_upcall.h
+++ b/net/sunrpc/auth_gss/gss_rpc_upcall.h
@@ -1,21 +1,8 @@
+/* SPDX-License-Identifier: GPL-2.0+ */
 /*
  *  linux/net/sunrpc/gss_rpc_upcall.h
  *
  *  Copyright (C) 2012 Simo Sorce <simo@redhat.com>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
 #ifndef _GSS_RPC_UPCALL_H
@@ -45,4 +32,5 @@ void gssp_free_upcall_data(struct gssp_upcall_data *data);
 void init_gssp_clnt(struct sunrpc_net *);
 int set_gssp_clnt(struct net *);
 void clear_gssp_clnt(struct sunrpc_net *);
+
 #endif /* _GSS_RPC_UPCALL_H */
diff --git a/net/sunrpc/auth_gss/gss_rpc_xdr.c b/net/sunrpc/auth_gss/gss_rpc_xdr.c
index 006062ad5f58..2ff7b7083eba 100644
--- a/net/sunrpc/auth_gss/gss_rpc_xdr.c
+++ b/net/sunrpc/auth_gss/gss_rpc_xdr.c
@@ -1,21 +1,8 @@
+// SPDX-License-Identifier: GPL-2.0+
 /*
  * GSS Proxy upcall module
  *
  *  Copyright (C) 2012 Simo Sorce <simo@redhat.com>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
 #include <linux/sunrpc/svcauth.h>
diff --git a/net/sunrpc/auth_gss/gss_rpc_xdr.h b/net/sunrpc/auth_gss/gss_rpc_xdr.h
index 146c31032917..3f17411b7e65 100644
--- a/net/sunrpc/auth_gss/gss_rpc_xdr.h
+++ b/net/sunrpc/auth_gss/gss_rpc_xdr.h
@@ -1,21 +1,8 @@
+/* SPDX-License-Identifier: GPL-2.0+ */
 /*
  * GSS Proxy upcall module
  *
  *  Copyright (C) 2012 Simo Sorce <simo@redhat.com>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
 #ifndef _LINUX_GSS_RPC_XDR_H
@@ -262,6 +249,4 @@ int gssx_dec_accept_sec_context(struct rpc_rqst *rqstp,
 #define GSSX_ARG_wrap_size_limit_sz 0
 #define GSSX_RES_wrap_size_limit_sz 0
 
-
-
 #endif /* _LINUX_GSS_RPC_XDR_H */
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 152790ed309c..0c5d7896d6dd 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-2.0
 /*
  * Neil Brown <neilb@cse.unsw.edu.au>
  * J. Bruce Fields <bfields@umich.edu>
@@ -896,7 +897,7 @@ unwrap_integ_data(struct svc_rqst *rqstp, struct xdr_buf *buf, u32 seq, struct g
 	if (svc_getnl(&buf->head[0]) != seq)
 		goto out;
 	/* trim off the mic and padding at the end before returning */
-	xdr_buf_trim(buf, round_up_to_quad(mic.len) + 4);
+	buf->len -= 4 + round_up_to_quad(mic.len);
 	stat = 0;
 out:
 	kfree(mic.data);
diff --git a/net/sunrpc/auth_gss/trace.c b/net/sunrpc/auth_gss/trace.c
new file mode 100644
index 000000000000..5576f1e66de9
--- /dev/null
+++ b/net/sunrpc/auth_gss/trace.c
@@ -0,0 +1,11 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (c) 2018, 2019 Oracle. All rights reserved.
+ */
+
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/gss_err.h>
+
+#define CREATE_TRACE_POINTS
+#include <trace/events/rpcgss.h>
diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c
index d0ceac57c06e..41a633a4049e 100644
--- a/net/sunrpc/auth_null.c
+++ b/net/sunrpc/auth_null.c
@@ -59,15 +59,21 @@ nul_match(struct auth_cred *acred, struct rpc_cred *cred, int taskflags)
 /*
  * Marshal credential.
  */
-static __be32 *
-nul_marshal(struct rpc_task *task, __be32 *p)
+static int
+nul_marshal(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	*p++ = htonl(RPC_AUTH_NULL);
-	*p++ = 0;
-	*p++ = htonl(RPC_AUTH_NULL);
-	*p++ = 0;
-
-	return p;
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, 4 * sizeof(*p));
+	if (!p)
+		return -EMSGSIZE;
+	/* Credential */
+	*p++ = rpc_auth_null;
+	*p++ = xdr_zero;
+	/* Verifier */
+	*p++ = rpc_auth_null;
+	*p   = xdr_zero;
+	return 0;
 }
 
 /*
@@ -80,25 +86,19 @@ nul_refresh(struct rpc_task *task)
 	return 0;
 }
 
-static __be32 *
-nul_validate(struct rpc_task *task, __be32 *p)
+static int
+nul_validate(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	rpc_authflavor_t	flavor;
-	u32			size;
-
-	flavor = ntohl(*p++);
-	if (flavor != RPC_AUTH_NULL) {
-		printk("RPC: bad verf flavor: %u\n", flavor);
-		return ERR_PTR(-EIO);
-	}
-
-	size = ntohl(*p++);
-	if (size != 0) {
-		printk("RPC: bad verf size: %u\n", size);
-		return ERR_PTR(-EIO);
-	}
-
-	return p;
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, 2 * sizeof(*p));
+	if (!p)
+		return -EIO;
+	if (*p++ != rpc_auth_null)
+		return -EIO;
+	if (*p != xdr_zero)
+		return -EIO;
+	return 0;
 }
 
 const struct rpc_authops authnull_ops = {
@@ -114,6 +114,8 @@ static
 struct rpc_auth null_auth = {
 	.au_cslack	= NUL_CALLSLACK,
 	.au_rslack	= NUL_REPLYSLACK,
+	.au_verfsize	= NUL_REPLYSLACK,
+	.au_ralign	= NUL_REPLYSLACK,
 	.au_ops		= &authnull_ops,
 	.au_flavor	= RPC_AUTH_NULL,
 	.au_count	= REFCOUNT_INIT(1),
@@ -125,8 +127,10 @@ const struct rpc_credops null_credops = {
 	.crdestroy	= nul_destroy_cred,
 	.crmatch	= nul_match,
 	.crmarshal	= nul_marshal,
+	.crwrap_req	= rpcauth_wrap_req_encode,
 	.crrefresh	= nul_refresh,
 	.crvalidate	= nul_validate,
+	.crunwrap_resp	= rpcauth_unwrap_resp_decode,
 };
 
 static
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index 387f6b3ffbea..d4018e5a24c5 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -28,8 +28,6 @@ static mempool_t		*unix_pool;
 static struct rpc_auth *
 unx_create(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 {
-	dprintk("RPC:       creating UNIX authenticator for client %p\n",
-			clnt);
 	refcount_inc(&unix_auth.au_count);
 	return &unix_auth;
 }
@@ -37,7 +35,6 @@ unx_create(const struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
 static void
 unx_destroy(struct rpc_auth *auth)
 {
-	dprintk("RPC:       destroying UNIX authenticator %p\n", auth);
 }
 
 /*
@@ -48,10 +45,6 @@ unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 {
 	struct rpc_cred *ret = mempool_alloc(unix_pool, GFP_NOFS);
 
-	dprintk("RPC:       allocating UNIX cred for uid %d gid %d\n",
-			from_kuid(&init_user_ns, acred->cred->fsuid),
-			from_kgid(&init_user_ns, acred->cred->fsgid));
-
 	rpcauth_init_cred(ret, acred, auth, &unix_credops);
 	ret->cr_flags = 1UL << RPCAUTH_CRED_UPTODATE;
 	return ret;
@@ -61,7 +54,7 @@ static void
 unx_free_cred_callback(struct rcu_head *head)
 {
 	struct rpc_cred *rpc_cred = container_of(head, struct rpc_cred, cr_rcu);
-	dprintk("RPC:       unx_free_cred %p\n", rpc_cred);
+
 	put_cred(rpc_cred->cr_cred);
 	mempool_free(rpc_cred, unix_pool);
 }
@@ -87,7 +80,7 @@ unx_match(struct auth_cred *acred, struct rpc_cred *cred, int flags)
 	if (!uid_eq(cred->cr_cred->fsuid, acred->cred->fsuid) || !gid_eq(cred->cr_cred->fsgid, acred->cred->fsgid))
 		return 0;
 
-	if (acred->cred && acred->cred->group_info != NULL)
+	if (acred->cred->group_info != NULL)
 		groups = acred->cred->group_info->ngroups;
 	if (groups > UNX_NGROUPS)
 		groups = UNX_NGROUPS;
@@ -106,37 +99,55 @@ unx_match(struct auth_cred *acred, struct rpc_cred *cred, int flags)
  * Marshal credentials.
  * Maybe we should keep a cached credential for performance reasons.
  */
-static __be32 *
-unx_marshal(struct rpc_task *task, __be32 *p)
+static int
+unx_marshal(struct rpc_task *task, struct xdr_stream *xdr)
 {
 	struct rpc_clnt	*clnt = task->tk_client;
 	struct rpc_cred	*cred = task->tk_rqstp->rq_cred;
-	__be32		*base, *hold;
+	__be32		*p, *cred_len, *gidarr_len;
 	int		i;
 	struct group_info *gi = cred->cr_cred->group_info;
 
-	*p++ = htonl(RPC_AUTH_UNIX);
-	base = p++;
-	*p++ = htonl(jiffies/HZ);
-
-	/*
-	 * Copy the UTS nodename captured when the client was created.
-	 */
-	p = xdr_encode_array(p, clnt->cl_nodename, clnt->cl_nodelen);
-
-	*p++ = htonl((u32) from_kuid(&init_user_ns, cred->cr_cred->fsuid));
-	*p++ = htonl((u32) from_kgid(&init_user_ns, cred->cr_cred->fsgid));
-	hold = p++;
+	/* Credential */
+
+	p = xdr_reserve_space(xdr, 3 * sizeof(*p));
+	if (!p)
+		goto marshal_failed;
+	*p++ = rpc_auth_unix;
+	cred_len = p++;
+	*p++ = xdr_zero;	/* stamp */
+	if (xdr_stream_encode_opaque(xdr, clnt->cl_nodename,
+				     clnt->cl_nodelen) < 0)
+		goto marshal_failed;
+	p = xdr_reserve_space(xdr, 3 * sizeof(*p));
+	if (!p)
+		goto marshal_failed;
+	*p++ = cpu_to_be32(from_kuid(&init_user_ns, cred->cr_cred->fsuid));
+	*p++ = cpu_to_be32(from_kgid(&init_user_ns, cred->cr_cred->fsgid));
+
+	gidarr_len = p++;
 	if (gi)
 		for (i = 0; i < UNX_NGROUPS && i < gi->ngroups; i++)
-			*p++ = htonl((u32) from_kgid(&init_user_ns, gi->gid[i]));
-	*hold = htonl(p - hold - 1);		/* gid array length */
-	*base = htonl((p - base - 1) << 2);	/* cred length */
+			*p++ = cpu_to_be32(from_kgid(&init_user_ns,
+						     gi->gid[i]));
+	*gidarr_len = cpu_to_be32(p - gidarr_len - 1);
+	*cred_len = cpu_to_be32((p - cred_len - 1) << 2);
+	p = xdr_reserve_space(xdr, (p - gidarr_len - 1) << 2);
+	if (!p)
+		goto marshal_failed;
+
+	/* Verifier */
+
+	p = xdr_reserve_space(xdr, 2 * sizeof(*p));
+	if (!p)
+		goto marshal_failed;
+	*p++ = rpc_auth_null;
+	*p   = xdr_zero;
 
-	*p++ = htonl(RPC_AUTH_NULL);
-	*p++ = htonl(0);
+	return 0;
 
-	return p;
+marshal_failed:
+	return -EMSGSIZE;
 }
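The AUTH_UNIX encoder writes placeholder words for the credential length and the gid-array length, then patches them once the variable-length parts have been emitted. A simplified user-space sketch of that back-patching (nodename elided for brevity; the helper and constants are illustrative, not the kernel's):

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

#define RPC_AUTH_UNIX 1
#define MAX_GROUPS    16

/* Encode an AUTH_UNIX credential body, back-patching the two length
 * words once the gid array has been written. */
static uint32_t *encode_auth_unix(uint32_t *p, uint32_t uid, uint32_t gid,
				  const uint32_t *groups, unsigned int ngroups)
{
	uint32_t *cred_len, *gidarr_len;
	unsigned int i;

	*p++ = htonl(RPC_AUTH_UNIX);
	cred_len = p++;			/* patched below */
	*p++ = htonl(0);		/* stamp */
	*p++ = htonl(0);		/* zero-length nodename */
	*p++ = htonl(uid);
	*p++ = htonl(gid);
	gidarr_len = p++;		/* patched below */
	for (i = 0; i < ngroups && i < MAX_GROUPS; i++)
		*p++ = htonl(groups[i]);

	*gidarr_len = htonl(p - gidarr_len - 1);	/* words in gid array */
	*cred_len = htonl((p - cred_len - 1) << 2);	/* credential bytes   */
	return p;
}

int main(void)
{
	uint32_t buf[32], groups[2] = { 100, 101 };
	uint32_t *end = encode_auth_unix(buf, 1000, 1000, groups, 2);

	printf("credential body: %zu bytes\n", (size_t)(end - buf) * 4);
	return 0;
}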
 
 /*
@@ -149,29 +160,35 @@ unx_refresh(struct rpc_task *task)
 	return 0;
 }
 
-static __be32 *
-unx_validate(struct rpc_task *task, __be32 *p)
+static int
+unx_validate(struct rpc_task *task, struct xdr_stream *xdr)
 {
-	rpc_authflavor_t	flavor;
-	u32			size;
-
-	flavor = ntohl(*p++);
-	if (flavor != RPC_AUTH_NULL &&
-	    flavor != RPC_AUTH_UNIX &&
-	    flavor != RPC_AUTH_SHORT) {
-		printk("RPC: bad verf flavor: %u\n", flavor);
-		return ERR_PTR(-EIO);
-	}
-
-	size = ntohl(*p++);
-	if (size > RPC_MAX_AUTH_SIZE) {
-		printk("RPC: giant verf size: %u\n", size);
-		return ERR_PTR(-EIO);
+	struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
+	__be32 *p;
+	u32 size;
+
+	p = xdr_inline_decode(xdr, 2 * sizeof(*p));
+	if (!p)
+		return -EIO;
+	switch (*p++) {
+	case rpc_auth_null:
+	case rpc_auth_unix:
+	case rpc_auth_short:
+		break;
+	default:
+		return -EIO;
 	}
-	task->tk_rqstp->rq_cred->cr_auth->au_rslack = (size >> 2) + 2;
-	p += (size >> 2);
-
-	return p;
+	size = be32_to_cpup(p);
+	if (size > RPC_MAX_AUTH_SIZE)
+		return -EIO;
+	p = xdr_inline_decode(xdr, size);
+	if (!p)
+		return -EIO;
+
+	auth->au_verfsize = XDR_QUADLEN(size) + 2;
+	auth->au_rslack = XDR_QUADLEN(size) + 2;
+	auth->au_ralign = XDR_QUADLEN(size) + 2;
+	return 0;
 }
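Rather than skipping past the verifier by hand, unx_validate() now records how many reply words the verifier occupies so later decoding can be aligned around it: the opaque body rounded up to whole words, plus two words for the flavor and length. A quick arithmetic sketch (the 8-byte verifier body is just an example):

#include <stdio.h>

/* XDR_QUADLEN(): number of 4-byte XDR words needed to hold len bytes. */
#define XDR_QUADLEN(len) (((len) + 3) >> 2)

int main(void)
{
	unsigned int verf_body = 8;	/* example verifier body length */

	/* +2 covers the verifier's flavor and length words. */
	printf("reply verifier slack: %u words\n", XDR_QUADLEN(verf_body) + 2);
	return 0;
}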
 
 int __init rpc_init_authunix(void)
@@ -198,6 +215,7 @@ static
 struct rpc_auth		unix_auth = {
 	.au_cslack	= UNX_CALLSLACK,
 	.au_rslack	= NUL_REPLYSLACK,
+	.au_verfsize	= NUL_REPLYSLACK,
 	.au_ops		= &authunix_ops,
 	.au_flavor	= RPC_AUTH_UNIX,
 	.au_count	= REFCOUNT_INIT(1),
@@ -209,6 +227,8 @@ const struct rpc_credops unix_credops = {
 	.crdestroy	= unx_destroy_cred,
 	.crmatch	= unx_match,
 	.crmarshal	= unx_marshal,
+	.crwrap_req	= rpcauth_wrap_req_encode,
 	.crrefresh	= unx_refresh,
 	.crvalidate	= unx_validate,
+	.crunwrap_resp	= rpcauth_unwrap_resp_decode,
 };
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index ec451b8114b0..c47d82622fd1 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -235,7 +235,8 @@ out:
 		list_empty(&xprt->bc_pa_list) ? "true" : "false");
 }
 
-static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
+static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
+		struct rpc_rqst *new)
 {
 	struct rpc_rqst *req = NULL;
 
@@ -243,22 +244,20 @@ static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
 	if (atomic_read(&xprt->bc_free_slots) <= 0)
 		goto not_found;
 	if (list_empty(&xprt->bc_pa_list)) {
-		req = xprt_alloc_bc_req(xprt, GFP_ATOMIC);
-		if (!req)
+		if (!new)
 			goto not_found;
-		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
+		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
 		xprt->bc_alloc_count++;
 	}
 	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
 				rq_bc_pa_list);
 	req->rq_reply_bytes_recvd = 0;
-	req->rq_bytes_sent = 0;
 	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
 			sizeof(req->rq_private_buf));
 	req->rq_xid = xid;
 	req->rq_connect_cookie = xprt->connect_cookie;
-not_found:
 	dprintk("RPC:       backchannel req=%p\n", req);
+not_found:
 	return req;
 }
 
@@ -321,18 +320,27 @@ void xprt_free_bc_rqst(struct rpc_rqst *req)
  */
 struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
 {
-	struct rpc_rqst *req;
-
-	spin_lock(&xprt->bc_pa_lock);
-	list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
-		if (req->rq_connect_cookie != xprt->connect_cookie)
-			continue;
-		if (req->rq_xid == xid)
-			goto found;
-	}
-	req = xprt_alloc_bc_request(xprt, xid);
+	struct rpc_rqst *req, *new = NULL;
+
+	do {
+		spin_lock(&xprt->bc_pa_lock);
+		list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
+			if (req->rq_connect_cookie != xprt->connect_cookie)
+				continue;
+			if (req->rq_xid == xid)
+				goto found;
+		}
+		req = xprt_get_bc_request(xprt, xid, new);
 found:
-	spin_unlock(&xprt->bc_pa_lock);
+		spin_unlock(&xprt->bc_pa_lock);
+		if (new) {
+			if (req != new)
+				xprt_free_bc_rqst(new);
+			break;
+		} else if (req)
+			break;
+		new = xprt_alloc_bc_req(xprt, GFP_KERNEL);
+	} while (new);
 	return req;
 }
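xprt_lookup_bc_request() now allocates a spare request with GFP_KERNEL outside the bc_pa_lock and retries the lookup, discarding the spare if another thread replenished the list first. A rough user-space sketch of that allocate-outside-the-lock pattern (hypothetical slot pool, not the kernel API):

#include <pthread.h>
#include <stdlib.h>

struct slot { struct slot *next; };

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static struct slot *free_list;

/* Take a slot from the pool; if the pool is empty, allocate a new one
 * outside the lock and retry, dropping the spare if we lost the race. */
static struct slot *get_slot(void)
{
	struct slot *s, *new = NULL;

	do {
		pthread_mutex_lock(&lock);
		s = free_list;
		if (!s && new) {
			s = new;	/* hand out the freshly allocated slot */
			new = NULL;
		} else if (s) {
			free_list = s->next;
		}
		pthread_mutex_unlock(&lock);
		if (s) {
			free(new);	/* another thread refilled the pool */
			return s;
		}
		new = malloc(sizeof(*new));	/* may sleep; lock is dropped */
	} while (new);
	return NULL;		/* allocation failed */
}

int main(void)
{
	struct slot *s = get_slot();

	free(s);
	return 0;
}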
 
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index d7ec6132c046..4216fe33204a 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -66,20 +66,19 @@ static void	call_decode(struct rpc_task *task);
 static void	call_bind(struct rpc_task *task);
 static void	call_bind_status(struct rpc_task *task);
 static void	call_transmit(struct rpc_task *task);
-#if defined(CONFIG_SUNRPC_BACKCHANNEL)
-static void	call_bc_transmit(struct rpc_task *task);
-#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 static void	call_status(struct rpc_task *task);
 static void	call_transmit_status(struct rpc_task *task);
 static void	call_refresh(struct rpc_task *task);
 static void	call_refreshresult(struct rpc_task *task);
-static void	call_timeout(struct rpc_task *task);
 static void	call_connect(struct rpc_task *task);
 static void	call_connect_status(struct rpc_task *task);
 
-static __be32	*rpc_encode_header(struct rpc_task *task);
-static __be32	*rpc_verify_header(struct rpc_task *task);
+static int	rpc_encode_header(struct rpc_task *task,
+				  struct xdr_stream *xdr);
+static int	rpc_decode_header(struct rpc_task *task,
+				  struct xdr_stream *xdr);
 static int	rpc_ping(struct rpc_clnt *clnt);
+static void	rpc_check_timeout(struct rpc_task *task);
 
 static void rpc_register_client(struct rpc_clnt *clnt)
 {
@@ -834,9 +833,6 @@ void rpc_killall_tasks(struct rpc_clnt *clnt)
 		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
 			rovr->tk_flags |= RPC_TASK_KILLED;
 			rpc_exit(rovr, -EIO);
-			if (RPC_IS_QUEUED(rovr))
-				rpc_wake_up_queued_task(rovr->tk_waitqueue,
-							rovr);
 		}
 	}
 	spin_unlock(&clnt->cl_lock);
@@ -1131,6 +1127,8 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 EXPORT_SYMBOL_GPL(rpc_call_async);
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static void call_bc_encode(struct rpc_task *task);
+
 /**
  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
  * rpc_execute against it
@@ -1152,7 +1150,7 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
 	task = rpc_new_task(&task_setup_data);
 	xprt_init_bc_request(req, task);
 
-	task->tk_action = call_bc_transmit;
+	task->tk_action = call_bc_encode;
 	atomic_inc(&task->tk_count);
 	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
 	rpc_execute(task);
@@ -1162,6 +1160,29 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
 }
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
+/**
+ * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
+ * @req: RPC request to prepare
+ * @pages: vector of struct page pointers
+ * @base: offset in first page where receive should start, in bytes
+ * @len: expected size of the upper layer data payload, in bytes
+ * @hdrsize: expected size of upper layer reply header, in XDR words
+ *
+ */
+void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
+			     unsigned int base, unsigned int len,
+			     unsigned int hdrsize)
+{
+	/* Subtract one to force an extra word of buffer space for the
+	 * payload's XDR pad to fall into the rcv_buf's tail iovec.
+	 */
+	hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign - 1;
+
+	xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
+	trace_rpc_reply_pages(req);
+}
+EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
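rpc_prepare_reply_pages() converts the protocol header size from XDR words to a byte offset and deliberately subtracts one word so the payload's XDR pad lands in the tail iovec. A sketch of that arithmetic, assuming the kernel's RPC_REPHDRSIZE value of 4 words; the verifier and protocol header sizes below are illustrative only:

#include <stdio.h>

#define RPC_REPHDRSIZE 4	/* fixed RPC reply header, in XDR words */

/* Byte offset at which the data payload begins in the receive buffer:
 * fixed reply header + per-flavor alignment, minus one word so the
 * payload's pad falls into the tail iovec. */
static unsigned int reply_data_offset(unsigned int au_ralign,
				      unsigned int proto_hdr_words)
{
	unsigned int hdr = proto_hdr_words + RPC_REPHDRSIZE + au_ralign - 1;

	return hdr << 2;	/* words to bytes */
}

int main(void)
{
	/* e.g. a 2-word verifier and a 22-word upper-layer reply header */
	printf("payload starts at byte %u\n", reply_data_offset(2, 22));
	return 0;
}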
+
 void
 rpc_call_start(struct rpc_task *task)
 {
@@ -1519,6 +1540,7 @@ call_start(struct rpc_task *task)
 	clnt->cl_stats->rpccnt++;
 	task->tk_action = call_reserve;
 	rpc_task_set_transport(task, clnt);
+	call_reserve(task);
 }
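The micro-optimisation repeated throughout clnt.c is the same: set tk_action, and if the task was not put to sleep, call the next state routine directly instead of bouncing through the scheduler. A toy state machine showing the idea (the "queued" flag stands in for rpc_task_need_resched()):

#include <stdbool.h>
#include <stdio.h>

struct task {
	void (*action)(struct task *);
	bool queued;		/* would be RPC_IS_QUEUED() in the kernel */
};

static void step_two(struct task *t)
{
	t->action = NULL;
	puts("step two");
}

static void step_one(struct task *t)
{
	t->action = step_two;
	puts("step one");
	if (t->queued)		/* sleeping: let the scheduler resume us */
		return;
	step_two(t);		/* otherwise avoid a scheduler round trip */
}

int main(void)
{
	struct task t = { .action = step_one, .queued = false };

	t.action(&t);
	return 0;
}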
 
 /*
@@ -1532,6 +1554,9 @@ call_reserve(struct rpc_task *task)
 	task->tk_status  = 0;
 	task->tk_action  = call_reserveresult;
 	xprt_reserve(task);
+	if (rpc_task_need_resched(task))
+		return;
+	call_reserveresult(task);
 }
 
 static void call_retry_reserve(struct rpc_task *task);
@@ -1554,6 +1579,7 @@ call_reserveresult(struct rpc_task *task)
 	if (status >= 0) {
 		if (task->tk_rqstp) {
 			task->tk_action = call_refresh;
+			call_refresh(task);
 			return;
 		}
 
@@ -1579,6 +1605,7 @@ call_reserveresult(struct rpc_task *task)
 		/* fall through */
 	case -EAGAIN:	/* woken up; retry */
 		task->tk_action = call_retry_reserve;
+		call_retry_reserve(task);
 		return;
 	case -EIO:	/* probably a shutdown */
 		break;
@@ -1601,6 +1628,9 @@ call_retry_reserve(struct rpc_task *task)
 	task->tk_status  = 0;
 	task->tk_action  = call_reserveresult;
 	xprt_retry_reserve(task);
+	if (rpc_task_need_resched(task))
+		return;
+	call_reserveresult(task);
 }
 
 /*
@@ -1615,6 +1645,9 @@ call_refresh(struct rpc_task *task)
 	task->tk_status = 0;
 	task->tk_client->cl_stats->rpcauthrefresh++;
 	rpcauth_refreshcred(task);
+	if (rpc_task_need_resched(task))
+		return;
+	call_refreshresult(task);
 }
 
 /*
@@ -1633,6 +1666,7 @@ call_refreshresult(struct rpc_task *task)
 	case 0:
 		if (rpcauth_uptodatecred(task)) {
 			task->tk_action = call_allocate;
+			call_allocate(task);
 			return;
 		}
 		/* Use rate-limiting and a max number of retries if refresh
@@ -1651,6 +1685,7 @@ call_refreshresult(struct rpc_task *task)
 		task->tk_cred_retry--;
 		dprintk("RPC: %5u %s: retry refresh creds\n",
 				task->tk_pid, __func__);
+		call_refresh(task);
 		return;
 	}
 	dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
@@ -1665,7 +1700,7 @@ call_refreshresult(struct rpc_task *task)
 static void
 call_allocate(struct rpc_task *task)
 {
-	unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
+	const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
 	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
@@ -1676,8 +1711,10 @@ call_allocate(struct rpc_task *task)
 	task->tk_status = 0;
 	task->tk_action = call_encode;
 
-	if (req->rq_buffer)
+	if (req->rq_buffer) {
+		call_encode(task);
 		return;
+	}
 
 	if (proc->p_proc != 0) {
 		BUG_ON(proc->p_arglen == 0);
@@ -1690,15 +1727,20 @@ call_allocate(struct rpc_task *task)
 	 * and reply headers, and convert both values
 	 * to byte sizes.
 	 */
-	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
+	req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
+			   proc->p_arglen;
 	req->rq_callsize <<= 2;
-	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
+	req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + proc->p_replen;
 	req->rq_rcvsize <<= 2;
 
 	status = xprt->ops->buf_alloc(task);
 	xprt_inject_disconnect(xprt);
-	if (status == 0)
+	if (status == 0) {
+		if (rpc_task_need_resched(task))
+			return;
+		call_encode(task);
 		return;
+	}
 	if (status != -ENOMEM) {
 		rpc_exit(task, status);
 		return;
@@ -1728,10 +1770,7 @@ static void
 rpc_xdr_encode(struct rpc_task *task)
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
-	kxdreproc_t	encode;
-	__be32		*p;
-
-	dprint_status(task);
+	struct xdr_stream xdr;
 
 	xdr_buf_init(&req->rq_snd_buf,
 		     req->rq_buffer,
@@ -1740,18 +1779,13 @@ rpc_xdr_encode(struct rpc_task *task)
 		     req->rq_rbuffer,
 		     req->rq_rcvsize);
 
-	p = rpc_encode_header(task);
-	if (p == NULL)
-		return;
-
-	encode = task->tk_msg.rpc_proc->p_encode;
-	if (encode == NULL)
+	req->rq_snd_buf.head[0].iov_len = 0;
+	xdr_init_encode(&xdr, &req->rq_snd_buf,
+			req->rq_snd_buf.head[0].iov_base, req);
+	if (rpc_encode_header(task, &xdr))
 		return;
 
-	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
-			task->tk_msg.rpc_argp);
-	if (task->tk_status == 0)
-		xprt_request_prepare(req);
+	task->tk_status = rpcauth_wrap_req(task, &xdr);
 }
 
 /*
@@ -1762,6 +1796,7 @@ call_encode(struct rpc_task *task)
 {
 	if (!rpc_task_need_encode(task))
 		goto out;
+	dprint_status(task);
 	/* Encode here so that rpcsec_gss can use correct sequence number. */
 	rpc_xdr_encode(task);
 	/* Did the encode result in an error condition? */
@@ -1779,6 +1814,8 @@ call_encode(struct rpc_task *task)
 			rpc_exit(task, task->tk_status);
 		}
 		return;
+	} else {
+		xprt_request_prepare(task->tk_rqstp);
 	}
 
 	/* Add task to reply queue before transmission to avoid races */
@@ -1787,6 +1824,25 @@ call_encode(struct rpc_task *task)
 	xprt_request_enqueue_transmit(task);
 out:
 	task->tk_action = call_bind;
+	call_bind(task);
+}
+
+/*
+ * Helpers to check if the task was already transmitted, and
+ * to take action when that is the case.
+ */
+static bool
+rpc_task_transmitted(struct rpc_task *task)
+{
+	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+}
+
+static void
+rpc_task_handle_transmitted(struct rpc_task *task)
+{
+	xprt_end_transmit(task);
+	task->tk_action = call_transmit_status;
+	call_transmit_status(task);
 }
 
 /*
@@ -1797,14 +1853,25 @@ call_bind(struct rpc_task *task)
 {
 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
 
-	dprint_status(task);
+	if (rpc_task_transmitted(task)) {
+		rpc_task_handle_transmitted(task);
+		return;
+	}
 
-	task->tk_action = call_connect;
-	if (!xprt_bound(xprt)) {
-		task->tk_action = call_bind_status;
-		task->tk_timeout = xprt->bind_timeout;
-		xprt->ops->rpcbind(task);
+	if (xprt_bound(xprt)) {
+		task->tk_action = call_connect;
+		call_connect(task);
+		return;
 	}
+
+	dprint_status(task);
+
+	task->tk_action = call_bind_status;
+	if (!xprt_prepare_transmit(task))
+		return;
+
+	task->tk_timeout = xprt->bind_timeout;
+	xprt->ops->rpcbind(task);
 }
 
 /*
@@ -1815,10 +1882,16 @@ call_bind_status(struct rpc_task *task)
 {
 	int status = -EIO;
 
+	if (rpc_task_transmitted(task)) {
+		rpc_task_handle_transmitted(task);
+		return;
+	}
+
 	if (task->tk_status >= 0) {
 		dprint_status(task);
 		task->tk_status = 0;
 		task->tk_action = call_connect;
+		call_connect(task);
 		return;
 	}
 
@@ -1841,6 +1914,8 @@ call_bind_status(struct rpc_task *task)
 		task->tk_rebind_retry--;
 		rpc_delay(task, 3*HZ);
 		goto retry_timeout;
+	case -EAGAIN:
+		goto retry_timeout;
 	case -ETIMEDOUT:
 		dprintk("RPC: %5u rpcbind request timed out\n",
 				task->tk_pid);
@@ -1882,7 +1957,8 @@ call_bind_status(struct rpc_task *task)
 
 retry_timeout:
 	task->tk_status = 0;
-	task->tk_action = call_timeout;
+	task->tk_action = call_bind;
+	rpc_check_timeout(task);
 }
 
 /*
@@ -1893,21 +1969,31 @@ call_connect(struct rpc_task *task)
 {
 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
 
+	if (rpc_task_transmitted(task)) {
+		rpc_task_handle_transmitted(task);
+		return;
+	}
+
+	if (xprt_connected(xprt)) {
+		task->tk_action = call_transmit;
+		call_transmit(task);
+		return;
+	}
+
 	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
 			task->tk_pid, xprt,
 			(xprt_connected(xprt) ? "is" : "is not"));
 
-	task->tk_action = call_transmit;
-	if (!xprt_connected(xprt)) {
-		task->tk_action = call_connect_status;
-		if (task->tk_status < 0)
-			return;
-		if (task->tk_flags & RPC_TASK_NOCONNECT) {
-			rpc_exit(task, -ENOTCONN);
-			return;
-		}
-		xprt_connect(task);
+	task->tk_action = call_connect_status;
+	if (task->tk_status < 0)
+		return;
+	if (task->tk_flags & RPC_TASK_NOCONNECT) {
+		rpc_exit(task, -ENOTCONN);
+		return;
 	}
+	if (!xprt_prepare_transmit(task))
+		return;
+	xprt_connect(task);
 }
 
 /*
@@ -1919,10 +2005,8 @@ call_connect_status(struct rpc_task *task)
 	struct rpc_clnt *clnt = task->tk_client;
 	int status = task->tk_status;
 
-	/* Check if the task was already transmitted */
-	if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
-		xprt_end_transmit(task);
-		task->tk_action = call_transmit_status;
+	if (rpc_task_transmitted(task)) {
+		rpc_task_handle_transmitted(task);
 		return;
 	}
 
@@ -1937,8 +2021,7 @@ call_connect_status(struct rpc_task *task)
 			break;
 		if (clnt->cl_autobind) {
 			rpc_force_rebind(clnt);
-			task->tk_action = call_bind;
-			return;
+			goto out_retry;
 		}
 		/* fall through */
 	case -ECONNRESET:
@@ -1958,16 +2041,20 @@ call_connect_status(struct rpc_task *task)
 		/* fall through */
 	case -ENOTCONN:
 	case -EAGAIN:
-		/* Check for timeouts before looping back to call_bind */
 	case -ETIMEDOUT:
-		task->tk_action = call_timeout;
-		return;
+		goto out_retry;
 	case 0:
 		clnt->cl_stats->netreconn++;
 		task->tk_action = call_transmit;
+		call_transmit(task);
 		return;
 	}
 	rpc_exit(task, status);
+	return;
+out_retry:
+	/* Check for timeouts before looping back to call_bind */
+	task->tk_action = call_bind;
+	rpc_check_timeout(task);
 }
 
 /*
@@ -1976,16 +2063,28 @@ call_connect_status(struct rpc_task *task)
 static void
 call_transmit(struct rpc_task *task)
 {
+	if (rpc_task_transmitted(task)) {
+		rpc_task_handle_transmitted(task);
+		return;
+	}
+
 	dprint_status(task);
 
+	task->tk_action = call_transmit_status;
+	if (!xprt_prepare_transmit(task))
+		return;
 	task->tk_status = 0;
 	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
-		if (!xprt_prepare_transmit(task))
+		if (!xprt_connected(task->tk_xprt)) {
+			task->tk_status = -ENOTCONN;
 			return;
+		}
 		xprt_transmit(task);
 	}
-	task->tk_action = call_transmit_status;
 	xprt_end_transmit(task);
+	if (rpc_task_need_resched(task))
+		return;
+	call_transmit_status(task);
 }
 
 /*
@@ -2000,8 +2099,12 @@ call_transmit_status(struct rpc_task *task)
 	 * Common case: success.  Force the compiler to put this
 	 * test first.
 	 */
-	if (task->tk_status == 0) {
-		xprt_request_wait_receive(task);
+	if (rpc_task_transmitted(task)) {
+		if (task->tk_status == 0)
+			xprt_request_wait_receive(task);
+		if (rpc_task_need_resched(task))
+			return;
+		call_status(task);
 		return;
 	}
 
@@ -2038,7 +2141,7 @@ call_transmit_status(struct rpc_task *task)
 				trace_xprt_ping(task->tk_xprt,
 						task->tk_status);
 			rpc_exit(task, task->tk_status);
-			break;
+			return;
 		}
 		/* fall through */
 	case -ECONNRESET:
@@ -2046,11 +2149,25 @@ call_transmit_status(struct rpc_task *task)
 	case -EADDRINUSE:
 	case -ENOTCONN:
 	case -EPIPE:
+		task->tk_action = call_bind;
+		task->tk_status = 0;
 		break;
 	}
+	rpc_check_timeout(task);
 }
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static void call_bc_transmit(struct rpc_task *task);
+static void call_bc_transmit_status(struct rpc_task *task);
+
+static void
+call_bc_encode(struct rpc_task *task)
+{
+	xprt_request_enqueue_transmit(task);
+	task->tk_action = call_bc_transmit;
+	call_bc_transmit(task);
+}
+
 /*
  * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
  * addition, disconnect on connectivity errors.
@@ -2058,26 +2175,23 @@ call_transmit_status(struct rpc_task *task)
 static void
 call_bc_transmit(struct rpc_task *task)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
-
-	if (rpc_task_need_encode(task))
-		xprt_request_enqueue_transmit(task);
-	if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
-		goto out_wakeup;
-
-	if (!xprt_prepare_transmit(task))
-		goto out_retry;
-
-	if (task->tk_status < 0) {
-		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
-			"error: %d\n", task->tk_status);
-		goto out_done;
+	task->tk_action = call_bc_transmit_status;
+	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
+		if (!xprt_prepare_transmit(task))
+			return;
+		task->tk_status = 0;
+		xprt_transmit(task);
 	}
+	xprt_end_transmit(task);
+}
 
-	xprt_transmit(task);
+static void
+call_bc_transmit_status(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
 
-	xprt_end_transmit(task);
 	dprint_status(task);
+
 	switch (task->tk_status) {
 	case 0:
 		/* Success */
@@ -2091,8 +2205,14 @@ call_bc_transmit(struct rpc_task *task)
 	case -ENOTCONN:
 	case -EPIPE:
 		break;
+	case -ENOBUFS:
+		rpc_delay(task, HZ>>2);
+		/* fall through */
+	case -EBADSLT:
 	case -EAGAIN:
-		goto out_retry;
+		task->tk_status = 0;
+		task->tk_action = call_bc_transmit;
+		return;
 	case -ETIMEDOUT:
 		/*
 		 * Problem reaching the server.  Disconnect and let the
@@ -2111,18 +2231,11 @@ call_bc_transmit(struct rpc_task *task)
 		 * We were unable to reply and will have to drop the
 		 * request.  The server should reconnect and retransmit.
 		 */
-		WARN_ON_ONCE(task->tk_status == -EAGAIN);
 		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
 			"error: %d\n", task->tk_status);
 		break;
 	}
-out_wakeup:
-	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
-out_done:
 	task->tk_action = rpc_exit_task;
-	return;
-out_retry:
-	task->tk_status = 0;
 }
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
@@ -2143,6 +2256,7 @@ call_status(struct rpc_task *task)
 	status = task->tk_status;
 	if (status >= 0) {
 		task->tk_action = call_decode;
+		call_decode(task);
 		return;
 	}
 
@@ -2154,10 +2268,8 @@ call_status(struct rpc_task *task)
 	case -EHOSTUNREACH:
 	case -ENETUNREACH:
 	case -EPERM:
-		if (RPC_IS_SOFTCONN(task)) {
-			rpc_exit(task, status);
-			break;
-		}
+		if (RPC_IS_SOFTCONN(task))
+			goto out_exit;
 		/*
 		 * Delay any retries for 3 seconds, then handle as if it
 		 * were a timeout.
@@ -2165,7 +2277,6 @@ call_status(struct rpc_task *task)
 		rpc_delay(task, 3*HZ);
 		/* fall through */
 	case -ETIMEDOUT:
-		task->tk_action = call_timeout;
 		break;
 	case -ECONNREFUSED:
 	case -ECONNRESET:
@@ -2178,34 +2289,30 @@ call_status(struct rpc_task *task)
 	case -EPIPE:
 	case -ENOTCONN:
 	case -EAGAIN:
-		task->tk_action = call_encode;
 		break;
 	case -EIO:
 		/* shutdown or soft timeout */
-		rpc_exit(task, status);
-		break;
+		goto out_exit;
 	default:
 		if (clnt->cl_chatty)
 			printk("%s: RPC call returned error %d\n",
 			       clnt->cl_program->name, -status);
-		rpc_exit(task, status);
+		goto out_exit;
 	}
+	task->tk_action = call_encode;
+	rpc_check_timeout(task);
+	return;
+out_exit:
+	rpc_exit(task, status);
 }
 
-/*
- * 6a.	Handle RPC timeout
- * 	We do not release the request slot, so we keep using the
- *	same XID for all retransmits.
- */
 static void
-call_timeout(struct rpc_task *task)
+rpc_check_timeout(struct rpc_task *task)
 {
 	struct rpc_clnt	*clnt = task->tk_client;
 
-	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
-		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
-		goto retry;
-	}
+	if (xprt_adjust_timeout(task->tk_rqstp) == 0)
+		return;
 
 	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
 	task->tk_timeouts++;
@@ -2241,10 +2348,6 @@ call_timeout(struct rpc_task *task)
 	 * event? RFC2203 requires the server to drop all such requests.
 	 */
 	rpcauth_invalcred(task);
-
-retry:
-	task->tk_action = call_encode;
-	task->tk_status = 0;
 }
 
 /*
@@ -2255,12 +2358,11 @@ call_decode(struct rpc_task *task)
 {
 	struct rpc_clnt	*clnt = task->tk_client;
 	struct rpc_rqst	*req = task->tk_rqstp;
-	kxdrdproc_t	decode = task->tk_msg.rpc_proc->p_decode;
-	__be32		*p;
+	struct xdr_stream xdr;
 
 	dprint_status(task);
 
-	if (!decode) {
+	if (!task->tk_msg.rpc_proc->p_decode) {
 		task->tk_action = rpc_exit_task;
 		return;
 	}
@@ -2285,223 +2387,195 @@ call_decode(struct rpc_task *task)
 	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
 				sizeof(req->rq_rcv_buf)) != 0);
 
-	if (req->rq_rcv_buf.len < 12) {
-		if (!RPC_IS_SOFT(task)) {
-			task->tk_action = call_encode;
-			goto out_retry;
-		}
-		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
-				clnt->cl_program->name, task->tk_status);
-		task->tk_action = call_timeout;
+	if (req->rq_rcv_buf.len < 12)
 		goto out_retry;
-	}
 
-	p = rpc_verify_header(task);
-	if (IS_ERR(p)) {
-		if (p == ERR_PTR(-EAGAIN))
-			goto out_retry;
+	xdr_init_decode(&xdr, &req->rq_rcv_buf,
+			req->rq_rcv_buf.head[0].iov_base, req);
+	switch (rpc_decode_header(task, &xdr)) {
+	case 0:
+		task->tk_action = rpc_exit_task;
+		task->tk_status = rpcauth_unwrap_resp(task, &xdr);
+		dprintk("RPC: %5u %s result %d\n",
+			task->tk_pid, __func__, task->tk_status);
 		return;
-	}
-	task->tk_action = rpc_exit_task;
-
-	task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
-					      task->tk_msg.rpc_resp);
-
-	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
-			task->tk_status);
-	return;
+	case -EAGAIN:
 out_retry:
-	task->tk_status = 0;
-	/* Note: rpc_verify_header() may have freed the RPC slot */
-	if (task->tk_rqstp == req) {
-		xdr_free_bvec(&req->rq_rcv_buf);
-		req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
-		if (task->tk_client->cl_discrtry)
-			xprt_conditional_disconnect(req->rq_xprt,
-					req->rq_connect_cookie);
+		task->tk_status = 0;
+		/* Note: rpc_decode_header() may have freed the RPC slot */
+		if (task->tk_rqstp == req) {
+			xdr_free_bvec(&req->rq_rcv_buf);
+			req->rq_reply_bytes_recvd = 0;
+			req->rq_rcv_buf.len = 0;
+			if (task->tk_client->cl_discrtry)
+				xprt_conditional_disconnect(req->rq_xprt,
+							    req->rq_connect_cookie);
+		}
+		task->tk_action = call_encode;
+		rpc_check_timeout(task);
 	}
 }
 
-static __be32 *
-rpc_encode_header(struct rpc_task *task)
+static int
+rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
 {
 	struct rpc_clnt *clnt = task->tk_client;
 	struct rpc_rqst	*req = task->tk_rqstp;
-	__be32		*p = req->rq_svec[0].iov_base;
-
-	/* FIXME: check buffer size? */
-
-	p = xprt_skip_transport_header(req->rq_xprt, p);
-	*p++ = req->rq_xid;		/* XID */
-	*p++ = htonl(RPC_CALL);		/* CALL */
-	*p++ = htonl(RPC_VERSION);	/* RPC version */
-	*p++ = htonl(clnt->cl_prog);	/* program number */
-	*p++ = htonl(clnt->cl_vers);	/* program version */
-	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
-	p = rpcauth_marshcred(task, p);
-	if (p)
-		req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
-	return p;
+	__be32 *p;
+	int error;
+
+	error = -EMSGSIZE;
+	p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
+	if (!p)
+		goto out_fail;
+	*p++ = req->rq_xid;
+	*p++ = rpc_call;
+	*p++ = cpu_to_be32(RPC_VERSION);
+	*p++ = cpu_to_be32(clnt->cl_prog);
+	*p++ = cpu_to_be32(clnt->cl_vers);
+	*p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
+
+	error = rpcauth_marshcred(task, xdr);
+	if (error < 0)
+		goto out_fail;
+	return 0;
+out_fail:
+	trace_rpc_bad_callhdr(task);
+	rpc_exit(task, error);
+	return error;
 }
 
-static __be32 *
-rpc_verify_header(struct rpc_task *task)
+static noinline int
+rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
 {
 	struct rpc_clnt *clnt = task->tk_client;
-	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
-	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
-	__be32	*p = iov->iov_base;
-	u32 n;
 	int error = -EACCES;
+	__be32 *p;
 
-	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
-		/* RFC-1014 says that the representation of XDR data must be a
-		 * multiple of four bytes
-		 * - if it isn't pointer subtraction in the NFS client may give
-		 *   undefined results
-		 */
-		dprintk("RPC: %5u %s: XDR representation not a multiple of"
-		       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
-		       task->tk_rqstp->rq_rcv_buf.len);
-		error = -EIO;
-		goto out_err;
-	}
-	if ((len -= 3) < 0)
-		goto out_overflow;
-
-	p += 1; /* skip XID */
-	if ((n = ntohl(*p++)) != RPC_REPLY) {
-		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
-			task->tk_pid, __func__, n);
-		error = -EIO;
-		goto out_garbage;
-	}
-
-	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
-		if (--len < 0)
-			goto out_overflow;
-		switch ((n = ntohl(*p++))) {
-		case RPC_AUTH_ERROR:
-			break;
-		case RPC_MISMATCH:
-			dprintk("RPC: %5u %s: RPC call version mismatch!\n",
-				task->tk_pid, __func__);
-			error = -EPROTONOSUPPORT;
-			goto out_err;
-		default:
-			dprintk("RPC: %5u %s: RPC call rejected, "
-				"unknown error: %x\n",
-				task->tk_pid, __func__, n);
-			error = -EIO;
-			goto out_err;
-		}
-		if (--len < 0)
-			goto out_overflow;
-		switch ((n = ntohl(*p++))) {
-		case RPC_AUTH_REJECTEDCRED:
-		case RPC_AUTH_REJECTEDVERF:
-		case RPCSEC_GSS_CREDPROBLEM:
-		case RPCSEC_GSS_CTXPROBLEM:
-			if (!task->tk_cred_retry)
-				break;
-			task->tk_cred_retry--;
-			dprintk("RPC: %5u %s: retry stale creds\n",
-					task->tk_pid, __func__);
-			rpcauth_invalcred(task);
-			/* Ensure we obtain a new XID! */
-			xprt_release(task);
-			task->tk_action = call_reserve;
-			goto out_retry;
-		case RPC_AUTH_BADCRED:
-		case RPC_AUTH_BADVERF:
-			/* possibly garbled cred/verf? */
-			if (!task->tk_garb_retry)
-				break;
-			task->tk_garb_retry--;
-			dprintk("RPC: %5u %s: retry garbled creds\n",
-					task->tk_pid, __func__);
-			task->tk_action = call_encode;
-			goto out_retry;
-		case RPC_AUTH_TOOWEAK:
-			printk(KERN_NOTICE "RPC: server %s requires stronger "
-			       "authentication.\n",
-			       task->tk_xprt->servername);
-			break;
-		default:
-			dprintk("RPC: %5u %s: unknown auth error: %x\n",
-					task->tk_pid, __func__, n);
-			error = -EIO;
-		}
-		dprintk("RPC: %5u %s: call rejected %d\n",
-				task->tk_pid, __func__, n);
-		goto out_err;
-	}
-	p = rpcauth_checkverf(task, p);
-	if (IS_ERR(p)) {
-		error = PTR_ERR(p);
-		dprintk("RPC: %5u %s: auth check failed with %d\n",
-				task->tk_pid, __func__, error);
-		goto out_garbage;		/* bad verifier, retry */
-	}
-	len = p - (__be32 *)iov->iov_base - 1;
-	if (len < 0)
-		goto out_overflow;
-	switch ((n = ntohl(*p++))) {
-	case RPC_SUCCESS:
-		return p;
-	case RPC_PROG_UNAVAIL:
-		dprintk("RPC: %5u %s: program %u is unsupported "
-				"by server %s\n", task->tk_pid, __func__,
-				(unsigned int)clnt->cl_prog,
-				task->tk_xprt->servername);
+	/* RFC-1014 says that the representation of XDR data must be a
+	 * multiple of four bytes
+	 * - if it isn't pointer subtraction in the NFS client may give
+	 *   undefined results
+	 */
+	if (task->tk_rqstp->rq_rcv_buf.len & 3)
+		goto out_badlen;
+
+	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
+	if (!p)
+		goto out_unparsable;
+	p++;	/* skip XID */
+	if (*p++ != rpc_reply)
+		goto out_unparsable;
+	if (*p++ != rpc_msg_accepted)
+		goto out_msg_denied;
+
+	error = rpcauth_checkverf(task, xdr);
+	if (error)
+		goto out_verifier;
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (!p)
+		goto out_unparsable;
+	switch (*p) {
+	case rpc_success:
+		return 0;
+	case rpc_prog_unavail:
+		trace_rpc__prog_unavail(task);
 		error = -EPFNOSUPPORT;
 		goto out_err;
-	case RPC_PROG_MISMATCH:
-		dprintk("RPC: %5u %s: program %u, version %u unsupported "
-				"by server %s\n", task->tk_pid, __func__,
-				(unsigned int)clnt->cl_prog,
-				(unsigned int)clnt->cl_vers,
-				task->tk_xprt->servername);
+	case rpc_prog_mismatch:
+		trace_rpc__prog_mismatch(task);
 		error = -EPROTONOSUPPORT;
 		goto out_err;
-	case RPC_PROC_UNAVAIL:
-		dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
-				"version %u on server %s\n",
-				task->tk_pid, __func__,
-				rpc_proc_name(task),
-				clnt->cl_prog, clnt->cl_vers,
-				task->tk_xprt->servername);
+	case rpc_proc_unavail:
+		trace_rpc__proc_unavail(task);
 		error = -EOPNOTSUPP;
 		goto out_err;
-	case RPC_GARBAGE_ARGS:
-		dprintk("RPC: %5u %s: server saw garbage\n",
-				task->tk_pid, __func__);
-		break;			/* retry */
+	case rpc_garbage_args:
+		trace_rpc__garbage_args(task);
+		break;
 	default:
-		dprintk("RPC: %5u %s: server accept status: %x\n",
-				task->tk_pid, __func__, n);
-		/* Also retry */
+		trace_rpc__unparsable(task);
 	}
 
 out_garbage:
 	clnt->cl_stats->rpcgarbage++;
 	if (task->tk_garb_retry) {
 		task->tk_garb_retry--;
-		dprintk("RPC: %5u %s: retrying\n",
-				task->tk_pid, __func__);
 		task->tk_action = call_encode;
-out_retry:
-		return ERR_PTR(-EAGAIN);
+		return -EAGAIN;
 	}
 out_err:
 	rpc_exit(task, error);
-	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
-			__func__, error);
-	return ERR_PTR(error);
-out_overflow:
-	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
-			__func__);
+	return error;
+
+out_badlen:
+	trace_rpc__unparsable(task);
+	error = -EIO;
+	goto out_err;
+
+out_unparsable:
+	trace_rpc__unparsable(task);
+	error = -EIO;
+	goto out_garbage;
+
+out_verifier:
+	trace_rpc_bad_verifier(task);
 	goto out_garbage;
+
+out_msg_denied:
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (!p)
+		goto out_unparsable;
+	switch (*p++) {
+	case rpc_auth_error:
+		break;
+	case rpc_mismatch:
+		trace_rpc__mismatch(task);
+		error = -EPROTONOSUPPORT;
+		goto out_err;
+	default:
+		trace_rpc__unparsable(task);
+		error = -EIO;
+		goto out_err;
+	}
+
+	p = xdr_inline_decode(xdr, sizeof(*p));
+	if (!p)
+		goto out_unparsable;
+	switch (*p++) {
+	case rpc_autherr_rejectedcred:
+	case rpc_autherr_rejectedverf:
+	case rpcsec_gsserr_credproblem:
+	case rpcsec_gsserr_ctxproblem:
+		if (!task->tk_cred_retry)
+			break;
+		task->tk_cred_retry--;
+		trace_rpc__stale_creds(task);
+		rpcauth_invalcred(task);
+		/* Ensure we obtain a new XID! */
+		xprt_release(task);
+		task->tk_action = call_reserve;
+		return -EAGAIN;
+	case rpc_autherr_badcred:
+	case rpc_autherr_badverf:
+		/* possibly garbled cred/verf? */
+		if (!task->tk_garb_retry)
+			break;
+		task->tk_garb_retry--;
+		trace_rpc__bad_creds(task);
+		task->tk_action = call_encode;
+		return -EAGAIN;
+	case rpc_autherr_tooweak:
+		trace_rpc__auth_tooweak(task);
+		pr_warn("RPC: server %s requires stronger authentication.\n",
+			task->tk_xprt->servername);
+		break;
+	default:
+		trace_rpc__unparsable(task);
+		error = -EIO;
+	}
+	goto out_err;
 }
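rpc_decode_header() walks the fixed reply header a word at a time with xdr_inline_decode() and maps each failure mode to a tracepoint plus an errno. A minimal user-space sketch of the same header walk over a word array (verifier handling simplified to AUTH_NULL; constants follow RFC 5531):

#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>

#define RPC_REPLY        1
#define RPC_MSG_ACCEPTED 0
#define RPC_SUCCESS      0

/* Decode the fixed part of an RPC reply header from a word array. */
static int check_reply_header(const uint32_t *p)
{
	p++;					/* skip XID */
	if (ntohl(*p++) != RPC_REPLY)
		return -1;			/* not a reply at all */
	if (ntohl(*p++) != RPC_MSG_ACCEPTED)
		return -1;			/* MSG_DENIED path */
	p += 2;					/* AUTH_NULL verifier: flavor, length */
	return ntohl(*p) == RPC_SUCCESS ? 0 : -1;
}

int main(void)
{
	uint32_t reply[6] = {
		htonl(0x1234), htonl(RPC_REPLY), htonl(RPC_MSG_ACCEPTED),
		htonl(0), htonl(0),		/* AUTH_NULL verifier */
		htonl(RPC_SUCCESS),
	};

	printf("header ok: %d\n", check_reply_header(reply) == 0);
	return 0;
}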
 
 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index adc3c40cc733..28956c70100a 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -19,6 +19,7 @@
 #include <linux/spinlock.h>
 #include <linux/mutex.h>
 #include <linux/freezer.h>
+#include <linux/sched/mm.h>
 
 #include <linux/sunrpc/clnt.h>
 
@@ -784,8 +785,7 @@ void rpc_exit(struct rpc_task *task, int status)
 {
 	task->tk_status = status;
 	task->tk_action = rpc_exit_task;
-	if (RPC_IS_QUEUED(task))
-		rpc_wake_up_queued_task(task->tk_waitqueue, task);
+	rpc_wake_up_queued_task(task->tk_waitqueue, task);
 }
 EXPORT_SYMBOL_GPL(rpc_exit);
 
@@ -902,7 +902,10 @@ void rpc_execute(struct rpc_task *task)
 
 static void rpc_async_schedule(struct work_struct *work)
 {
+	unsigned int pflags = memalloc_nofs_save();
+
 	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
+	memalloc_nofs_restore(pflags);
 }
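Wrapping the workqueue callbacks in memalloc_nofs_save()/memalloc_nofs_restore() makes every allocation in that scope implicitly NOFS, which is what lets rpc_malloc() and rpc_alloc_task() below move from GFP_NOIO to GFP_NOFS. A user-space analogy of the save/restore scoping (a made-up per-thread flag, not the kernel API):

#include <stdio.h>

/* Save/restore pattern for a per-thread allocation-context flag. */
static __thread unsigned int alloc_flags;
#define ALLOC_NOFS 0x1

static unsigned int nofs_save(void)
{
	unsigned int old = alloc_flags;

	alloc_flags |= ALLOC_NOFS;
	return old;
}

static void nofs_restore(unsigned int old)
{
	alloc_flags = old;
}

static void do_async_work(void)
{
	printf("allocating with%s NOFS\n",
	       (alloc_flags & ALLOC_NOFS) ? "" : "out");
}

int main(void)
{
	unsigned int flags = nofs_save();	/* scope the constraint...      */

	do_async_work();
	nofs_restore(flags);			/* ...and drop it on the way out */
	return 0;
}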
 
 /**
@@ -921,16 +924,13 @@ static void rpc_async_schedule(struct work_struct *work)
  * Most requests are 'small' (under 2KiB) and can be serviced from a
  * mempool, ensuring that NFS reads and writes can always proceed,
  * and that there is good locality of reference for these buffers.
- *
- * In order to avoid memory starvation triggering more writebacks of
- * NFS requests, we avoid using GFP_KERNEL.
  */
 int rpc_malloc(struct rpc_task *task)
 {
 	struct rpc_rqst *rqst = task->tk_rqstp;
 	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
 	struct rpc_buffer *buf;
-	gfp_t gfp = GFP_NOIO | __GFP_NOWARN;
+	gfp_t gfp = GFP_NOFS;
 
 	if (RPC_IS_SWAPPER(task))
 		gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
@@ -1011,7 +1011,7 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
 static struct rpc_task *
 rpc_alloc_task(void)
 {
-	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOIO);
+	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
 /*
@@ -1067,7 +1067,10 @@ static void rpc_free_task(struct rpc_task *task)
 
 static void rpc_async_release(struct work_struct *work)
 {
+	unsigned int pflags = memalloc_nofs_save();
+
 	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
+	memalloc_nofs_restore(pflags);
 }
 
 static void rpc_release_resources_task(struct rpc_task *task)
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index e87ddb9f7feb..dbd19697ee38 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1145,17 +1145,6 @@ static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ..
 #endif
 
 /*
- * Setup response header for TCP, it has a 4B record length field.
- */
-static void svc_tcp_prep_reply_hdr(struct svc_rqst *rqstp)
-{
-	struct kvec *resv = &rqstp->rq_res.head[0];
-
-	/* tcp needs a space for the record length... */
-	svc_putnl(resv, 0);
-}
-
-/*
  * Common routine for processing the RPC request.
  */
 static int
@@ -1182,10 +1171,6 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
 	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
 	clear_bit(RQ_DROPME, &rqstp->rq_flags);
 
-	/* Setup reply header */
-	if (rqstp->rq_prot == IPPROTO_TCP)
-		svc_tcp_prep_reply_hdr(rqstp);
-
 	svc_putu32(resv, rqstp->rq_xid);
 
 	vers = svc_getnl(argv);
@@ -1443,6 +1428,10 @@ svc_process(struct svc_rqst *rqstp)
 		goto out_drop;
 	}
 
+	/* Reserve space for the record marker */
+	if (rqstp->rq_prot == IPPROTO_TCP)
+		svc_putnl(resv, 0);
+
 	/* Returns 1 for send, 0 for drop */
 	if (likely(svc_process_common(rqstp, argv, resv)))
 		return svc_send(rqstp);
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index f302c6eb8779..aa8177ddcbda 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -16,6 +16,7 @@
 #include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/msg_prot.h>
 #include <linux/bvec.h>
+#include <trace/events/sunrpc.h>
 
 /*
  * XDR functions for basic NFS types
@@ -162,6 +163,15 @@ xdr_free_bvec(struct xdr_buf *buf)
 	buf->bvec = NULL;
 }
 
+/**
+ * xdr_inline_pages - Prepare receive buffer for a large reply
+ * @xdr: xdr_buf into which reply will be placed
+ * @offset: expected offset where data payload will start, in bytes
+ * @pages: vector of struct page pointers
+ * @base: offset in first page where receive should start, in bytes
+ * @len: expected size of the upper layer data payload, in bytes
+ *
+ */
 void
 xdr_inline_pages(struct xdr_buf *xdr, unsigned int offset,
 		 struct page **pages, unsigned int base, unsigned int len)
@@ -179,6 +189,8 @@ xdr_inline_pages(struct xdr_buf *xdr, unsigned int offset,
 
 	tail->iov_base = buf + offset;
 	tail->iov_len = buflen - offset;
+	if ((xdr->page_len & 3) == 0)
+		tail->iov_len -= sizeof(__be32);
 
 	xdr->buflen += len;
 }
@@ -346,13 +358,15 @@ EXPORT_SYMBOL_GPL(_copy_from_pages);
  * 'len' bytes. The extra data is not lost, but is instead
  * moved into the inlined pages and/or the tail.
  */
-static void
+static unsigned int
 xdr_shrink_bufhead(struct xdr_buf *buf, size_t len)
 {
 	struct kvec *head, *tail;
 	size_t copy, offs;
 	unsigned int pglen = buf->page_len;
+	unsigned int result;
 
+	result = 0;
 	tail = buf->tail;
 	head = buf->head;
 
@@ -366,6 +380,7 @@ xdr_shrink_bufhead(struct xdr_buf *buf, size_t len)
 			copy = tail->iov_len - len;
 			memmove((char *)tail->iov_base + len,
 					tail->iov_base, copy);
+			result += copy;
 		}
 		/* Copy from the inlined pages into the tail */
 		copy = len;
@@ -376,11 +391,13 @@ xdr_shrink_bufhead(struct xdr_buf *buf, size_t len)
 			copy = 0;
 		else if (copy > tail->iov_len - offs)
 			copy = tail->iov_len - offs;
-		if (copy != 0)
+		if (copy != 0) {
 			_copy_from_pages((char *)tail->iov_base + offs,
 					buf->pages,
 					buf->page_base + pglen + offs - len,
 					copy);
+			result += copy;
+		}
 		/* Do we also need to copy data from the head into the tail ? */
 		if (len > pglen) {
 			offs = copy = len - pglen;
@@ -390,6 +407,7 @@ xdr_shrink_bufhead(struct xdr_buf *buf, size_t len)
 					(char *)head->iov_base +
 					head->iov_len - offs,
 					copy);
+			result += copy;
 		}
 	}
 	/* Now handle pages */
@@ -405,12 +423,15 @@ xdr_shrink_bufhead(struct xdr_buf *buf, size_t len)
 		_copy_to_pages(buf->pages, buf->page_base,
 				(char *)head->iov_base + head->iov_len - len,
 				copy);
+		result += copy;
 	}
 	head->iov_len -= len;
 	buf->buflen -= len;
 	/* Have we truncated the message? */
 	if (buf->len > buf->buflen)
 		buf->len = buf->buflen;
+
+	return result;
 }
 
 /**
@@ -422,14 +443,16 @@ xdr_shrink_bufhead(struct xdr_buf *buf, size_t len)
  * 'len' bytes. The extra data is not lost, but is instead
  * moved into the tail.
  */
-static void
+static unsigned int
 xdr_shrink_pagelen(struct xdr_buf *buf, size_t len)
 {
 	struct kvec *tail;
 	size_t copy;
 	unsigned int pglen = buf->page_len;
 	unsigned int tailbuf_len;
+	unsigned int result;
 
+	result = 0;
 	tail = buf->tail;
 	BUG_ON (len > pglen);
 
@@ -447,18 +470,22 @@ xdr_shrink_pagelen(struct xdr_buf *buf, size_t len)
 		if (tail->iov_len > len) {
 			char *p = (char *)tail->iov_base + len;
 			memmove(p, tail->iov_base, tail->iov_len - len);
+			result += tail->iov_len - len;
 		} else
 			copy = tail->iov_len;
 		/* Copy from the inlined pages into the tail */
 		_copy_from_pages((char *)tail->iov_base,
 				buf->pages, buf->page_base + pglen - len,
 				copy);
+		result += copy;
 	}
 	buf->page_len -= len;
 	buf->buflen -= len;
 	/* Have we truncated the message? */
 	if (buf->len > buf->buflen)
 		buf->len = buf->buflen;
+
+	return result;
 }
 
 void
@@ -483,6 +510,7 @@ EXPORT_SYMBOL_GPL(xdr_stream_pos);
  * @xdr: pointer to xdr_stream struct
  * @buf: pointer to XDR buffer in which to encode data
  * @p: current pointer inside XDR buffer
+ * @rqst: pointer to controlling rpc_rqst, for debugging
  *
  * Note: at the moment the RPC client only passes the length of our
  *	 scratch buffer in the xdr_buf's header kvec. Previously this
@@ -491,7 +519,8 @@ EXPORT_SYMBOL_GPL(xdr_stream_pos);
  *	 of the buffer length, and takes care of adjusting the kvec
  *	 length for us.
  */
-void xdr_init_encode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
+void xdr_init_encode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p,
+		     struct rpc_rqst *rqst)
 {
 	struct kvec *iov = buf->head;
 	int scratch_len = buf->buflen - buf->page_len - buf->tail[0].iov_len;
@@ -513,6 +542,7 @@ void xdr_init_encode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
 		buf->len += len;
 		iov->iov_len += len;
 	}
+	xdr->rqst = rqst;
 }
 EXPORT_SYMBOL_GPL(xdr_init_encode);
 
@@ -551,9 +581,9 @@ static __be32 *xdr_get_next_encode_buffer(struct xdr_stream *xdr,
 	int frag1bytes, frag2bytes;
 
 	if (nbytes > PAGE_SIZE)
-		return NULL; /* Bigger buffers require special handling */
+		goto out_overflow; /* Bigger buffers require special handling */
 	if (xdr->buf->len + nbytes > xdr->buf->buflen)
-		return NULL; /* Sorry, we're totally out of space */
+		goto out_overflow; /* Sorry, we're totally out of space */
 	frag1bytes = (xdr->end - xdr->p) << 2;
 	frag2bytes = nbytes - frag1bytes;
 	if (xdr->iov)
@@ -582,6 +612,9 @@ static __be32 *xdr_get_next_encode_buffer(struct xdr_stream *xdr,
 	xdr->buf->page_len += frag2bytes;
 	xdr->buf->len += nbytes;
 	return p;
+out_overflow:
+	trace_rpc_xdr_overflow(xdr, nbytes);
+	return NULL;
 }
 
 /**
@@ -819,8 +852,10 @@ static bool xdr_set_next_buffer(struct xdr_stream *xdr)
  * @xdr: pointer to xdr_stream struct
  * @buf: pointer to XDR buffer from which to decode data
  * @p: current pointer inside XDR buffer
+ * @rqst: pointer to controlling rpc_rqst, for debugging
  */
-void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
+void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p,
+		     struct rpc_rqst *rqst)
 {
 	xdr->buf = buf;
 	xdr->scratch.iov_base = NULL;
@@ -836,6 +871,7 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
 		xdr->nwords -= p - xdr->p;
 		xdr->p = p;
 	}
+	xdr->rqst = rqst;
 }
 EXPORT_SYMBOL_GPL(xdr_init_decode);
 
@@ -854,7 +890,7 @@ void xdr_init_decode_pages(struct xdr_stream *xdr, struct xdr_buf *buf,
 	buf->page_len =  len;
 	buf->buflen =  len;
 	buf->len = len;
-	xdr_init_decode(xdr, buf, NULL);
+	xdr_init_decode(xdr, buf, NULL, NULL);
 }
 EXPORT_SYMBOL_GPL(xdr_init_decode_pages);
 
@@ -896,20 +932,23 @@ static __be32 *xdr_copy_to_scratch(struct xdr_stream *xdr, size_t nbytes)
 	size_t cplen = (char *)xdr->end - (char *)xdr->p;
 
 	if (nbytes > xdr->scratch.iov_len)
-		return NULL;
+		goto out_overflow;
 	p = __xdr_inline_decode(xdr, cplen);
 	if (p == NULL)
 		return NULL;
 	memcpy(cpdest, p, cplen);
+	if (!xdr_set_next_buffer(xdr))
+		goto out_overflow;
 	cpdest += cplen;
 	nbytes -= cplen;
-	if (!xdr_set_next_buffer(xdr))
-		return NULL;
 	p = __xdr_inline_decode(xdr, nbytes);
 	if (p == NULL)
 		return NULL;
 	memcpy(cpdest, p, nbytes);
 	return xdr->scratch.iov_base;
+out_overflow:
+	trace_rpc_xdr_overflow(xdr, nbytes);
+	return NULL;
 }
 
 /**
@@ -926,14 +965,17 @@ __be32 * xdr_inline_decode(struct xdr_stream *xdr, size_t nbytes)
 {
 	__be32 *p;
 
-	if (nbytes == 0)
+	if (unlikely(nbytes == 0))
 		return xdr->p;
 	if (xdr->p == xdr->end && !xdr_set_next_buffer(xdr))
-		return NULL;
+		goto out_overflow;
 	p = __xdr_inline_decode(xdr, nbytes);
 	if (p != NULL)
 		return p;
 	return xdr_copy_to_scratch(xdr, nbytes);
+out_overflow:
+	trace_rpc_xdr_overflow(xdr, nbytes);
+	return NULL;
 }
 EXPORT_SYMBOL_GPL(xdr_inline_decode);
 
@@ -943,13 +985,17 @@ static unsigned int xdr_align_pages(struct xdr_stream *xdr, unsigned int len)
 	struct kvec *iov;
 	unsigned int nwords = XDR_QUADLEN(len);
 	unsigned int cur = xdr_stream_pos(xdr);
+	unsigned int copied, offset;
 
 	if (xdr->nwords == 0)
 		return 0;
+
 	/* Realign pages to current pointer position */
-	iov  = buf->head;
+	iov = buf->head;
 	if (iov->iov_len > cur) {
-		xdr_shrink_bufhead(buf, iov->iov_len - cur);
+		offset = iov->iov_len - cur;
+		copied = xdr_shrink_bufhead(buf, offset);
+		trace_rpc_xdr_alignment(xdr, offset, copied);
 		xdr->nwords = XDR_QUADLEN(buf->len - cur);
 	}
 
@@ -961,7 +1007,9 @@ static unsigned int xdr_align_pages(struct xdr_stream *xdr, unsigned int len)
 		len = buf->page_len;
 	else if (nwords < xdr->nwords) {
 		/* Truncate page data and move it into the tail */
-		xdr_shrink_pagelen(buf, buf->page_len - len);
+		offset = buf->page_len - len;
+		copied = xdr_shrink_pagelen(buf, offset);
+		trace_rpc_xdr_alignment(xdr, offset, copied);
 		xdr->nwords = XDR_QUADLEN(buf->len - cur);
 	}
 	return len;
@@ -1102,47 +1150,6 @@ xdr_buf_subsegment(struct xdr_buf *buf, struct xdr_buf *subbuf,
 }
 EXPORT_SYMBOL_GPL(xdr_buf_subsegment);
 
-/**
- * xdr_buf_trim - lop at most "len" bytes off the end of "buf"
- * @buf: buf to be trimmed
- * @len: number of bytes to reduce "buf" by
- *
- * Trim an xdr_buf by the given number of bytes by fixing up the lengths. Note
- * that it's possible that we'll trim less than that amount if the xdr_buf is
- * too small, or if (for instance) it's all in the head and the parser has
- * already read too far into it.
- */
-void xdr_buf_trim(struct xdr_buf *buf, unsigned int len)
-{
-	size_t cur;
-	unsigned int trim = len;
-
-	if (buf->tail[0].iov_len) {
-		cur = min_t(size_t, buf->tail[0].iov_len, trim);
-		buf->tail[0].iov_len -= cur;
-		trim -= cur;
-		if (!trim)
-			goto fix_len;
-	}
-
-	if (buf->page_len) {
-		cur = min_t(unsigned int, buf->page_len, trim);
-		buf->page_len -= cur;
-		trim -= cur;
-		if (!trim)
-			goto fix_len;
-	}
-
-	if (buf->head[0].iov_len) {
-		cur = min_t(size_t, buf->head[0].iov_len, trim);
-		buf->head[0].iov_len -= cur;
-		trim -= cur;
-	}
-fix_len:
-	buf->len -= (len - trim);
-}
-EXPORT_SYMBOL_GPL(xdr_buf_trim);
-
 static void __read_bytes_from_xdr_buf(struct xdr_buf *subbuf, void *obj, unsigned int len)
 {
 	unsigned int this_len;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index f1ec2110efeb..e096c5a725df 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -49,6 +49,7 @@
 #include <linux/sunrpc/metrics.h>
 #include <linux/sunrpc/bc_xprt.h>
 #include <linux/rcupdate.h>
+#include <linux/sched/mm.h>
 
 #include <trace/events/sunrpc.h>
 
@@ -643,11 +644,13 @@ static void xprt_autoclose(struct work_struct *work)
 {
 	struct rpc_xprt *xprt =
 		container_of(work, struct rpc_xprt, task_cleanup);
+	unsigned int pflags = memalloc_nofs_save();
 
 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	xprt->ops->close(xprt);
 	xprt_release_write(xprt, NULL);
 	wake_up_bit(&xprt->state, XPRT_LOCKED);
+	memalloc_nofs_restore(pflags);
 }
 
 /**
@@ -1165,6 +1168,7 @@ xprt_request_enqueue_transmit(struct rpc_task *task)
 				/* Note: req is added _before_ pos */
 				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
 				INIT_LIST_HEAD(&req->rq_xmit2);
+				trace_xprt_enq_xmit(task, 1);
 				goto out;
 			}
 		} else if (RPC_IS_SWAPPER(task)) {
@@ -1176,6 +1180,7 @@ xprt_request_enqueue_transmit(struct rpc_task *task)
 				/* Note: req is added _before_ pos */
 				list_add_tail(&req->rq_xmit, &pos->rq_xmit);
 				INIT_LIST_HEAD(&req->rq_xmit2);
+				trace_xprt_enq_xmit(task, 2);
 				goto out;
 			}
 		} else if (!req->rq_seqno) {
@@ -1184,11 +1189,13 @@ xprt_request_enqueue_transmit(struct rpc_task *task)
 					continue;
 				list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
 				INIT_LIST_HEAD(&req->rq_xmit);
+				trace_xprt_enq_xmit(task, 3);
 				goto out;
 			}
 		}
 		list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
 		INIT_LIST_HEAD(&req->rq_xmit2);
+		trace_xprt_enq_xmit(task, 4);
 out:
 		set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 		spin_unlock(&xprt->queue_lock);
@@ -1313,8 +1320,6 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 	int is_retrans = RPC_WAS_SENT(task);
 	int status;
 
-	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
-
 	if (!req->rq_bytes_sent) {
 		if (xprt_request_data_received(task)) {
 			status = 0;
@@ -1325,6 +1330,13 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 			status = -EBADMSG;
 			goto out_dequeue;
 		}
+		if (task->tk_ops->rpc_call_prepare_transmit) {
+			task->tk_ops->rpc_call_prepare_transmit(task,
+					task->tk_calldata);
+			status = task->tk_status;
+			if (status < 0)
+				goto out_dequeue;
+		}
 	}
 
 	/*
@@ -1336,9 +1348,9 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 
 	connect_cookie = xprt->connect_cookie;
 	status = xprt->ops->send_request(req);
-	trace_xprt_transmit(xprt, req->rq_xid, status);
 	if (status != 0) {
 		req->rq_ntrans--;
+		trace_xprt_transmit(req, status);
 		return status;
 	}
 
@@ -1347,7 +1359,6 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 
 	xprt_inject_disconnect(xprt);
 
-	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
 	task->tk_flags |= RPC_TASK_SENT;
 	spin_lock_bh(&xprt->transport_lock);
 
@@ -1360,6 +1371,7 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
 
 	req->rq_connect_cookie = connect_cookie;
 out_dequeue:
+	trace_xprt_transmit(req, status);
 	xprt_request_dequeue_transmit(task);
 	rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
 	return status;
@@ -1599,7 +1611,6 @@ xprt_request_init(struct rpc_task *task)
 	req->rq_buffer  = NULL;
 	req->rq_xid	= xprt_alloc_xid(xprt);
 	xprt_init_connect_cookie(req, xprt);
-	req->rq_bytes_sent = 0;
 	req->rq_snd_buf.len = 0;
 	req->rq_snd_buf.buflen = 0;
 	req->rq_rcv_buf.len = 0;
@@ -1721,6 +1732,7 @@ void xprt_release(struct rpc_task *task)
 		xprt->ops->buf_free(task);
 	xprt_inject_disconnect(xprt);
 	xdr_free_bvec(&req->rq_rcv_buf);
+	xdr_free_bvec(&req->rq_snd_buf);
 	if (req->rq_cred != NULL)
 		put_rpccred(req->rq_cred);
 	task->tk_rqstp = NULL;
@@ -1749,7 +1761,6 @@ xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
 	 */
 	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
 		xbufp->tail[0].iov_len;
-	req->rq_bytes_sent = 0;
 }
 #endif
 
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 0de9b3e63770..d79b18c1f4cd 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -123,7 +123,7 @@ static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 
 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
 	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
-			req->rl_rdmabuf->rg_base);
+			req->rl_rdmabuf->rg_base, rqst);
 
 	p = xdr_reserve_space(&req->rl_stream, 28);
 	if (unlikely(!p))
@@ -267,7 +267,6 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
 
 	/* Prepare rqst */
 	rqst->rq_reply_bytes_recvd = 0;
-	rqst->rq_bytes_sent = 0;
 	rqst->rq_xid = *p;
 
 	rqst->rq_private_buf.len = size;
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 6a561056b538..52cb6c1b0c2b 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -391,7 +391,7 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
  */
 struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
 				struct rpcrdma_mr_seg *seg,
-				int nsegs, bool writing, u32 xid,
+				int nsegs, bool writing, __be32 xid,
 				struct rpcrdma_mr **out)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
@@ -446,7 +446,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
 		goto out_mapmr_err;
 
 	ibmr->iova &= 0x00000000ffffffff;
-	ibmr->iova |= ((u64)cpu_to_be32(xid)) << 32;
+	ibmr->iova |= ((u64)be32_to_cpu(xid)) << 32;
 	key = (u8)(ibmr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(ibmr, ++key);
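
[Editor's note] With the xid argument now typed __be32, the value folded into the upper half of the MR's iova must be converted back to CPU order; the old code byte-swapped a value that was already big-endian. A stand-alone model of that bit manipulation, using invented input values:

#include <arpa/inet.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Keep the low 32 bits of the iova, carry the XID in the high 32 bits. */
static uint64_t salt_iova(uint64_t iova, uint32_t xid_be)
{
	iova &= 0x00000000ffffffffULL;
	iova |= (uint64_t)ntohl(xid_be) << 32;	/* be32_to_cpu analogue */
	return iova;
}

int main(void)
{
	uint32_t xid_be = htonl(0x12345678);

	printf("0x%016" PRIx64 "\n",
	       salt_iova(0xdeadbeefcafef00dULL, xid_be));
	/* prints 0x12345678cafef00d */
	return 0;
}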
 
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index d18614e02b4e..6c1fb270f127 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -164,6 +164,21 @@ static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
+/* The client is required to provide a Reply chunk if the maximum
+ * size of the non-payload part of the RPC Reply is larger than
+ * the inline threshold.
+ */
+static bool
+rpcrdma_nonpayload_inline(const struct rpcrdma_xprt *r_xprt,
+			  const struct rpc_rqst *rqst)
+{
+	const struct xdr_buf *buf = &rqst->rq_rcv_buf;
+	const struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+
+	return buf->head[0].iov_len + buf->tail[0].iov_len <
+		ia->ri_max_inline_read;
+}
+
 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
  * a byte range. Other modes coalesce these SGEs into a single MR
  * when they can.
@@ -733,7 +748,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 
 	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
 	xdr_init_encode(xdr, &req->rl_hdrbuf,
-			req->rl_rdmabuf->rg_base);
+			req->rl_rdmabuf->rg_base, rqst);
 
 	/* Fixed header fields */
 	ret = -EMSGSIZE;
@@ -762,7 +777,8 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
 	 */
 	if (rpcrdma_results_inline(r_xprt, rqst))
 		wtype = rpcrdma_noch;
-	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
+	else if ((ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) &&
+		 rpcrdma_nonpayload_inline(r_xprt, rqst))
 		wtype = rpcrdma_writech;
 	else
 		wtype = rpcrdma_replych;
@@ -1313,7 +1329,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 
 	/* Fixed transport header fields */
 	xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
-			rep->rr_hdrbuf.head[0].iov_base);
+			rep->rr_hdrbuf.head[0].iov_base, NULL);
 	p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
 	if (unlikely(!p))
 		goto out_shortreply;
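
[Editor's note] The rpcrdma_nonpayload_inline() helper added above tightens the Write-chunk path: a Write chunk is offered only when the reply's non-payload bytes (head plus tail) still fit under the inline threshold; otherwise marshaling falls back to a Reply chunk. A simplified user-space model of that decision; the constants and parameter names are illustrative only.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

enum chunk_type { NOCH, WRITECH, REPLYCH };

static enum chunk_type choose_wtype(size_t head, size_t pages, size_t tail,
				    size_t inline_max, bool ddp_allowed)
{
	size_t total = head + pages + tail;

	if (total <= inline_max)
		return NOCH;		/* whole reply fits inline */
	if (ddp_allowed && head + tail < inline_max)
		return WRITECH;		/* payload can go in a Write chunk */
	return REPLYCH;			/* everything via a Reply chunk */
}

int main(void)
{
	/* large READ payload, small head/tail, 4KiB inline threshold */
	printf("%d\n", choose_wtype(128, 262144, 0, 4096, true));  /* 1 = WRITECH */
	printf("%d\n", choose_wtype(8192, 262144, 0, 4096, true)); /* 2 = REPLYCH */
	return 0;
}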
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index b908f2ca08fd..907464c2a9f0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -304,7 +304,6 @@ xprt_setup_rdma_bc(struct xprt_create *args)
 	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 
 	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
-	xprt->tsh_size = 0;
 	xprt->ops = &xprt_rdma_bc_procs;
 
 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index fbc171ebfe91..5d261353bd90 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -332,7 +332,6 @@ xprt_setup_rdma(struct xprt_create *args)
 	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
 
 	xprt->resvport = 0;		/* privileged port not needed */
-	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */
 	xprt->ops = &xprt_rdma_procs;
 
 	/*
@@ -738,7 +737,6 @@ xprt_rdma_send_request(struct rpc_rqst *rqst)
 		goto drop_connection;
 
 	rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
-	rqst->rq_bytes_sent = 0;
 
 	/* An RPC with no reply will throw off credit accounting,
 	 * so drop the connection to reset the credit grant.
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 21113bfd4eca..89a63391d4d4 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1481,6 +1481,8 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp)
 	if (ep->rep_receive_count > needed)
 		goto out;
 	needed -= ep->rep_receive_count;
+	if (!temp)
+		needed += RPCRDMA_MAX_RECV_BATCH;
 
 	count = 0;
 	wr = NULL;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 5a18472f2c9c..10f6593e1a6a 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -205,6 +205,16 @@ struct rpcrdma_rep {
 	struct ib_recv_wr	rr_recv_wr;
 };
 
+/* To reduce the rate at which a transport invokes ib_post_recv
+ * (and thus the hardware doorbell rate), xprtrdma posts Receive
+ * WRs in batches.
+ *
+ * Setting this to zero disables Receive post batching.
+ */
+enum {
+	RPCRDMA_MAX_RECV_BATCH = 7,
+};
+
 /* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
  */
 struct rpcrdma_req;
@@ -577,7 +587,7 @@ void frwr_release_mr(struct rpcrdma_mr *mr);
 size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt);
 struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt,
 				struct rpcrdma_mr_seg *seg,
-				int nsegs, bool writing, u32 xid,
+				int nsegs, bool writing, __be32 xid,
 				struct rpcrdma_mr **mr);
 int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req);
 void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs);
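
[Editor's note] The RPCRDMA_MAX_RECV_BATCH constant above feeds the arithmetic in rpcrdma_post_recvs(): when the Receives being posted are not temporary, the transport posts past the immediate shortfall by a small batch so that ib_post_recv (and the doorbell it implies) fires less often. A rough user-space model of that computation; the parameter names and the way the target is derived are invented for illustration.

#include <stdbool.h>
#include <stdio.h>

#define MAX_RECV_BATCH 7	/* mirrors RPCRDMA_MAX_RECV_BATCH */

/* How many Receive WRs to post on this pass. */
static unsigned int recvs_to_post(unsigned int target,
				  unsigned int already_posted, bool temp)
{
	unsigned int needed = target;

	if (already_posted > needed)
		return 0;
	needed -= already_posted;
	if (!temp)
		needed += MAX_RECV_BATCH;	/* over-post to batch doorbells */
	return needed;
}

int main(void)
{
	printf("%u\n", recvs_to_post(32, 30, false));	/* 2 + 7 = 9 */
	printf("%u\n", recvs_to_post(32, 30, true));	/* temporary: just 2 */
	return 0;
}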
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 7754aa3e434f..42f45d33dc56 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -50,6 +50,7 @@
 #include <linux/bvec.h>
 #include <linux/highmem.h>
 #include <linux/uio.h>
+#include <linux/sched/mm.h>
 
 #include <trace/events/sunrpc.h>
 
@@ -404,8 +405,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
 	size_t want, seek_init = seek, offset = 0;
 	ssize_t ret;
 
-	if (seek < buf->head[0].iov_len) {
-		want = min_t(size_t, count, buf->head[0].iov_len);
+	want = min_t(size_t, count, buf->head[0].iov_len);
+	if (seek < want) {
 		ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
 		if (ret <= 0)
 			goto sock_err;
@@ -416,13 +417,13 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
 			goto out;
 		seek = 0;
 	} else {
-		seek -= buf->head[0].iov_len;
-		offset += buf->head[0].iov_len;
+		seek -= want;
+		offset += want;
 	}
 
 	want = xs_alloc_sparse_pages(buf,
 			min_t(size_t, count - offset, buf->page_len),
-			GFP_NOWAIT);
+			GFP_KERNEL);
 	if (seek < want) {
 		ret = xs_read_bvec(sock, msg, flags, buf->bvec,
 				xdr_buf_pagecount(buf),
@@ -442,8 +443,8 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
 		offset += want;
 	}
 
-	if (seek < buf->tail[0].iov_len) {
-		want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+	want = min_t(size_t, count - offset, buf->tail[0].iov_len);
+	if (seek < want) {
 		ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
 		if (ret <= 0)
 			goto sock_err;
@@ -453,7 +454,7 @@ xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
 		if (ret != want)
 			goto out;
 	} else
-		offset += buf->tail[0].iov_len;
+		offset = seek_init;
 	ret = -EMSGSIZE;
 out:
 	*read = offset - seek_init;
@@ -481,6 +482,14 @@ xs_read_stream_request_done(struct sock_xprt *transport)
 	return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
 }
 
+static void
+xs_read_stream_check_eor(struct sock_xprt *transport,
+		struct msghdr *msg)
+{
+	if (xs_read_stream_request_done(transport))
+		msg->msg_flags |= MSG_EOR;
+}
+
 static ssize_t
 xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
 		int flags, struct rpc_rqst *req)
@@ -492,17 +501,21 @@ xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
 	xs_read_header(transport, buf);
 
 	want = transport->recv.len - transport->recv.offset;
-	ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
-			transport->recv.copied + want, transport->recv.copied,
-			&read);
-	transport->recv.offset += read;
-	transport->recv.copied += read;
-	if (transport->recv.offset == transport->recv.len) {
-		if (xs_read_stream_request_done(transport))
-			msg->msg_flags |= MSG_EOR;
-		return read;
+	if (want != 0) {
+		ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
+				transport->recv.copied + want,
+				transport->recv.copied,
+				&read);
+		transport->recv.offset += read;
+		transport->recv.copied += read;
 	}
 
+	if (transport->recv.offset == transport->recv.len)
+		xs_read_stream_check_eor(transport, msg);
+
+	if (want == 0)
+		return 0;
+
 	switch (ret) {
 	default:
 		break;
@@ -655,13 +668,35 @@ out_err:
 	return ret != 0 ? ret : -ESHUTDOWN;
 }
 
+static __poll_t xs_poll_socket(struct sock_xprt *transport)
+{
+	return transport->sock->ops->poll(transport->file, transport->sock,
+			NULL);
+}
+
+static bool xs_poll_socket_readable(struct sock_xprt *transport)
+{
+	__poll_t events = xs_poll_socket(transport);
+
+	return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
+}
+
+static void xs_poll_check_readable(struct sock_xprt *transport)
+{
+
+	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+	if (!xs_poll_socket_readable(transport))
+		return;
+	if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+		queue_work(xprtiod_workqueue, &transport->recv_worker);
+}
+
 static void xs_stream_data_receive(struct sock_xprt *transport)
 {
 	size_t read = 0;
 	ssize_t ret = 0;
 
 	mutex_lock(&transport->recv_mutex);
-	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
 	if (transport->sock == NULL)
 		goto out;
 	for (;;) {
@@ -671,6 +706,10 @@ static void xs_stream_data_receive(struct sock_xprt *transport)
 		read += ret;
 		cond_resched();
 	}
+	if (ret == -ESHUTDOWN)
+		kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+	else
+		xs_poll_check_readable(transport);
 out:
 	mutex_unlock(&transport->recv_mutex);
 	trace_xs_stream_read_data(&transport->xprt, ret, read);
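
[Editor's note] After a drain pass, the receive worker now re-checks the socket via ->poll (xs_poll_check_readable() above) and requeues itself if data is still pending, rather than relying solely on another sk_data_ready callback. The readability test is a plain poll-mask check, sketched below as a stand-alone function; the epoll bit values are the standard ones, the helper name is made up.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define EPOLLIN		0x00000001
#define EPOLLRDNORM	0x00000040
#define EPOLLRDHUP	0x00002000

/* Readable means data is pending and the peer has not shut down its side. */
static bool socket_readable(uint32_t events)
{
	return (events & (EPOLLIN | EPOLLRDNORM)) && !(events & EPOLLRDHUP);
}

int main(void)
{
	printf("%d\n", socket_readable(EPOLLIN | EPOLLRDNORM));	/* 1 */
	printf("%d\n", socket_readable(EPOLLIN | EPOLLRDHUP));		/* 0 */
	return 0;
}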
@@ -680,7 +719,10 @@ static void xs_stream_data_receive_workfn(struct work_struct *work)
 {
 	struct sock_xprt *transport =
 		container_of(work, struct sock_xprt, recv_worker);
+	unsigned int pflags = memalloc_nofs_save();
+
 	xs_stream_data_receive(transport);
+	memalloc_nofs_restore(pflags);
 }
 
 static void
@@ -690,65 +732,65 @@ xs_stream_reset_connect(struct sock_xprt *transport)
 	transport->recv.len = 0;
 	transport->recv.copied = 0;
 	transport->xmit.offset = 0;
+}
+
+static void
+xs_stream_start_connect(struct sock_xprt *transport)
+{
 	transport->xprt.stat.connect_count++;
 	transport->xprt.stat.connect_start = jiffies;
 }
 
 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
 
-static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
+static int xs_sendmsg(struct socket *sock, struct msghdr *msg, size_t seek)
 {
-	struct msghdr msg = {
-		.msg_name	= addr,
-		.msg_namelen	= addrlen,
-		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
-	};
-	struct kvec iov = {
-		.iov_base	= vec->iov_base + base,
-		.iov_len	= vec->iov_len - base,
-	};
+	if (seek)
+		iov_iter_advance(&msg->msg_iter, seek);
+	return sock_sendmsg(sock, msg);
+}
 
-	if (iov.iov_len != 0)
-		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
-	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
+static int xs_send_kvec(struct socket *sock, struct msghdr *msg, struct kvec *vec, size_t seek)
+{
+	iov_iter_kvec(&msg->msg_iter, WRITE, vec, 1, vec->iov_len);
+	return xs_sendmsg(sock, msg, seek);
 }
 
-static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy, int *sent_p)
+static int xs_send_pagedata(struct socket *sock, struct msghdr *msg, struct xdr_buf *xdr, size_t base)
 {
-	ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
-			int offset, size_t size, int flags);
-	struct page **ppage;
-	unsigned int remainder;
 	int err;
 
-	remainder = xdr->page_len - base;
-	base += xdr->page_base;
-	ppage = xdr->pages + (base >> PAGE_SHIFT);
-	base &= ~PAGE_MASK;
-	do_sendpage = sock->ops->sendpage;
-	if (!zerocopy)
-		do_sendpage = sock_no_sendpage;
-	for(;;) {
-		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
-		int flags = XS_SENDMSG_FLAGS;
+	err = xdr_alloc_bvec(xdr, GFP_KERNEL);
+	if (err < 0)
+		return err;
 
-		remainder -= len;
-		if (more)
-			flags |= MSG_MORE;
-		if (remainder != 0)
-			flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
-		err = do_sendpage(sock, *ppage, base, len, flags);
-		if (remainder == 0 || err != len)
-			break;
-		*sent_p += err;
-		ppage++;
-		base = 0;
-	}
-	if (err > 0) {
-		*sent_p += err;
-		err = 0;
-	}
-	return err;
+	iov_iter_bvec(&msg->msg_iter, WRITE, xdr->bvec,
+			xdr_buf_pagecount(xdr),
+			xdr->page_len + xdr->page_base);
+	return xs_sendmsg(sock, msg, base + xdr->page_base);
+}
+
+#define xs_record_marker_len() sizeof(rpc_fraghdr)
+
+/* Common case:
+ *  - stream transport
+ *  - sending from byte 0 of the message
+ *  - the message is wholly contained in @xdr's head iovec
+ */
+static int xs_send_rm_and_kvec(struct socket *sock, struct msghdr *msg,
+		rpc_fraghdr marker, struct kvec *vec, size_t base)
+{
+	struct kvec iov[2] = {
+		[0] = {
+			.iov_base	= &marker,
+			.iov_len	= sizeof(marker)
+		},
+		[1] = *vec,
+	};
+	size_t len = iov[0].iov_len + iov[1].iov_len;
+
+	iov_iter_kvec(&msg->msg_iter, WRITE, iov, 2, len);
+	return xs_sendmsg(sock, msg, base);
 }
 
 /**
@@ -758,49 +800,60 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
  * @addrlen: UDP only -- length of destination address
  * @xdr: buffer containing this request
  * @base: starting position in the buffer
- * @zerocopy: true if it is safe to use sendpage()
+ * @rm: stream record marker field
  * @sent_p: return the total number of bytes successfully queued for sending
  *
  */
-static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy, int *sent_p)
+static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, rpc_fraghdr rm, int *sent_p)
 {
-	unsigned int remainder = xdr->len - base;
+	struct msghdr msg = {
+		.msg_name = addr,
+		.msg_namelen = addrlen,
+		.msg_flags = XS_SENDMSG_FLAGS | MSG_MORE,
+	};
+	unsigned int rmsize = rm ? sizeof(rm) : 0;
+	unsigned int remainder = rmsize + xdr->len - base;
+	unsigned int want;
 	int err = 0;
-	int sent = 0;
 
 	if (unlikely(!sock))
 		return -ENOTSOCK;
 
-	if (base != 0) {
-		addr = NULL;
-		addrlen = 0;
-	}
-
-	if (base < xdr->head[0].iov_len || addr != NULL) {
-		unsigned int len = xdr->head[0].iov_len - base;
+	want = xdr->head[0].iov_len + rmsize;
+	if (base < want) {
+		unsigned int len = want - base;
 		remainder -= len;
-		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
+		if (remainder == 0)
+			msg.msg_flags &= ~MSG_MORE;
+		if (rmsize)
+			err = xs_send_rm_and_kvec(sock, &msg, rm,
+					&xdr->head[0], base);
+		else
+			err = xs_send_kvec(sock, &msg, &xdr->head[0], base);
 		if (remainder == 0 || err != len)
 			goto out;
 		*sent_p += err;
 		base = 0;
 	} else
-		base -= xdr->head[0].iov_len;
+		base -= want;
 
 	if (base < xdr->page_len) {
 		unsigned int len = xdr->page_len - base;
 		remainder -= len;
-		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy, &sent);
-		*sent_p += sent;
-		if (remainder == 0 || sent != len)
+		if (remainder == 0)
+			msg.msg_flags &= ~MSG_MORE;
+		err = xs_send_pagedata(sock, &msg, xdr, base);
+		if (remainder == 0 || err != len)
 			goto out;
+		*sent_p += err;
 		base = 0;
 	} else
 		base -= xdr->page_len;
 
 	if (base >= xdr->tail[0].iov_len)
 		return 0;
-	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
+	msg.msg_flags &= ~MSG_MORE;
+	err = xs_send_kvec(sock, &msg, &xdr->tail[0], base);
 out:
 	if (err > 0) {
 		*sent_p += err;
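
[Editor's note] Instead of overwriting the first four bytes of the head iovec, the stream transports now carry the record marker in its own kvec (xs_send_rm_and_kvec() above), and xs_sendpages() counts those extra rmsize bytes when computing what remains. The framing itself is simple: a 4-byte marker holding the last-fragment bit ORed with the record length, in network byte order. A stand-alone illustration that writes one marked record to stdout with writev(); the kernel path uses sock_sendmsg() with an iov_iter instead.

#include <arpa/inet.h>
#include <stdint.h>
#include <sys/uio.h>
#include <unistd.h>

#define RPC_LAST_STREAM_FRAGMENT 0x80000000U

int main(void)
{
	const char body[] = "example RPC record";
	uint32_t marker = htonl(RPC_LAST_STREAM_FRAGMENT |
				(uint32_t)(sizeof(body) - 1));
	struct iovec iov[2] = {
		{ .iov_base = &marker,       .iov_len = sizeof(marker) },
		{ .iov_base = (void *)body,  .iov_len = sizeof(body) - 1 },
	};

	/* marker and payload go out in a single gathered write */
	return writev(STDOUT_FILENO, iov, 2) < 0;
}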
@@ -856,7 +909,7 @@ static int xs_nospace(struct rpc_rqst *req)
 static void
 xs_stream_prepare_request(struct rpc_rqst *req)
 {
-	req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO);
+	req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_KERNEL);
 }
 
 /*
@@ -870,13 +923,14 @@ xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
 }
 
 /*
- * Construct a stream transport record marker in @buf.
+ * Return the stream record marker field for a record of length < 2^31-1
  */
-static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
+static rpc_fraghdr
+xs_stream_record_marker(struct xdr_buf *xdr)
 {
-	u32 reclen = buf->len - sizeof(rpc_fraghdr);
-	rpc_fraghdr *base = buf->head[0].iov_base;
-	*base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
+	if (!xdr->len)
+		return 0;
+	return cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | (u32)xdr->len);
 }
 
 /**
@@ -905,15 +959,14 @@ static int xs_local_send_request(struct rpc_rqst *req)
 		return -ENOTCONN;
 	}
 
-	xs_encode_stream_record_marker(&req->rq_snd_buf);
-
 	xs_pktdump("packet data:",
 			req->rq_svec->iov_base, req->rq_svec->iov_len);
 
 	req->rq_xtime = ktime_get();
 	status = xs_sendpages(transport->sock, NULL, 0, xdr,
 			      transport->xmit.offset,
-			      true, &sent);
+			      xs_stream_record_marker(xdr),
+			      &sent);
 	dprintk("RPC:       %s(%u) = %d\n",
 			__func__, xdr->len - transport->xmit.offset, status);
 
@@ -925,7 +978,6 @@ static int xs_local_send_request(struct rpc_rqst *req)
 		req->rq_bytes_sent = transport->xmit.offset;
 		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
 			req->rq_xmit_bytes_sent += transport->xmit.offset;
-			req->rq_bytes_sent = 0;
 			transport->xmit.offset = 0;
 			return 0;
 		}
@@ -981,7 +1033,7 @@ static int xs_udp_send_request(struct rpc_rqst *req)
 
 	req->rq_xtime = ktime_get();
 	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
-			      xdr, 0, true, &sent);
+			      xdr, 0, 0, &sent);
 
 	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
 			xdr->len, status);
@@ -1045,7 +1097,6 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
 	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
-	bool zerocopy = true;
 	bool vm_wait = false;
 	int status;
 	int sent;
@@ -1057,17 +1108,9 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
 		return -ENOTCONN;
 	}
 
-	xs_encode_stream_record_marker(&req->rq_snd_buf);
-
 	xs_pktdump("packet data:",
 				req->rq_svec->iov_base,
 				req->rq_svec->iov_len);
-	/* Don't use zero copy if this is a resend. If the RPC call
-	 * completes while the socket holds a reference to the pages,
-	 * then we may end up resending corrupted data.
-	 */
-	if (req->rq_task->tk_flags & RPC_TASK_SENT)
-		zerocopy = false;
 
 	if (test_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state))
 		xs_tcp_set_socket_timeouts(xprt, transport->sock);
@@ -1080,7 +1123,8 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
 		sent = 0;
 		status = xs_sendpages(transport->sock, NULL, 0, xdr,
 				      transport->xmit.offset,
-				      zerocopy, &sent);
+				      xs_stream_record_marker(xdr),
+				      &sent);
 
 		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
 				xdr->len - transport->xmit.offset, status);
@@ -1091,7 +1135,6 @@ static int xs_tcp_send_request(struct rpc_rqst *req)
 		req->rq_bytes_sent = transport->xmit.offset;
 		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
 			req->rq_xmit_bytes_sent += transport->xmit.offset;
-			req->rq_bytes_sent = 0;
 			transport->xmit.offset = 0;
 			return 0;
 		}
@@ -1211,6 +1254,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
 	struct socket *sock = transport->sock;
 	struct sock *sk = transport->inet;
 	struct rpc_xprt *xprt = &transport->xprt;
+	struct file *filp = transport->file;
 
 	if (sk == NULL)
 		return;
@@ -1224,6 +1268,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
 	write_lock_bh(&sk->sk_callback_lock);
 	transport->inet = NULL;
 	transport->sock = NULL;
+	transport->file = NULL;
 
 	sk->sk_user_data = NULL;
 
@@ -1231,10 +1276,12 @@ static void xs_reset_transport(struct sock_xprt *transport)
 	xprt_clear_connected(xprt);
 	write_unlock_bh(&sk->sk_callback_lock);
 	xs_sock_reset_connection_flags(xprt);
+	/* Reset stream record info */
+	xs_stream_reset_connect(transport);
 	mutex_unlock(&transport->recv_mutex);
 
 	trace_rpc_socket_close(xprt, sock);
-	sock_release(sock);
+	fput(filp);
 
 	xprt_disconnect_done(xprt);
 }
@@ -1358,7 +1405,6 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
 	int err;
 
 	mutex_lock(&transport->recv_mutex);
-	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
 	sk = transport->inet;
 	if (sk == NULL)
 		goto out;
@@ -1370,6 +1416,7 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
 		consume_skb(skb);
 		cond_resched();
 	}
+	xs_poll_check_readable(transport);
 out:
 	mutex_unlock(&transport->recv_mutex);
 }
@@ -1378,7 +1425,10 @@ static void xs_udp_data_receive_workfn(struct work_struct *work)
 {
 	struct sock_xprt *transport =
 		container_of(work, struct sock_xprt, recv_worker);
+	unsigned int pflags = memalloc_nofs_save();
+
 	xs_udp_data_receive(transport);
+	memalloc_nofs_restore(pflags);
 }
 
 /**
@@ -1826,6 +1876,7 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt,
 		struct sock_xprt *transport, int family, int type,
 		int protocol, bool reuseport)
 {
+	struct file *filp;
 	struct socket *sock;
 	int err;
 
@@ -1846,6 +1897,11 @@ static struct socket *xs_create_sock(struct rpc_xprt *xprt,
 		goto out;
 	}
 
+	filp = sock_alloc_file(sock, O_NONBLOCK, NULL);
+	if (IS_ERR(filp))
+		return ERR_CAST(filp);
+	transport->file = filp;
+
 	return sock;
 out:
 	return ERR_PTR(err);
@@ -1869,7 +1925,6 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
 		sk->sk_write_space = xs_udp_write_space;
 		sock_set_flag(sk, SOCK_FASYNC);
 		sk->sk_error_report = xs_error_report;
-		sk->sk_allocation = GFP_NOIO;
 
 		xprt_clear_connected(xprt);
 
@@ -1880,7 +1935,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
 		write_unlock_bh(&sk->sk_callback_lock);
 	}
 
-	xs_stream_reset_connect(transport);
+	xs_stream_start_connect(transport);
 
 	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
 }
@@ -1892,6 +1947,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
 static int xs_local_setup_socket(struct sock_xprt *transport)
 {
 	struct rpc_xprt *xprt = &transport->xprt;
+	struct file *filp;
 	struct socket *sock;
 	int status = -EIO;
 
@@ -1904,6 +1960,13 @@ static int xs_local_setup_socket(struct sock_xprt *transport)
 	}
 	xs_reclassify_socket(AF_LOCAL, sock);
 
+	filp = sock_alloc_file(sock, O_NONBLOCK, NULL);
+	if (IS_ERR(filp)) {
+		status = PTR_ERR(filp);
+		goto out;
+	}
+	transport->file = filp;
+
 	dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
 			xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
 
@@ -2057,7 +2120,6 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 		sk->sk_data_ready = xs_data_ready;
 		sk->sk_write_space = xs_udp_write_space;
 		sock_set_flag(sk, SOCK_FASYNC);
-		sk->sk_allocation = GFP_NOIO;
 
 		xprt_set_connected(xprt);
 
@@ -2220,7 +2282,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 		sk->sk_write_space = xs_tcp_write_space;
 		sock_set_flag(sk, SOCK_FASYNC);
 		sk->sk_error_report = xs_error_report;
-		sk->sk_allocation = GFP_NOIO;
 
 		/* socket options */
 		sock_reset_flag(sk, SOCK_LINGER);
@@ -2240,8 +2301,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 
 	xs_set_memalloc(xprt);
 
-	/* Reset TCP record info */
-	xs_stream_reset_connect(transport);
+	xs_stream_start_connect(transport);
 
 	/* Tell the socket layer to start connecting... */
 	set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
@@ -2534,26 +2594,35 @@ static int bc_sendto(struct rpc_rqst *req)
 {
 	int len;
 	struct xdr_buf *xbufp = &req->rq_snd_buf;
-	struct rpc_xprt *xprt = req->rq_xprt;
 	struct sock_xprt *transport =
-				container_of(xprt, struct sock_xprt, xprt);
-	struct socket *sock = transport->sock;
+			container_of(req->rq_xprt, struct sock_xprt, xprt);
 	unsigned long headoff;
 	unsigned long tailoff;
+	struct page *tailpage;
+	struct msghdr msg = {
+		.msg_flags	= MSG_MORE
+	};
+	rpc_fraghdr marker = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT |
+					 (u32)xbufp->len);
+	struct kvec iov = {
+		.iov_base	= &marker,
+		.iov_len	= sizeof(marker),
+	};
 
-	xs_encode_stream_record_marker(xbufp);
+	len = kernel_sendmsg(transport->sock, &msg, &iov, 1, iov.iov_len);
+	if (len != iov.iov_len)
+		return -EAGAIN;
 
+	tailpage = NULL;
+	if (xbufp->tail[0].iov_len)
+		tailpage = virt_to_page(xbufp->tail[0].iov_base);
 	tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
 	headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
-	len = svc_send_common(sock, xbufp,
+	len = svc_send_common(transport->sock, xbufp,
 			      virt_to_page(xbufp->head[0].iov_base), headoff,
-			      xbufp->tail[0].iov_base, tailoff);
-
-	if (len != xbufp->len) {
-		printk(KERN_NOTICE "Error sending entire callback!\n");
-		len = -EAGAIN;
-	}
-
+			      tailpage, tailoff);
+	if (len != xbufp->len)
+		return -EAGAIN;
 	return len;
 }
 
@@ -2793,7 +2862,6 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
 	transport = container_of(xprt, struct sock_xprt, xprt);
 
 	xprt->prot = 0;
-	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 
 	xprt->bind_timeout = XS_BIND_TO;
@@ -2862,7 +2930,6 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
 	transport = container_of(xprt, struct sock_xprt, xprt);
 
 	xprt->prot = IPPROTO_UDP;
-	xprt->tsh_size = 0;
 	/* XXX: header size can vary due to auth type, IPv6, etc. */
 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
@@ -2942,7 +3009,6 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
 	transport = container_of(xprt, struct sock_xprt, xprt);
 
 	xprt->prot = IPPROTO_TCP;
-	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 
 	xprt->bind_timeout = XS_BIND_TO;
@@ -3015,7 +3081,6 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
 	transport = container_of(xprt, struct sock_xprt, xprt);
 
 	xprt->prot = IPPROTO_TCP;
-	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 	xprt->timeout = &xs_tcp_default_timeout;