summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--fs/nfs/blocklayout/blocklayout.c7
-rw-r--r--fs/nfs/callback.c40
-rw-r--r--fs/nfs/callback.h12
-rw-r--r--fs/nfs/callback_proc.c2
-rw-r--r--fs/nfs/callback_xdr.c39
-rw-r--r--fs/nfs/client.c1
-rw-r--r--fs/nfs/delegation.c6
-rw-r--r--fs/nfs/dir.c3
-rw-r--r--fs/nfs/flexfilelayout/flexfilelayout.c40
-rw-r--r--fs/nfs/flexfilelayout/flexfilelayout.h7
-rw-r--r--fs/nfs/mount_clnt.c4
-rw-r--r--fs/nfs/nfs42.h1
-rw-r--r--fs/nfs/nfs42proc.c71
-rw-r--r--fs/nfs/nfs42xdr.c97
-rw-r--r--fs/nfs/nfs4_fs.h6
-rw-r--r--fs/nfs/nfs4file.c136
-rw-r--r--fs/nfs/nfs4proc.c182
-rw-r--r--fs/nfs/nfs4xdr.c53
-rw-r--r--fs/nfs/nfsroot.c2
-rw-r--r--fs/nfs/pnfs.c12
-rw-r--r--fs/nfs/read.c9
-rw-r--r--fs/nfs/super.c2
-rw-r--r--fs/nfs/write.c7
-rw-r--r--include/linux/nfs4.h3
-rw-r--r--include/linux/nfs_fs_sb.h2
-rw-r--r--include/linux/nfs_xdr.h28
-rw-r--r--include/linux/sunrpc/bc_xprt.h5
-rw-r--r--include/linux/sunrpc/svc_rdma.h6
-rw-r--r--include/linux/sunrpc/xprt.h9
-rw-r--r--include/linux/sunrpc/xprtsock.h2
-rw-r--r--include/uapi/linux/nfs.h13
-rw-r--r--net/sunrpc/backchannel_rqst.c24
-rw-r--r--net/sunrpc/svc.c5
-rw-r--r--net/sunrpc/sysctl.c23
-rw-r--r--net/sunrpc/xprtrdma/Makefile1
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c394
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c7
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c148
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma.c6
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c58
-rw-r--r--net/sunrpc/xprtrdma/transport.c18
-rw-r--r--net/sunrpc/xprtrdma/verbs.c479
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h54
-rw-r--r--net/sunrpc/xprtsock.c260
44 files changed, 1687 insertions, 597 deletions
diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c
index 9cd4eb3a1e22..ddd0138f410c 100644
--- a/fs/nfs/blocklayout/blocklayout.c
+++ b/fs/nfs/blocklayout/blocklayout.c
@@ -229,7 +229,7 @@ bl_read_pagelist(struct nfs_pgio_header *header)
 	struct parallel_io *par;
 	loff_t f_offset = header->args.offset;
 	size_t bytes_left = header->args.count;
-	unsigned int pg_offset, pg_len;
+	unsigned int pg_offset = header->args.pgbase, pg_len;
 	struct page **pages = header->args.pages;
 	int pg_index = header->args.pgbase >> PAGE_CACHE_SHIFT;
 	const bool is_dio = (header->dreq != NULL);
@@ -262,7 +262,6 @@ bl_read_pagelist(struct nfs_pgio_header *header)
 			extent_length = be.be_length - (isect - be.be_f_offset);
 		}
 
-		pg_offset = f_offset & ~PAGE_CACHE_MASK;
 		if (is_dio) {
 			if (pg_offset + bytes_left > PAGE_CACHE_SIZE)
 				pg_len = PAGE_CACHE_SIZE - pg_offset;
@@ -273,9 +272,6 @@ bl_read_pagelist(struct nfs_pgio_header *header)
 			pg_len = PAGE_CACHE_SIZE;
 		}
 
-		isect += (pg_offset >> SECTOR_SHIFT);
-		extent_length -= (pg_offset >> SECTOR_SHIFT);
-
 		if (is_hole(&be)) {
 			bio = bl_submit_bio(READ, bio);
 			/* Fill hole w/ zeroes w/o accessing device */
@@ -301,6 +297,7 @@ bl_read_pagelist(struct nfs_pgio_header *header)
 		extent_length -= (pg_len >> SECTOR_SHIFT);
 		f_offset += pg_len;
 		bytes_left -= pg_len;
+		pg_offset = 0;
 	}
 	if ((isect << SECTOR_SHIFT) >= header->inode->i_size) {
 		header->res.eof = 1;
diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
index 75f7c0a7538a..a7f2e6e33305 100644
--- a/fs/nfs/callback.c
+++ b/fs/nfs/callback.c
@@ -99,17 +99,6 @@ nfs4_callback_up(struct svc_serv *serv)
 }
 
 #if defined(CONFIG_NFS_V4_1)
-static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
-{
-	/*
-	 * Create an svc_sock for the back channel service that shares the
-	 * fore channel connection.
-	 * Returns the input port (0) and sets the svc_serv bc_xprt on success
-	 */
-	return svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
-			      SVC_SOCK_ANONYMOUS);
-}
-
 /*
  * The callback service for NFSv4.1 callbacks
  */
@@ -184,11 +173,6 @@ static inline void nfs_callback_bc_serv(u32 minorversion, struct rpc_xprt *xprt,
 		xprt->bc_serv = serv;
 }
 #else
-static int nfs41_callback_up_net(struct svc_serv *serv, struct net *net)
-{
-	return 0;
-}
-
 static void nfs_minorversion_callback_svc_setup(struct svc_serv *serv,
 		struct svc_rqst **rqstpp, int (**callback_svc)(void *vrqstp))
 {
@@ -259,7 +243,8 @@ static void nfs_callback_down_net(u32 minorversion, struct svc_serv *serv, struc
 	svc_shutdown_net(serv, net);
 }
 
-static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct net *net)
+static int nfs_callback_up_net(int minorversion, struct svc_serv *serv,
+			       struct net *net, struct rpc_xprt *xprt)
 {
 	struct nfs_net *nn = net_generic(net, nfs_net_id);
 	int ret;
@@ -275,20 +260,11 @@ static int nfs_callback_up_net(int minorversion, struct svc_serv *serv, struct n
 		goto err_bind;
 	}
 
-	switch (minorversion) {
-		case 0:
-			ret = nfs4_callback_up_net(serv, net);
-			break;
-		case 1:
-		case 2:
-			ret = nfs41_callback_up_net(serv, net);
-			break;
-		default:
-			printk(KERN_ERR "NFS: unknown callback version: %d\n",
-					minorversion);
-			ret = -EINVAL;
-			break;
-	}
+	ret = -EPROTONOSUPPORT;
+	if (minorversion == 0)
+		ret = nfs4_callback_up_net(serv, net);
+	else if (xprt->ops->bc_up)
+		ret = xprt->ops->bc_up(serv, net);
 
 	if (ret < 0) {
 		printk(KERN_ERR "NFS: callback service start failed\n");
@@ -364,7 +340,7 @@ int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt)
 		goto err_create;
 	}
 
-	ret = nfs_callback_up_net(minorversion, serv, net);
+	ret = nfs_callback_up_net(minorversion, serv, net, xprt);
 	if (ret < 0)
 		goto err_net;
 
diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
index 84326e9fb47a..ff8195bd75ea 100644
--- a/fs/nfs/callback.h
+++ b/fs/nfs/callback.h
@@ -61,7 +61,6 @@ struct cb_compound_hdr_res {
 };
 
 struct cb_getattrargs {
-	struct sockaddr *addr;
 	struct nfs_fh fh;
 	uint32_t bitmap[2];
 };
@@ -76,7 +75,6 @@ struct cb_getattrres {
 };
 
 struct cb_recallargs {
-	struct sockaddr *addr;
 	struct nfs_fh fh;
 	nfs4_stateid stateid;
 	uint32_t truncate;
@@ -119,9 +117,6 @@ extern __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
 				       struct cb_sequenceres *res,
 				       struct cb_process_state *cps);
 
-extern int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation,
-					     const nfs4_stateid *stateid);
-
 #define RCA4_TYPE_MASK_RDATA_DLG	0
 #define RCA4_TYPE_MASK_WDATA_DLG	1
 #define RCA4_TYPE_MASK_DIR_DLG         2
@@ -134,7 +129,6 @@ extern int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation,
 #define RCA4_TYPE_MASK_ALL 0xf31f
 
 struct cb_recallanyargs {
-	struct sockaddr	*craa_addr;
 	uint32_t	craa_objs_to_keep;
 	uint32_t	craa_type_mask;
 };
@@ -144,7 +138,6 @@ extern __be32 nfs4_callback_recallany(struct cb_recallanyargs *args,
 					struct cb_process_state *cps);
 
 struct cb_recallslotargs {
-	struct sockaddr	*crsa_addr;
 	uint32_t	crsa_target_highest_slotid;
 };
 extern __be32 nfs4_callback_recallslot(struct cb_recallslotargs *args,
@@ -152,7 +145,6 @@ extern __be32 nfs4_callback_recallslot(struct cb_recallslotargs *args,
 					 struct cb_process_state *cps);
 
 struct cb_layoutrecallargs {
-	struct sockaddr		*cbl_addr;
 	uint32_t		cbl_recall_type;
 	uint32_t		cbl_layout_type;
 	uint32_t		cbl_layoutchanged;
@@ -196,9 +188,6 @@ extern __be32 nfs4_callback_recall(struct cb_recallargs *args, void *dummy,
 #if IS_ENABLED(CONFIG_NFS_V4)
 extern int nfs_callback_up(u32 minorversion, struct rpc_xprt *xprt);
 extern void nfs_callback_down(int minorversion, struct net *net);
-extern int nfs4_validate_delegation_stateid(struct nfs_delegation *delegation,
-					    const nfs4_stateid *stateid);
-extern int nfs4_set_callback_sessionid(struct nfs_client *clp);
 #endif /* CONFIG_NFS_V4 */
 /*
  * nfs41: Callbacks are expected to not cause substantial latency,
@@ -209,6 +198,5 @@ extern int nfs4_set_callback_sessionid(struct nfs_client *clp);
 #define NFS41_BC_MAX_CALLBACKS 1
 
 extern unsigned int nfs_callback_set_tcpport;
-extern unsigned short nfs_callback_tcpport;
 
 #endif /* __LINUX_FS_NFS_CALLBACK_H */
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index b85cf7a30232..807eb6ef4f91 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -17,9 +17,7 @@
 #include "nfs4session.h"
 #include "nfs4trace.h"
 
-#ifdef NFS_DEBUG
 #define NFSDBG_FACILITY NFSDBG_CALLBACK
-#endif
 
 __be32 nfs4_callback_getattr(struct cb_getattrargs *args,
 			     struct cb_getattrres *res,
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index 6b1697a01dde..646cdac73488 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -18,19 +18,21 @@
 #include "internal.h"
 #include "nfs4session.h"
 
-#define CB_OP_TAGLEN_MAXSZ	(512)
-#define CB_OP_HDR_RES_MAXSZ	(2 + CB_OP_TAGLEN_MAXSZ)
-#define CB_OP_GETATTR_BITMAP_MAXSZ	(4)
-#define CB_OP_GETATTR_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ + \
-				CB_OP_GETATTR_BITMAP_MAXSZ + \
-				2 + 2 + 3 + 3)
-#define CB_OP_RECALL_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
+#define CB_OP_TAGLEN_MAXSZ		(512)
+#define CB_OP_HDR_RES_MAXSZ		(2 * 4) // opcode, status
+#define CB_OP_GETATTR_BITMAP_MAXSZ	(4 * 4) // bitmap length, 3 bitmaps
+#define CB_OP_GETATTR_RES_MAXSZ		(CB_OP_HDR_RES_MAXSZ + \
+					 CB_OP_GETATTR_BITMAP_MAXSZ + \
+					 /* change, size, ctime, mtime */\
+					 (2 + 2 + 3 + 3) * 4)
+#define CB_OP_RECALL_RES_MAXSZ		(CB_OP_HDR_RES_MAXSZ)
 
 #if defined(CONFIG_NFS_V4_1)
 #define CB_OP_LAYOUTRECALL_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
 #define CB_OP_DEVICENOTIFY_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
 #define CB_OP_SEQUENCE_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ + \
-					4 + 1 + 3)
+					 NFS4_MAX_SESSIONID_LEN + \
+					 (1 + 3) * 4) // seqid, 3 slotids
 #define CB_OP_RECALLANY_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
 #define CB_OP_RECALLSLOT_RES_MAXSZ	(CB_OP_HDR_RES_MAXSZ)
 #endif /* CONFIG_NFS_V4_1 */
@@ -157,7 +159,7 @@ static __be32 decode_compound_hdr_arg(struct xdr_stream *xdr, struct cb_compound
 	if (unlikely(status != 0))
 		return status;
 	/* We do not like overly long tags! */
-	if (hdr->taglen > CB_OP_TAGLEN_MAXSZ - 12) {
+	if (hdr->taglen > CB_OP_TAGLEN_MAXSZ) {
 		printk("NFS: NFSv4 CALLBACK %s: client sent tag of length %u\n",
 				__func__, hdr->taglen);
 		return htonl(NFS4ERR_RESOURCE);
@@ -198,7 +200,6 @@ static __be32 decode_getattr_args(struct svc_rqst *rqstp, struct xdr_stream *xdr
 	status = decode_fh(xdr, &args->fh);
 	if (unlikely(status != 0))
 		goto out;
-	args->addr = svc_addr(rqstp);
 	status = decode_bitmap(xdr, args->bitmap);
 out:
 	dprintk("%s: exit with status = %d\n", __func__, ntohl(status));
@@ -210,7 +211,6 @@ static __be32 decode_recall_args(struct svc_rqst *rqstp, struct xdr_stream *xdr,
 	__be32 *p;
 	__be32 status;
 
-	args->addr = svc_addr(rqstp);
 	status = decode_stateid(xdr, &args->stateid);
 	if (unlikely(status != 0))
 		goto out;
@@ -236,7 +236,6 @@ static __be32 decode_layoutrecall_args(struct svc_rqst *rqstp,
 	__be32 status = 0;
 	uint32_t iomode;
 
-	args->cbl_addr = svc_addr(rqstp);
 	p = read_buf(xdr, 4 * sizeof(uint32_t));
 	if (unlikely(p == NULL)) {
 		status = htonl(NFS4ERR_BADXDR);
@@ -383,13 +382,12 @@ static __be32 decode_sessionid(struct xdr_stream *xdr,
 				 struct nfs4_sessionid *sid)
 {
 	__be32 *p;
-	int len = NFS4_MAX_SESSIONID_LEN;
 
-	p = read_buf(xdr, len);
+	p = read_buf(xdr, NFS4_MAX_SESSIONID_LEN);
 	if (unlikely(p == NULL))
 		return htonl(NFS4ERR_RESOURCE);
 
-	memcpy(sid->data, p, len);
+	memcpy(sid->data, p, NFS4_MAX_SESSIONID_LEN);
 	return 0;
 }
 
@@ -500,7 +498,6 @@ static __be32 decode_recallany_args(struct svc_rqst *rqstp,
 	uint32_t bitmap[2];
 	__be32 *p, status;
 
-	args->craa_addr = svc_addr(rqstp);
 	p = read_buf(xdr, 4);
 	if (unlikely(p == NULL))
 		return htonl(NFS4ERR_BADXDR);
@@ -519,7 +516,6 @@ static __be32 decode_recallslot_args(struct svc_rqst *rqstp,
 {
 	__be32 *p;
 
-	args->crsa_addr = svc_addr(rqstp);
 	p = read_buf(xdr, 4);
 	if (unlikely(p == NULL))
 		return htonl(NFS4ERR_BADXDR);
@@ -684,13 +680,12 @@ static __be32 encode_sessionid(struct xdr_stream *xdr,
 				 const struct nfs4_sessionid *sid)
 {
 	__be32 *p;
-	int len = NFS4_MAX_SESSIONID_LEN;
 
-	p = xdr_reserve_space(xdr, len);
+	p = xdr_reserve_space(xdr, NFS4_MAX_SESSIONID_LEN);
 	if (unlikely(p == NULL))
 		return htonl(NFS4ERR_RESOURCE);
 
-	memcpy(p, sid, len);
+	memcpy(p, sid, NFS4_MAX_SESSIONID_LEN);
 	return 0;
 }
 
@@ -704,7 +699,9 @@ static __be32 encode_cb_sequence_res(struct svc_rqst *rqstp,
 	if (unlikely(status != 0))
 		goto out;
 
-	encode_sessionid(xdr, &res->csr_sessionid);
+	status = encode_sessionid(xdr, &res->csr_sessionid);
+	if (status)
+		goto out;
 
 	p = xdr_reserve_space(xdr, 4 * sizeof(uint32_t));
 	if (unlikely(p == NULL))
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 57c5a02f6213..d6d5d2a48e83 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -764,6 +764,7 @@ static void nfs_server_set_fsinfo(struct nfs_server *server,
 
 	server->time_delta = fsinfo->time_delta;
 
+	server->clone_blksize = fsinfo->clone_blksize;
 	/* We're airborne Set socket buffersize */
 	rpc_setbufsize(server->client, server->wsize + 100, server->rsize + 100);
 }
diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c
index be806ead7f4d..5166adcfc0fb 100644
--- a/fs/nfs/delegation.c
+++ b/fs/nfs/delegation.c
@@ -721,14 +721,12 @@ int nfs_async_inode_return_delegation(struct inode *inode,
 	struct nfs_client *clp = server->nfs_client;
 	struct nfs_delegation *delegation;
 
-	filemap_flush(inode->i_mapping);
-
 	rcu_read_lock();
 	delegation = rcu_dereference(NFS_I(inode)->delegation);
 	if (delegation == NULL)
 		goto out_enoent;
-
-	if (!clp->cl_mvops->match_stateid(&delegation->stateid, stateid))
+	if (stateid != NULL &&
+	    !clp->cl_mvops->match_stateid(&delegation->stateid, stateid))
 		goto out_enoent;
 	nfs_mark_return_delegation(server, delegation);
 	rcu_read_unlock();
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index 3d8e4ffa0a33..ce5a21861074 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -1714,9 +1714,6 @@ nfs_mknod(struct inode *dir, struct dentry *dentry, umode_t mode, dev_t rdev)
 	dfprintk(VFS, "NFS: mknod(%s/%lu), %pd\n",
 			dir->i_sb->s_id, dir->i_ino, dentry);
 
-	if (!new_valid_dev(rdev))
-		return -EINVAL;
-
 	attr.ia_mode = mode;
 	attr.ia_valid = ATTR_MODE;
 
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index fbc5a56de875..03516c80855a 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -339,6 +339,19 @@ static void ff_layout_sort_mirrors(struct nfs4_ff_layout_segment *fls)
 	}
 }
 
+static void ff_layout_mark_devices_valid(struct nfs4_ff_layout_segment *fls)
+{
+	struct nfs4_deviceid_node *node;
+	int i;
+
+	if (!(fls->flags & FF_FLAGS_NO_IO_THRU_MDS))
+		return;
+	for (i = 0; i < fls->mirror_array_cnt; i++) {
+		node = &fls->mirror_array[i]->mirror_ds->id_node;
+		clear_bit(NFS_DEVICEID_UNAVAILABLE, &node->flags);
+	}
+}
+
 static struct pnfs_layout_segment *
 ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
 		     struct nfs4_layoutget_res *lgr,
@@ -499,6 +512,7 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
 	rc = ff_layout_check_layout(lgr);
 	if (rc)
 		goto out_err_free;
+	ff_layout_mark_devices_valid(fls);
 
 	ret = &fls->generic_hdr;
 	dprintk("<-- %s (success)\n", __func__);
@@ -741,17 +755,17 @@ ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg,
 }
 
 static struct nfs4_pnfs_ds *
-ff_layout_choose_best_ds_for_read(struct nfs_pageio_descriptor *pgio,
+ff_layout_choose_best_ds_for_read(struct pnfs_layout_segment *lseg,
+				  int start_idx,
 				  int *best_idx)
 {
-	struct nfs4_ff_layout_segment *fls;
+	struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);
 	struct nfs4_pnfs_ds *ds;
 	int idx;
 
-	fls = FF_LAYOUT_LSEG(pgio->pg_lseg);
 	/* mirrors are sorted by efficiency */
-	for (idx = 0; idx < fls->mirror_array_cnt; idx++) {
-		ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, idx, false);
+	for (idx = start_idx; idx < fls->mirror_array_cnt; idx++) {
+		ds = nfs4_ff_layout_prepare_ds(lseg, idx, false);
 		if (ds) {
 			*best_idx = idx;
 			return ds;
@@ -782,7 +796,7 @@ ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 	if (pgio->pg_lseg == NULL)
 		goto out_mds;
 
-	ds = ff_layout_choose_best_ds_for_read(pgio, &ds_idx);
+	ds = ff_layout_choose_best_ds_for_read(pgio->pg_lseg, 0, &ds_idx);
 	if (!ds)
 		goto out_mds;
 	mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx);
@@ -1035,7 +1049,8 @@ static int ff_layout_async_handle_error_v4(struct rpc_task *task,
 		rpc_wake_up(&tbl->slot_tbl_waitq);
 		/* fall through */
 	default:
-		if (ff_layout_has_available_ds(lseg))
+		if (ff_layout_no_fallback_to_mds(lseg) ||
+		    ff_layout_has_available_ds(lseg))
 			return -NFS4ERR_RESET_TO_PNFS;
 reset:
 		dprintk("%s Retry through MDS. Error %d\n", __func__,
@@ -1153,7 +1168,6 @@ static void ff_layout_io_track_ds_error(struct pnfs_layout_segment *lseg,
 }
 
 /* NFS_PROTO call done callback routines */
-
 static int ff_layout_read_done_cb(struct rpc_task *task,
 				struct nfs_pgio_header *hdr)
 {
@@ -1171,6 +1185,10 @@ static int ff_layout_read_done_cb(struct rpc_task *task,
 
 	switch (err) {
 	case -NFS4ERR_RESET_TO_PNFS:
+		if (ff_layout_choose_best_ds_for_read(hdr->lseg,
+					hdr->pgio_mirror_idx + 1,
+					&hdr->pgio_mirror_idx))
+			goto out_eagain;
 		set_bit(NFS_LAYOUT_RETURN_BEFORE_CLOSE,
 			&hdr->lseg->pls_layout->plh_flags);
 		pnfs_read_resend_pnfs(hdr);
@@ -1179,11 +1197,13 @@ static int ff_layout_read_done_cb(struct rpc_task *task,
 		ff_layout_reset_read(hdr);
 		return task->tk_status;
 	case -EAGAIN:
-		rpc_restart_call_prepare(task);
-		return -EAGAIN;
+		goto out_eagain;
 	}
 
 	return 0;
+out_eagain:
+	rpc_restart_call_prepare(task);
+	return -EAGAIN;
 }
 
 static bool
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.h b/fs/nfs/flexfilelayout/flexfilelayout.h
index 68cc0d9828f9..2bb08bc6aaf0 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.h
+++ b/fs/nfs/flexfilelayout/flexfilelayout.h
@@ -10,6 +10,7 @@
 #define FS_NFS_NFS4FLEXFILELAYOUT_H
 
 #define FF_FLAGS_NO_LAYOUTCOMMIT 1
+#define FF_FLAGS_NO_IO_THRU_MDS 2
 
 #include "../pnfs.h"
 
@@ -146,6 +147,12 @@ FF_LAYOUT_MIRROR_COUNT(struct pnfs_layout_segment *lseg)
 }
 
 static inline bool
+ff_layout_no_fallback_to_mds(struct pnfs_layout_segment *lseg)
+{
+	return FF_LAYOUT_LSEG(lseg)->flags & FF_FLAGS_NO_IO_THRU_MDS;
+}
+
+static inline bool
 ff_layout_test_devid_unavailable(struct nfs4_deviceid_node *node)
 {
 	return nfs4_test_deviceid_unavailable(node);
diff --git a/fs/nfs/mount_clnt.c b/fs/nfs/mount_clnt.c
index 99a45283b9ee..09b190015df4 100644
--- a/fs/nfs/mount_clnt.c
+++ b/fs/nfs/mount_clnt.c
@@ -16,9 +16,7 @@
 #include <linux/nfs_fs.h>
 #include "internal.h"
 
-#ifdef NFS_DEBUG
-# define NFSDBG_FACILITY	NFSDBG_MOUNT
-#endif
+#define NFSDBG_FACILITY	NFSDBG_MOUNT
 
 /*
  * Defined by RFC 1094, section A.3; and RFC 1813, section 5.1.4
diff --git a/fs/nfs/nfs42.h b/fs/nfs/nfs42.h
index 814c1255f1d2..b587ccd31083 100644
--- a/fs/nfs/nfs42.h
+++ b/fs/nfs/nfs42.h
@@ -17,5 +17,6 @@ int nfs42_proc_deallocate(struct file *, loff_t, loff_t);
 loff_t nfs42_proc_llseek(struct file *, loff_t, int);
 int nfs42_proc_layoutstats_generic(struct nfs_server *,
 				   struct nfs42_layoutstat_data *);
+int nfs42_proc_clone(struct file *, struct file *, loff_t, loff_t, loff_t);
 
 #endif /* __LINUX_FS_NFS_NFS4_2_H */
diff --git a/fs/nfs/nfs42proc.c b/fs/nfs/nfs42proc.c
index 0f020e4d8421..3e92a3cde15d 100644
--- a/fs/nfs/nfs42proc.c
+++ b/fs/nfs/nfs42proc.c
@@ -271,3 +271,74 @@ int nfs42_proc_layoutstats_generic(struct nfs_server *server,
 		return PTR_ERR(task);
 	return 0;
 }
+
+static int _nfs42_proc_clone(struct rpc_message *msg, struct file *src_f,
+			     struct file *dst_f, loff_t src_offset,
+			     loff_t dst_offset, loff_t count)
+{
+	struct inode *src_inode = file_inode(src_f);
+	struct inode *dst_inode = file_inode(dst_f);
+	struct nfs_server *server = NFS_SERVER(dst_inode);
+	struct nfs42_clone_args args = {
+		.src_fh = NFS_FH(src_inode),
+		.dst_fh = NFS_FH(dst_inode),
+		.src_offset = src_offset,
+		.dst_offset = dst_offset,
+		.dst_bitmask = server->cache_consistency_bitmask,
+	};
+	struct nfs42_clone_res res = {
+		.server	= server,
+	};
+	int status;
+
+	msg->rpc_argp = &args;
+	msg->rpc_resp = &res;
+
+	status = nfs42_set_rw_stateid(&args.src_stateid, src_f, FMODE_READ);
+	if (status)
+		return status;
+
+	status = nfs42_set_rw_stateid(&args.dst_stateid, dst_f, FMODE_WRITE);
+	if (status)
+		return status;
+
+	res.dst_fattr = nfs_alloc_fattr();
+	if (!res.dst_fattr)
+		return -ENOMEM;
+
+	status = nfs4_call_sync(server->client, server, msg,
+				&args.seq_args, &res.seq_res, 0);
+	if (status == 0)
+		status = nfs_post_op_update_inode(dst_inode, res.dst_fattr);
+
+	kfree(res.dst_fattr);
+	return status;
+}
+
+int nfs42_proc_clone(struct file *src_f, struct file *dst_f,
+		     loff_t src_offset, loff_t dst_offset, loff_t count)
+{
+	struct rpc_message msg = {
+		.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_CLONE],
+	};
+	struct inode *inode = file_inode(src_f);
+	struct nfs_server *server = NFS_SERVER(file_inode(src_f));
+	struct nfs4_exception exception = { };
+	int err;
+
+	if (!nfs_server_capable(inode, NFS_CAP_CLONE))
+		return -EOPNOTSUPP;
+
+	do {
+		err = _nfs42_proc_clone(&msg, src_f, dst_f, src_offset,
+					dst_offset, count);
+		if (err == -ENOTSUPP || err == -EOPNOTSUPP) {
+			NFS_SERVER(inode)->caps &= ~NFS_CAP_CLONE;
+			return -EOPNOTSUPP;
+		}
+		err = nfs4_handle_exception(server, err, &exception);
+	} while (exception.retry);
+
+	return err;
+
+}
diff --git a/fs/nfs/nfs42xdr.c b/fs/nfs/nfs42xdr.c
index 0eb29e14070d..0ca482a51e53 100644
--- a/fs/nfs/nfs42xdr.c
+++ b/fs/nfs/nfs42xdr.c
@@ -34,6 +34,12 @@
 					1 /* opaque devaddr4 length */ + \
 					XDR_QUADLEN(PNFS_LAYOUTSTATS_MAXSIZE))
 #define decode_layoutstats_maxsz	(op_decode_hdr_maxsz)
+#define encode_clone_maxsz		(encode_stateid_maxsz + \
+					encode_stateid_maxsz + \
+					2 /* src offset */ + \
+					2 /* dst offset */ + \
+					2 /* count */)
+#define decode_clone_maxsz		(op_decode_hdr_maxsz)
 
 #define NFS4_enc_allocate_sz		(compound_encode_hdr_maxsz + \
 					 encode_putfh_maxsz + \
@@ -65,7 +71,20 @@
 					 decode_sequence_maxsz + \
 					 decode_putfh_maxsz + \
 					 PNFS_LAYOUTSTATS_MAXDEV * decode_layoutstats_maxsz)
-
+#define NFS4_enc_clone_sz		(compound_encode_hdr_maxsz + \
+					 encode_sequence_maxsz + \
+					 encode_putfh_maxsz + \
+					 encode_savefh_maxsz + \
+					 encode_putfh_maxsz + \
+					 encode_clone_maxsz + \
+					 encode_getattr_maxsz)
+#define NFS4_dec_clone_sz		(compound_decode_hdr_maxsz + \
+					 decode_sequence_maxsz + \
+					 decode_putfh_maxsz + \
+					 decode_savefh_maxsz + \
+					 decode_putfh_maxsz + \
+					 decode_clone_maxsz + \
+					 decode_getattr_maxsz)
 
 static void encode_fallocate(struct xdr_stream *xdr,
 			     struct nfs42_falloc_args *args)
@@ -128,6 +147,21 @@ static void encode_layoutstats(struct xdr_stream *xdr,
 		encode_uint32(xdr, 0);
 }
 
+static void encode_clone(struct xdr_stream *xdr,
+			 struct nfs42_clone_args *args,
+			 struct compound_hdr *hdr)
+{
+	__be32 *p;
+
+	encode_op_hdr(xdr, OP_CLONE, decode_clone_maxsz, hdr);
+	encode_nfs4_stateid(xdr, &args->src_stateid);
+	encode_nfs4_stateid(xdr, &args->dst_stateid);
+	p = reserve_space(xdr, 3*8);
+	p = xdr_encode_hyper(p, args->src_offset);
+	p = xdr_encode_hyper(p, args->dst_offset);
+	xdr_encode_hyper(p, args->count);
+}
+
 /*
  * Encode ALLOCATE request
  */
@@ -206,6 +240,27 @@ static void nfs4_xdr_enc_layoutstats(struct rpc_rqst *req,
 	encode_nops(&hdr);
 }
 
+/*
+ * Encode CLONE request
+ */
+static void nfs4_xdr_enc_clone(struct rpc_rqst *req,
+			       struct xdr_stream *xdr,
+			       struct nfs42_clone_args *args)
+{
+	struct compound_hdr hdr = {
+		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
+	};
+
+	encode_compound_hdr(xdr, req, &hdr);
+	encode_sequence(xdr, &args->seq_args, &hdr);
+	encode_putfh(xdr, args->src_fh, &hdr);
+	encode_savefh(xdr, &hdr);
+	encode_putfh(xdr, args->dst_fh, &hdr);
+	encode_clone(xdr, args, &hdr);
+	encode_getfattr(xdr, args->dst_bitmask, &hdr);
+	encode_nops(&hdr);
+}
+
 static int decode_allocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res)
 {
 	return decode_op_hdr(xdr, OP_ALLOCATE);
@@ -243,6 +298,11 @@ static int decode_layoutstats(struct xdr_stream *xdr)
 	return decode_op_hdr(xdr, OP_LAYOUTSTATS);
 }
 
+static int decode_clone(struct xdr_stream *xdr)
+{
+	return decode_op_hdr(xdr, OP_CLONE);
+}
+
 /*
  * Decode ALLOCATE request
  */
@@ -351,4 +411,39 @@ out:
 	return status;
 }
 
+/*
+ * Decode CLONE request
+ */
+static int nfs4_xdr_dec_clone(struct rpc_rqst *rqstp,
+			      struct xdr_stream *xdr,
+			      struct nfs42_clone_res *res)
+{
+	struct compound_hdr hdr;
+	int status;
+
+	status = decode_compound_hdr(xdr, &hdr);
+	if (status)
+		goto out;
+	status = decode_sequence(xdr, &res->seq_res, rqstp);
+	if (status)
+		goto out;
+	status = decode_putfh(xdr);
+	if (status)
+		goto out;
+	status = decode_savefh(xdr);
+	if (status)
+		goto out;
+	status = decode_putfh(xdr);
+	if (status)
+		goto out;
+	status = decode_clone(xdr);
+	if (status)
+		goto out;
+	status = decode_getfattr(xdr, res->dst_fattr, res->server);
+
+out:
+	res->rpc_status = status;
+	return status;
+}
+
 #endif /* __LINUX_FS_NFS_NFS4_2XDR_H */
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 50cfc4ca7a02..4afdee420d25 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -183,10 +183,12 @@ struct nfs4_state {
 
 
 struct nfs4_exception {
-	long timeout;
-	int retry;
 	struct nfs4_state *state;
 	struct inode *inode;
+	long timeout;
+	unsigned char delay : 1,
+		      recovering : 1,
+		      retry : 1;
 };
 
 struct nfs4_state_recovery_ops {
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index b0dbe0abed53..4aa571956cd6 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -4,6 +4,7 @@
  *  Copyright (C) 1992  Rick Sladkey
  */
 #include <linux/fs.h>
+#include <linux/file.h>
 #include <linux/falloc.h>
 #include <linux/nfs_fs.h>
 #include "delegation.h"
@@ -192,8 +193,138 @@ static long nfs42_fallocate(struct file *filep, int mode, loff_t offset, loff_t
 		return nfs42_proc_deallocate(filep, offset, len);
 	return nfs42_proc_allocate(filep, offset, len);
 }
+
+static noinline long
+nfs42_ioctl_clone(struct file *dst_file, unsigned long srcfd,
+		  u64 src_off, u64 dst_off, u64 count)
+{
+	struct inode *dst_inode = file_inode(dst_file);
+	struct nfs_server *server = NFS_SERVER(dst_inode);
+	struct fd src_file;
+	struct inode *src_inode;
+	unsigned int bs = server->clone_blksize;
+	int ret;
+
+	/* dst file must be opened for writing */
+	if (!(dst_file->f_mode & FMODE_WRITE))
+		return -EINVAL;
+
+	ret = mnt_want_write_file(dst_file);
+	if (ret)
+		return ret;
+
+	src_file = fdget(srcfd);
+	if (!src_file.file) {
+		ret = -EBADF;
+		goto out_drop_write;
+	}
+
+	src_inode = file_inode(src_file.file);
+
+	/* src and dst must be different files */
+	ret = -EINVAL;
+	if (src_inode == dst_inode)
+		goto out_fput;
+
+	/* src file must be opened for reading */
+	if (!(src_file.file->f_mode & FMODE_READ))
+		goto out_fput;
+
+	/* src and dst must be regular files */
+	ret = -EISDIR;
+	if (!S_ISREG(src_inode->i_mode) || !S_ISREG(dst_inode->i_mode))
+		goto out_fput;
+
+	ret = -EXDEV;
+	if (src_file.file->f_path.mnt != dst_file->f_path.mnt ||
+	    src_inode->i_sb != dst_inode->i_sb)
+		goto out_fput;
+
+	/* check alignment w.r.t. clone_blksize */
+	ret = -EINVAL;
+	if (bs) {
+		if (!IS_ALIGNED(src_off, bs) || !IS_ALIGNED(dst_off, bs))
+			goto out_fput;
+		if (!IS_ALIGNED(count, bs) && i_size_read(src_inode) != (src_off + count))
+			goto out_fput;
+	}
+
+	/* XXX: do we lock at all? what if server needs CB_RECALL_LAYOUT? */
+	if (dst_inode < src_inode) {
+		mutex_lock_nested(&dst_inode->i_mutex, I_MUTEX_PARENT);
+		mutex_lock_nested(&src_inode->i_mutex, I_MUTEX_CHILD);
+	} else {
+		mutex_lock_nested(&src_inode->i_mutex, I_MUTEX_PARENT);
+		mutex_lock_nested(&dst_inode->i_mutex, I_MUTEX_CHILD);
+	}
+
+	/* flush all pending writes on both src and dst so that server
+	 * has the latest data */
+	ret = nfs_sync_inode(src_inode);
+	if (ret)
+		goto out_unlock;
+	ret = nfs_sync_inode(dst_inode);
+	if (ret)
+		goto out_unlock;
+
+	ret = nfs42_proc_clone(src_file.file, dst_file, src_off, dst_off, count);
+
+	/* truncate inode page cache of the dst range so that future reads can fetch
+	 * new data from server */
+	if (!ret)
+		truncate_inode_pages_range(&dst_inode->i_data, dst_off, dst_off + count - 1);
+
+out_unlock:
+	if (dst_inode < src_inode) {
+		mutex_unlock(&src_inode->i_mutex);
+		mutex_unlock(&dst_inode->i_mutex);
+	} else {
+		mutex_unlock(&dst_inode->i_mutex);
+		mutex_unlock(&src_inode->i_mutex);
+	}
+out_fput:
+	fdput(src_file);
+out_drop_write:
+	mnt_drop_write_file(dst_file);
+	return ret;
+}
+
+static long nfs42_ioctl_clone_range(struct file *dst_file, void __user *argp)
+{
+	struct nfs_ioctl_clone_range_args args;
+
+	if (copy_from_user(&args, argp, sizeof(args)))
+		return -EFAULT;
+
+	return nfs42_ioctl_clone(dst_file, args.src_fd, args.src_off, args.dst_off, args.count);
+}
+#else
+static long nfs42_ioctl_clone(struct file *dst_file, unsigned long srcfd,
+		u64 src_off, u64 dst_off, u64 count)
+{
+	return -ENOTTY;
+}
+
+static long nfs42_ioctl_clone_range(struct file *dst_file, void __user *argp)
+{
+	return -ENOTTY;
+}
 #endif /* CONFIG_NFS_V4_2 */
 
+long nfs4_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
+{
+	void __user *argp = (void __user *)arg;
+
+	switch (cmd) {
+	case NFS_IOC_CLONE:
+		return nfs42_ioctl_clone(file, arg, 0, 0, 0);
+	case NFS_IOC_CLONE_RANGE:
+		return nfs42_ioctl_clone_range(file, argp);
+	}
+
+	return -ENOTTY;
+}
+
 const struct file_operations nfs4_file_operations = {
 #ifdef CONFIG_NFS_V4_2
 	.llseek		= nfs4_file_llseek,
@@ -216,4 +347,9 @@ const struct file_operations nfs4_file_operations = {
 #endif /* CONFIG_NFS_V4_2 */
 	.check_flags	= nfs_check_flags,
 	.setlease	= simple_nosetlease,
+#ifdef CONFIG_COMPAT
+	.unlocked_ioctl = nfs4_ioctl,
+#else
+	.compat_ioctl	= nfs4_ioctl,
+#endif /* CONFIG_COMPAT */
 };
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 0e5ff69455c7..ff5bddc49a2a 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -78,7 +78,6 @@ struct nfs4_opendata;
 static int _nfs4_proc_open(struct nfs4_opendata *data);
 static int _nfs4_recover_proc_open(struct nfs4_opendata *data);
 static int nfs4_do_fsinfo(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *);
-static int nfs4_async_handle_error(struct rpc_task *, const struct nfs_server *, struct nfs4_state *, long *);
 static void nfs_fixup_referral_attributes(struct nfs_fattr *fattr);
 static int nfs4_proc_getattr(struct nfs_server *, struct nfs_fh *, struct nfs_fattr *, struct nfs4_label *label);
 static int _nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fattr *fattr, struct nfs4_label *label);
@@ -239,6 +238,7 @@ const u32 nfs4_fsinfo_bitmap[3] = { FATTR4_WORD0_MAXFILESIZE
 			FATTR4_WORD1_TIME_DELTA
 			| FATTR4_WORD1_FS_LAYOUT_TYPES,
 			FATTR4_WORD2_LAYOUT_BLKSIZE
+			| FATTR4_WORD2_CLONE_BLKSIZE
 };
 
 const u32 nfs4_fs_locations_bitmap[3] = {
@@ -344,13 +344,16 @@ static int nfs4_delay(struct rpc_clnt *clnt, long *timeout)
 /* This is the error handling routine for processes that are allowed
  * to sleep.
  */
-int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception)
+static int nfs4_do_handle_exception(struct nfs_server *server,
+		int errorcode, struct nfs4_exception *exception)
 {
 	struct nfs_client *clp = server->nfs_client;
 	struct nfs4_state *state = exception->state;
 	struct inode *inode = exception->inode;
 	int ret = errorcode;
 
+	exception->delay = 0;
+	exception->recovering = 0;
 	exception->retry = 0;
 	switch(errorcode) {
 		case 0:
@@ -359,11 +362,9 @@ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_
 		case -NFS4ERR_DELEG_REVOKED:
 		case -NFS4ERR_ADMIN_REVOKED:
 		case -NFS4ERR_BAD_STATEID:
-			if (inode && nfs4_have_delegation(inode, FMODE_READ)) {
-				nfs4_inode_return_delegation(inode);
-				exception->retry = 1;
-				return 0;
-			}
+			if (inode && nfs_async_inode_return_delegation(inode,
+						NULL) == 0)
+				goto wait_on_recovery;
 			if (state == NULL)
 				break;
 			ret = nfs4_schedule_stateid_recovery(server, state);
@@ -409,11 +410,12 @@ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_
 				ret = -EBUSY;
 				break;
 			}
-		case -NFS4ERR_GRACE:
 		case -NFS4ERR_DELAY:
-			ret = nfs4_delay(server->client, &exception->timeout);
-			if (ret != 0)
-				break;
+			nfs_inc_server_stats(server, NFSIOS_DELAY);
+		case -NFS4ERR_GRACE:
+			exception->delay = 1;
+			return 0;
+
 		case -NFS4ERR_RETRY_UNCACHED_REP:
 		case -NFS4ERR_OLD_STATEID:
 			exception->retry = 1;
@@ -434,14 +436,85 @@ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_
 	/* We failed to handle the error */
 	return nfs4_map_errors(ret);
 wait_on_recovery:
-	ret = nfs4_wait_clnt_recover(clp);
+	exception->recovering = 1;
+	return 0;
+}
+
+/* This is the error handling routine for processes that are allowed
+ * to sleep.
+ */
+int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception)
+{
+	struct nfs_client *clp = server->nfs_client;
+	int ret;
+
+	ret = nfs4_do_handle_exception(server, errorcode, exception);
+	if (exception->delay) {
+		ret = nfs4_delay(server->client, &exception->timeout);
+		goto out_retry;
+	}
+	if (exception->recovering) {
+		ret = nfs4_wait_clnt_recover(clp);
+		if (test_bit(NFS_MIG_FAILED, &server->mig_status))
+			return -EIO;
+		goto out_retry;
+	}
+	return ret;
+out_retry:
+	if (ret == 0)
+		exception->retry = 1;
+	return ret;
+}
+
+static int
+nfs4_async_handle_exception(struct rpc_task *task, struct nfs_server *server,
+		int errorcode, struct nfs4_exception *exception)
+{
+	struct nfs_client *clp = server->nfs_client;
+	int ret;
+
+	ret = nfs4_do_handle_exception(server, errorcode, exception);
+	if (exception->delay) {
+		rpc_delay(task, nfs4_update_delay(&exception->timeout));
+		goto out_retry;
+	}
+	if (exception->recovering) {
+		rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL);
+		if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0)
+			rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task);
+		goto out_retry;
+	}
 	if (test_bit(NFS_MIG_FAILED, &server->mig_status))
-		return -EIO;
+		ret = -EIO;
+	return ret;
+out_retry:
 	if (ret == 0)
 		exception->retry = 1;
 	return ret;
 }
 
+static int
+nfs4_async_handle_error(struct rpc_task *task, struct nfs_server *server,
+			struct nfs4_state *state, long *timeout)
+{
+	struct nfs4_exception exception = {
+		.state = state,
+	};
+
+	if (task->tk_status >= 0)
+		return 0;
+	if (timeout)
+		exception.timeout = *timeout;
+	task->tk_status = nfs4_async_handle_exception(task, server,
+			task->tk_status,
+			&exception);
+	if (exception.delay && timeout)
+		*timeout = exception.timeout;
+	if (exception.retry)
+		return -EAGAIN;
+	return 0;
+}
+
 /*
  * Return 'true' if 'clp' is using an rpc_client that is integrity protected
  * or 'false' otherwise.
@@ -4530,7 +4603,7 @@ static inline int nfs4_server_supports_acls(struct nfs_server *server)
 #define NFS4ACL_MAXPAGES DIV_ROUND_UP(XATTR_SIZE_MAX, PAGE_SIZE)
 
 static int buf_to_pages_noslab(const void *buf, size_t buflen,
-		struct page **pages, unsigned int *pgbase)
+		struct page **pages)
 {
 	struct page *newpage, **spages;
 	int rc = 0;
@@ -4674,7 +4747,6 @@ static ssize_t __nfs4_get_acl_uncached(struct inode *inode, void *buf, size_t bu
 		goto out_free;
 
 	args.acl_len = npages * PAGE_SIZE;
-	args.acl_pgbase = 0;
 
 	dprintk("%s  buf %p buflen %zu npages %d args.acl_len %zu\n",
 		__func__, buf, buflen, npages, args.acl_len);
@@ -4766,7 +4838,7 @@ static int __nfs4_proc_set_acl(struct inode *inode, const void *buf, size_t bufl
 		return -EOPNOTSUPP;
 	if (npages > ARRAY_SIZE(pages))
 		return -ERANGE;
-	i = buf_to_pages_noslab(buf, buflen, arg.acl_pages, &arg.acl_pgbase);
+	i = buf_to_pages_noslab(buf, buflen, arg.acl_pages);
 	if (i < 0)
 		return i;
 	nfs4_inode_return_delegation(inode);
@@ -4955,79 +5027,6 @@ out:
 #endif	/* CONFIG_NFS_V4_SECURITY_LABEL */
 
 
-static int
-nfs4_async_handle_error(struct rpc_task *task, const struct nfs_server *server,
-			struct nfs4_state *state, long *timeout)
-{
-	struct nfs_client *clp = server->nfs_client;
-
-	if (task->tk_status >= 0)
-		return 0;
-	switch(task->tk_status) {
-		case -NFS4ERR_DELEG_REVOKED:
-		case -NFS4ERR_ADMIN_REVOKED:
-		case -NFS4ERR_BAD_STATEID:
-		case -NFS4ERR_OPENMODE:
-			if (state == NULL)
-				break;
-			if (nfs4_schedule_stateid_recovery(server, state) < 0)
-				goto recovery_failed;
-			goto wait_on_recovery;
-		case -NFS4ERR_EXPIRED:
-			if (state != NULL) {
-				if (nfs4_schedule_stateid_recovery(server, state) < 0)
-					goto recovery_failed;
-			}
-		case -NFS4ERR_STALE_STATEID:
-		case -NFS4ERR_STALE_CLIENTID:
-			nfs4_schedule_lease_recovery(clp);
-			goto wait_on_recovery;
-		case -NFS4ERR_MOVED:
-			if (nfs4_schedule_migration_recovery(server) < 0)
-				goto recovery_failed;
-			goto wait_on_recovery;
-		case -NFS4ERR_LEASE_MOVED:
-			nfs4_schedule_lease_moved_recovery(clp);
-			goto wait_on_recovery;
-#if defined(CONFIG_NFS_V4_1)
-		case -NFS4ERR_BADSESSION:
-		case -NFS4ERR_BADSLOT:
-		case -NFS4ERR_BAD_HIGH_SLOT:
-		case -NFS4ERR_DEADSESSION:
-		case -NFS4ERR_CONN_NOT_BOUND_TO_SESSION:
-		case -NFS4ERR_SEQ_FALSE_RETRY:
-		case -NFS4ERR_SEQ_MISORDERED:
-			dprintk("%s ERROR %d, Reset session\n", __func__,
-				task->tk_status);
-			nfs4_schedule_session_recovery(clp->cl_session, task->tk_status);
-			goto wait_on_recovery;
-#endif /* CONFIG_NFS_V4_1 */
-		case -NFS4ERR_DELAY:
-			nfs_inc_server_stats(server, NFSIOS_DELAY);
-			rpc_delay(task, nfs4_update_delay(timeout));
-			goto restart_call;
-		case -NFS4ERR_GRACE:
-			rpc_delay(task, NFS4_POLL_RETRY_MAX);
-		case -NFS4ERR_RETRY_UNCACHED_REP:
-		case -NFS4ERR_OLD_STATEID:
-			goto restart_call;
-	}
-	task->tk_status = nfs4_map_errors(task->tk_status);
-	return 0;
-recovery_failed:
-	task->tk_status = -EIO;
-	return 0;
-wait_on_recovery:
-	rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL);
-	if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0)
-		rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task);
-	if (test_bit(NFS_MIG_FAILED, &server->mig_status))
-		goto recovery_failed;
-restart_call:
-	task->tk_status = 0;
-	return -EAGAIN;
-}
-
 static void nfs4_init_boot_verifier(const struct nfs_client *clp,
 				    nfs4_verifier *bootverf)
 {
@@ -5522,7 +5521,7 @@ struct nfs4_unlockdata {
 	struct nfs4_lock_state *lsp;
 	struct nfs_open_context *ctx;
 	struct file_lock fl;
-	const struct nfs_server *server;
+	struct nfs_server *server;
 	unsigned long timestamp;
 };
 
@@ -8718,7 +8717,8 @@ static const struct nfs4_minor_version_ops nfs_v4_2_minor_ops = {
 		| NFS_CAP_ALLOCATE
 		| NFS_CAP_DEALLOCATE
 		| NFS_CAP_SEEK
-		| NFS_CAP_LAYOUTSTATS,
+		| NFS_CAP_LAYOUTSTATS
+		| NFS_CAP_CLONE,
 	.init_client = nfs41_init_client,
 	.shutdown_client = nfs41_shutdown_client,
 	.match_stateid = nfs41_match_stateid,
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 788adf3897c7..dfed4f5c8fcc 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -1659,7 +1659,7 @@ encode_setacl(struct xdr_stream *xdr, struct nfs_setaclargs *arg, struct compoun
 	*p = cpu_to_be32(FATTR4_WORD0_ACL);
 	p = reserve_space(xdr, 4);
 	*p = cpu_to_be32(arg->acl_len);
-	xdr_write_pages(xdr, arg->acl_pages, arg->acl_pgbase, arg->acl_len);
+	xdr_write_pages(xdr, arg->acl_pages, 0, arg->acl_len);
 }
 
 static void
@@ -2491,7 +2491,7 @@ static void nfs4_xdr_enc_getacl(struct rpc_rqst *req, struct xdr_stream *xdr,
 	encode_getattr_two(xdr, FATTR4_WORD0_ACL, 0, &hdr);
 
 	xdr_inline_pages(&req->rq_rcv_buf, replen << 2,
-		args->acl_pages, args->acl_pgbase, args->acl_len);
+		args->acl_pages, 0, args->acl_len);
 
 	encode_nops(&hdr);
 }
@@ -4375,6 +4375,11 @@ static int decode_statfs(struct xdr_stream *xdr, struct nfs_fsstat *fsstat)
 		goto xdr_error;
 	if ((status = decode_attr_files_total(xdr, bitmap, &fsstat->tfiles)) != 0)
 		goto xdr_error;
+
+	status = -EIO;
+	if (unlikely(bitmap[0]))
+		goto xdr_error;
+
 	if ((status = decode_attr_space_avail(xdr, bitmap, &fsstat->abytes)) != 0)
 		goto xdr_error;
 	if ((status = decode_attr_space_free(xdr, bitmap, &fsstat->fbytes)) != 0)
@@ -4574,6 +4579,10 @@ static int decode_getfattr_attrs(struct xdr_stream *xdr, uint32_t *bitmap,
 		goto xdr_error;
 	fattr->valid |= status;
 
+	status = -EIO;
+	if (unlikely(bitmap[0]))
+		goto xdr_error;
+
 	status = decode_attr_mode(xdr, bitmap, &fmode);
 	if (status < 0)
 		goto xdr_error;
@@ -4627,6 +4636,10 @@ static int decode_getfattr_attrs(struct xdr_stream *xdr, uint32_t *bitmap,
 		goto xdr_error;
 	fattr->valid |= status;
 
+	status = -EIO;
+	if (unlikely(bitmap[1]))
+		goto xdr_error;
+
 	status = decode_attr_mdsthreshold(xdr, bitmap, fattr->mdsthreshold);
 	if (status < 0)
 		goto xdr_error;
@@ -4764,6 +4777,28 @@ static int decode_attr_layout_blksize(struct xdr_stream *xdr, uint32_t *bitmap,
 	return 0;
 }
 
+/*
+ * The granularity of a CLONE operation.
+ */
+static int decode_attr_clone_blksize(struct xdr_stream *xdr, uint32_t *bitmap,
+				     uint32_t *res)
+{
+	__be32 *p;
+
+	dprintk("%s: bitmap is %x\n", __func__, bitmap[2]);
+	*res = 0;
+	if (bitmap[2] & FATTR4_WORD2_CLONE_BLKSIZE) {
+		p = xdr_inline_decode(xdr, 4);
+		if (unlikely(!p)) {
+			print_overflow_msg(__func__, xdr);
+			return -EIO;
+		}
+		*res = be32_to_cpup(p);
+		bitmap[2] &= ~FATTR4_WORD2_CLONE_BLKSIZE;
+	}
+	return 0;
+}
+
 static int decode_fsinfo(struct xdr_stream *xdr, struct nfs_fsinfo *fsinfo)
 {
 	unsigned int savep;
@@ -4789,15 +4824,28 @@ static int decode_fsinfo(struct xdr_stream *xdr, struct nfs_fsinfo *fsinfo)
 	if ((status = decode_attr_maxwrite(xdr, bitmap, &fsinfo->wtmax)) != 0)
 		goto xdr_error;
 	fsinfo->wtpref = fsinfo->wtmax;
+
+	status = -EIO;
+	if (unlikely(bitmap[0]))
+		goto xdr_error;
+
 	status = decode_attr_time_delta(xdr, bitmap, &fsinfo->time_delta);
 	if (status != 0)
 		goto xdr_error;
 	status = decode_attr_pnfstype(xdr, bitmap, &fsinfo->layouttype);
 	if (status != 0)
 		goto xdr_error;
+
+	status = -EIO;
+	if (unlikely(bitmap[1]))
+		goto xdr_error;
+
 	status = decode_attr_layout_blksize(xdr, bitmap, &fsinfo->blksize);
 	if (status)
 		goto xdr_error;
+	status = decode_attr_clone_blksize(xdr, bitmap, &fsinfo->clone_blksize);
+	if (status)
+		goto xdr_error;
 
 	status = verify_attr_len(xdr, savep, attrlen);
 xdr_error:
@@ -7465,6 +7513,7 @@ struct rpc_procinfo	nfs4_procedures[] = {
 	PROC(ALLOCATE,		enc_allocate,		dec_allocate),
 	PROC(DEALLOCATE,	enc_deallocate,		dec_deallocate),
 	PROC(LAYOUTSTATS,	enc_layoutstats,	dec_layoutstats),
+	PROC(CLONE,		enc_clone,		dec_clone),
 #endif /* CONFIG_NFS_V4_2 */
 };
 
diff --git a/fs/nfs/nfsroot.c b/fs/nfs/nfsroot.c
index 9bc9f04fb7f6..89a15dbe5efc 100644
--- a/fs/nfs/nfsroot.c
+++ b/fs/nfs/nfsroot.c
@@ -90,7 +90,7 @@
 #define NFS_DEF_OPTIONS		"vers=2,udp,rsize=4096,wsize=4096"
 
 /* Parameters passed from the kernel command line */
-static char nfs_root_parms[256] __initdata = "";
+static char nfs_root_parms[NFS_MAXPATHLEN + 1] __initdata = "";
 
 /* Text-based mount options passed to super.c */
 static char nfs_root_options[256] __initdata = NFS_DEF_OPTIONS;
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 8abe27165ad0..93496c059837 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -1912,12 +1912,13 @@ static void pnfs_ld_handle_write_error(struct nfs_pgio_header *hdr)
  */
 void pnfs_ld_write_done(struct nfs_pgio_header *hdr)
 {
-	trace_nfs4_pnfs_write(hdr, hdr->pnfs_error);
-	if (!hdr->pnfs_error) {
+	if (likely(!hdr->pnfs_error)) {
 		pnfs_set_layoutcommit(hdr->inode, hdr->lseg,
 				hdr->mds_offset + hdr->res.count);
 		hdr->mds_ops->rpc_call_done(&hdr->task, hdr);
-	} else
+	}
+	trace_nfs4_pnfs_write(hdr, hdr->pnfs_error);
+	if (unlikely(hdr->pnfs_error))
 		pnfs_ld_handle_write_error(hdr);
 	hdr->mds_ops->rpc_release(hdr);
 }
@@ -2028,11 +2029,12 @@ static void pnfs_ld_handle_read_error(struct nfs_pgio_header *hdr)
  */
 void pnfs_ld_read_done(struct nfs_pgio_header *hdr)
 {
-	trace_nfs4_pnfs_read(hdr, hdr->pnfs_error);
 	if (likely(!hdr->pnfs_error)) {
 		__nfs4_read_done_cb(hdr);
 		hdr->mds_ops->rpc_call_done(&hdr->task, hdr);
-	} else
+	}
+	trace_nfs4_pnfs_read(hdr, hdr->pnfs_error);
+	if (unlikely(hdr->pnfs_error))
 		pnfs_ld_handle_read_error(hdr);
 	hdr->mds_ops->rpc_release(hdr);
 }
diff --git a/fs/nfs/read.c b/fs/nfs/read.c
index 01b8cc8e8cfc..0a5e33f33b5c 100644
--- a/fs/nfs/read.c
+++ b/fs/nfs/read.c
@@ -246,6 +246,13 @@ static void nfs_readpage_retry(struct rpc_task *task,
 		nfs_set_pgio_error(hdr, -EIO, argp->offset);
 		return;
 	}
+
+	/* For non rpc-based layout drivers, retry-through-MDS */
+	if (!task->tk_ops) {
+		hdr->pnfs_error = -EAGAIN;
+		return;
+	}
+
 	/* Yes, so retry the read at the end of the hdr */
 	hdr->mds_offset += resp->count;
 	argp->offset += resp->count;
@@ -268,7 +275,7 @@ static void nfs_readpage_result(struct rpc_task *task,
 			hdr->good_bytes = bound - hdr->io_start;
 		}
 		spin_unlock(&hdr->lock);
-	} else if (hdr->res.count != hdr->args.count)
+	} else if (hdr->res.count < hdr->args.count)
 		nfs_readpage_retry(task, hdr);
 }
 
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index 383a027de452..f1268280244e 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -2816,7 +2816,6 @@ out_invalid_transport_udp:
  * NFS client for backwards compatibility
  */
 unsigned int nfs_callback_set_tcpport;
-unsigned short nfs_callback_tcpport;
 /* Default cache timeout is 10 minutes */
 unsigned int nfs_idmap_cache_timeout = 600;
 /* Turn off NFSv4 uid/gid mapping when using AUTH_SYS */
@@ -2827,7 +2826,6 @@ char nfs4_client_id_uniquifier[NFS4_CLIENT_ID_UNIQ_LEN] = "";
 bool recover_lost_locks = false;
 
 EXPORT_SYMBOL_GPL(nfs_callback_set_tcpport);
-EXPORT_SYMBOL_GPL(nfs_callback_tcpport);
 EXPORT_SYMBOL_GPL(nfs_idmap_cache_timeout);
 EXPORT_SYMBOL_GPL(nfs4_disable_idmapping);
 EXPORT_SYMBOL_GPL(max_session_slots);
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 75ab7622e0cc..7b9316406930 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -1505,6 +1505,13 @@ static void nfs_writeback_result(struct rpc_task *task,
 			task->tk_status = -EIO;
 			return;
 		}
+
+		/* For non rpc-based layout drivers, retry-through-MDS */
+		if (!task->tk_ops) {
+			hdr->pnfs_error = -EAGAIN;
+			return;
+		}
+
 		/* Was this an NFSv2 write or an NFSv3 stable write? */
 		if (resp->verf->committed != NFS_UNSTABLE) {
 			/* Resend from where the server left off */
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index 00121f298269..e7e78537aea2 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -130,6 +130,7 @@ enum nfs_opnum4 {
 	OP_READ_PLUS = 68,
 	OP_SEEK = 69,
 	OP_WRITE_SAME = 70,
+	OP_CLONE = 71,
 
 	OP_ILLEGAL = 10044,
 };
@@ -421,6 +422,7 @@ enum lock_type4 {
 #define FATTR4_WORD2_LAYOUT_TYPES       (1UL << 0)
 #define FATTR4_WORD2_LAYOUT_BLKSIZE     (1UL << 1)
 #define FATTR4_WORD2_MDSTHRESHOLD       (1UL << 4)
+#define FATTR4_WORD2_CLONE_BLKSIZE	(1UL << 13)
 #define FATTR4_WORD2_SECURITY_LABEL     (1UL << 16)
 
 /* MDS threshold bitmap bits */
@@ -501,6 +503,7 @@ enum {
 	NFSPROC4_CLNT_ALLOCATE,
 	NFSPROC4_CLNT_DEALLOCATE,
 	NFSPROC4_CLNT_LAYOUTSTATS,
+	NFSPROC4_CLNT_CLONE,
 };
 
 /* nfs41 types */
diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
index 570a7df2775b..2469ab0bb3a1 100644
--- a/include/linux/nfs_fs_sb.h
+++ b/include/linux/nfs_fs_sb.h
@@ -147,6 +147,7 @@ struct nfs_server {
 	unsigned int		acdirmax;
 	unsigned int		namelen;
 	unsigned int		options;	/* extra options enabled by mount */
+	unsigned int		clone_blksize;	/* granularity of a CLONE operation */
 #define NFS_OPTION_FSCACHE	0x00000001	/* - local caching enabled */
 #define NFS_OPTION_MIGRATION	0x00000002	/* - NFSv4 migration enabled */
 
@@ -243,5 +244,6 @@ struct nfs_server {
 #define NFS_CAP_ALLOCATE	(1U << 20)
 #define NFS_CAP_DEALLOCATE	(1U << 21)
 #define NFS_CAP_LAYOUTSTATS	(1U << 22)
+#define NFS_CAP_CLONE		(1U << 23)
 
 #endif
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 52faf7e96c65..570d630f98ae 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -141,6 +141,7 @@ struct nfs_fsinfo {
 	__u32			lease_time; /* in seconds */
 	__u32			layouttype; /* supported pnfs layout driver */
 	__u32			blksize; /* preferred pnfs io block size */
+	__u32			clone_blksize; /* granularity of a CLONE operation */
 };
 
 struct nfs_fsstat {
@@ -359,6 +360,25 @@ struct nfs42_layoutstat_data {
 	struct nfs42_layoutstat_res res;
 };
 
+struct nfs42_clone_args {
+	struct nfs4_sequence_args	seq_args;
+	struct nfs_fh			*src_fh;
+	struct nfs_fh			*dst_fh;
+	nfs4_stateid			src_stateid;
+	nfs4_stateid			dst_stateid;
+	__u64				src_offset;
+	__u64				dst_offset;
+	__u64				count;
+	const u32			*dst_bitmask;
+};
+
+struct nfs42_clone_res {
+	struct nfs4_sequence_res	seq_res;
+	unsigned int			rpc_status;
+	struct nfs_fattr		*dst_fattr;
+	const struct nfs_server		*server;
+};
+
 struct stateowner_id {
 	__u64	create_time;
 	__u32	uniquifier;
@@ -528,7 +548,7 @@ struct nfs4_delegreturnargs {
 struct nfs4_delegreturnres {
 	struct nfs4_sequence_res	seq_res;
 	struct nfs_fattr * fattr;
-	const struct nfs_server *server;
+	struct nfs_server *server;
 };
 
 /*
@@ -601,7 +621,7 @@ struct nfs_removeargs {
 
 struct nfs_removeres {
 	struct nfs4_sequence_res 	seq_res;
-	const struct nfs_server *server;
+	struct nfs_server *server;
 	struct nfs_fattr	*dir_attr;
 	struct nfs4_change_info	cinfo;
 };
@@ -619,7 +639,7 @@ struct nfs_renameargs {
 
 struct nfs_renameres {
 	struct nfs4_sequence_res	seq_res;
-	const struct nfs_server		*server;
+	struct nfs_server		*server;
 	struct nfs4_change_info		old_cinfo;
 	struct nfs_fattr		*old_fattr;
 	struct nfs4_change_info		new_cinfo;
@@ -685,7 +705,6 @@ struct nfs_setaclargs {
 	struct nfs4_sequence_args	seq_args;
 	struct nfs_fh *			fh;
 	size_t				acl_len;
-	unsigned int			acl_pgbase;
 	struct page **			acl_pages;
 };
 
@@ -697,7 +716,6 @@ struct nfs_getaclargs {
 	struct nfs4_sequence_args 	seq_args;
 	struct nfs_fh *			fh;
 	size_t				acl_len;
-	unsigned int			acl_pgbase;
 	struct page **			acl_pages;
 };
 
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index 8df43c9f11dc..4397a4824c81 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -38,6 +38,11 @@ void xprt_free_bc_request(struct rpc_rqst *req);
 int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs);
 void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
 
+/* Socket backchannel transport methods */
+int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs);
+void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs);
+void xprt_free_bc_rqst(struct rpc_rqst *req);
+
 /*
  * Determine if a shared backchannel is in use
  */
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 1e4438ea2380..f869807a0d0e 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -226,9 +226,13 @@ extern void svc_rdma_put_frmr(struct svcxprt_rdma *,
 			      struct svc_rdma_fastreg_mr *);
 extern void svc_sq_reap(struct svcxprt_rdma *);
 extern void svc_rq_reap(struct svcxprt_rdma *);
-extern struct svc_xprt_class svc_rdma_class;
 extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
 
+extern struct svc_xprt_class svc_rdma_class;
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+extern struct svc_xprt_class svc_rdma_bc_class;
+#endif
+
 /* svc_rdma.c */
 extern int svc_rdma_init(void);
 extern void svc_rdma_cleanup(void);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 0fb9acbb4780..69ef5b3ab038 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -54,6 +54,8 @@ enum rpc_display_format_t {
 struct rpc_task;
 struct rpc_xprt;
 struct seq_file;
+struct svc_serv;
+struct net;
 
 /*
  * This describes a complete RPC request
@@ -136,6 +138,12 @@ struct rpc_xprt_ops {
 	int		(*enable_swap)(struct rpc_xprt *xprt);
 	void		(*disable_swap)(struct rpc_xprt *xprt);
 	void		(*inject_disconnect)(struct rpc_xprt *xprt);
+	int		(*bc_setup)(struct rpc_xprt *xprt,
+				    unsigned int min_reqs);
+	int		(*bc_up)(struct svc_serv *serv, struct net *net);
+	void		(*bc_free_rqst)(struct rpc_rqst *rqst);
+	void		(*bc_destroy)(struct rpc_xprt *xprt,
+				      unsigned int max_reqs);
 };
 
 /*
@@ -153,6 +161,7 @@ enum xprt_transports {
 	XPRT_TRANSPORT_TCP	= IPPROTO_TCP,
 	XPRT_TRANSPORT_BC_TCP	= IPPROTO_TCP | XPRT_TRANSPORT_BC,
 	XPRT_TRANSPORT_RDMA	= 256,
+	XPRT_TRANSPORT_BC_RDMA	= XPRT_TRANSPORT_RDMA | XPRT_TRANSPORT_BC,
 	XPRT_TRANSPORT_LOCAL	= 257,
 };
 
diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index 357e44c1a46b..0ece4ba06f06 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -44,6 +44,8 @@ struct sock_xprt {
 	 */
 	unsigned long		sock_state;
 	struct delayed_work	connect_worker;
+	struct work_struct	recv_worker;
+	struct mutex		recv_mutex;
 	struct sockaddr_storage	srcaddr;
 	unsigned short		srcport;
 
diff --git a/include/uapi/linux/nfs.h b/include/uapi/linux/nfs.h
index 5199a36dd574..654bae3f1a38 100644
--- a/include/uapi/linux/nfs.h
+++ b/include/uapi/linux/nfs.h
@@ -7,6 +7,8 @@
 #ifndef _UAPI_LINUX_NFS_H
 #define _UAPI_LINUX_NFS_H
 
+#include <linux/types.h>
+
 #define NFS_PROGRAM	100003
 #define NFS_PORT	2049
 #define NFS_MAXDATA	8192
@@ -31,6 +33,17 @@
 
 #define NFS_PIPE_DIRNAME "nfs"
 
+/* NFS ioctls */
+/* Let's follow btrfs lead on CLONE to avoid messing userspace */
+#define NFS_IOC_CLONE		_IOW(0x94, 9, int)
+#define NFS_IOC_CLONE_RANGE	_IOW(0x94, 13, int)
+
+struct nfs_ioctl_clone_range_args {
+	__s64 src_fd;
+	__u64 src_off, count;
+	__u64 dst_off;
+};
+
 /*
  * NFS stats. The good thing with these values is that NFSv3 errors are
  * a superset of NFSv2 errors (with the exception of NFSERR_WFLUSH which
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 6255d141133b..229956bf8457 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -138,6 +138,14 @@ out_free:
  */
 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
 {
+	if (!xprt->ops->bc_setup)
+		return 0;
+	return xprt->ops->bc_setup(xprt, min_reqs);
+}
+EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
+
+int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
+{
 	struct rpc_rqst *req;
 	struct list_head tmp_list;
 	int i;
@@ -192,7 +200,6 @@ out_free:
 	dprintk("RPC:       setup backchannel transport failed\n");
 	return -ENOMEM;
 }
-EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
 
 /**
  * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
@@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
  */
 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
 {
+	if (xprt->ops->bc_destroy)
+		xprt->ops->bc_destroy(xprt, max_reqs);
+}
+EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
+
+void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
+{
 	struct rpc_rqst *req = NULL, *tmp = NULL;
 
 	dprintk("RPC:        destroy backchannel transport\n");
@@ -227,7 +241,6 @@ out:
 	dprintk("RPC:        backchannel list empty= %s\n",
 		list_empty(&xprt->bc_pa_list) ? "true" : "false");
 }
-EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
 
 static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
 {
@@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req)
 {
 	struct rpc_xprt *xprt = req->rq_xprt;
 
+	xprt->ops->bc_free_rqst(req);
+}
+
+void xprt_free_bc_rqst(struct rpc_rqst *req)
+{
+	struct rpc_xprt *xprt = req->rq_xprt;
+
 	dprintk("RPC:       free backchannel req=%p\n", req);
 
 	req->rq_connect_cookie = xprt->connect_cookie - 1;
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a8f579df14d8..bc5b7b5032ca 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1367,11 +1367,6 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
 	/* reset result send buffer "put" position */
 	resv->iov_len = 0;
 
-	if (rqstp->rq_prot != IPPROTO_TCP) {
-		printk(KERN_ERR "No support for Non-TCP transports!\n");
-		BUG();
-	}
-
 	/*
 	 * Skip the next two words because they've already been
 	 * processed in the transport
diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c
index 887f0183b4c6..c88d9bc06f5c 100644
--- a/net/sunrpc/sysctl.c
+++ b/net/sunrpc/sysctl.c
@@ -76,7 +76,7 @@ static int
 proc_dodebug(struct ctl_table *table, int write,
 				void __user *buffer, size_t *lenp, loff_t *ppos)
 {
-	char		tmpbuf[20], c, *s;
+	char		tmpbuf[20], c, *s = NULL;
 	char __user *p;
 	unsigned int	value;
 	size_t		left, len;
@@ -103,23 +103,24 @@ proc_dodebug(struct ctl_table *table, int write,
 			return -EFAULT;
 		tmpbuf[left] = '\0';
 
-		for (s = tmpbuf, value = 0; '0' <= *s && *s <= '9'; s++, left--)
-			value = 10 * value + (*s - '0');
-		if (*s && !isspace(*s))
-			return -EINVAL;
-		while (left && isspace(*s))
-			left--, s++;
+		value = simple_strtol(tmpbuf, &s, 0);
+		if (s) {
+			left -= (s - tmpbuf);
+			if (left && !isspace(*s))
+				return -EINVAL;
+			while (left && isspace(*s))
+				left--, s++;
+		} else
+			left = 0;
 		*(unsigned int *) table->data = value;
 		/* Display the RPC tasks on writing to rpc_debug */
 		if (strcmp(table->procname, "rpc_debug") == 0)
 			rpc_show_tasks(&init_net);
 	} else {
-		if (!access_ok(VERIFY_WRITE, buffer, left))
-			return -EFAULT;
-		len = sprintf(tmpbuf, "%d", *(unsigned int *) table->data);
+		len = sprintf(tmpbuf, "0x%04x", *(unsigned int *) table->data);
 		if (len > left)
 			len = left;
-		if (__copy_to_user(buffer, tmpbuf, len))
+		if (copy_to_user(buffer, tmpbuf, len))
 			return -EFAULT;
 		if ((left -= len) > 0) {
 			if (put_user('\n', (char __user *)buffer + len))
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de240bd..33f99d3004f2 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	svc_rdma.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
 	module.o
+rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 000000000000..2dcb44f69e53
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,394 @@
+/*
+ * Copyright (c) 2015 Oracle.  All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA.
+ */
+
+#include <linux/module.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/svc.h>
+#include <linux/sunrpc/svc_xprt.h>
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY	RPCDBG_TRANS
+#endif
+
+#define RPCRDMA_BACKCHANNEL_DEBUG
+
+static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
+				 struct rpc_rqst *rqst)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+	spin_lock(&buf->rb_reqslock);
+	list_del(&req->rl_all);
+	spin_unlock(&buf->rb_reqslock);
+
+	rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+
+	kfree(rqst);
+}
+
+static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
+				 struct rpc_rqst *rqst)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_regbuf *rb;
+	struct rpcrdma_req *req;
+	struct xdr_buf *buf;
+	size_t size;
+
+	req = rpcrdma_create_req(r_xprt);
+	if (!req)
+		return -ENOMEM;
+	req->rl_backchannel = true;
+
+	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	if (IS_ERR(rb))
+		goto out_fail;
+	req->rl_rdmabuf = rb;
+
+	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	if (IS_ERR(rb))
+		goto out_fail;
+	rb->rg_owner = req;
+	req->rl_sendbuf = rb;
+	/* so that rpcr_to_rdmar works when receiving a request */
+	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
+
+	buf = &rqst->rq_snd_buf;
+	buf->head[0].iov_base = rqst->rq_buffer;
+	buf->head[0].iov_len = 0;
+	buf->tail[0].iov_base = NULL;
+	buf->tail[0].iov_len = 0;
+	buf->page_len = 0;
+	buf->len = 0;
+	buf->buflen = size;
+
+	return 0;
+
+out_fail:
+	rpcrdma_bc_free_rqst(r_xprt, rqst);
+	return -ENOMEM;
+}
+
+/* Allocate and add receive buffers to the rpcrdma_buffer's
+ * existing list of rep's. These are released when the
+ * transport is destroyed.
+ */
+static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
+				 unsigned int count)
+{
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc = 0;
+
+	while (count--) {
+		rep = rpcrdma_create_rep(r_xprt);
+		if (IS_ERR(rep)) {
+			pr_err("RPC:       %s: reply buffer alloc failed\n",
+			       __func__);
+			rc = PTR_ERR(rep);
+			break;
+		}
+
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	}
+
+	return rc;
+}
+
+/**
+ * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of concurrent incoming requests to expect
+ *
+ * Returns 0 on success; otherwise a negative errno
+ */
+int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
+	struct rpc_rqst *rqst;
+	unsigned int i;
+	int rc;
+
+	/* The backchannel reply path returns each rpc_rqst to the
+	 * bc_pa_list _after_ the reply is sent. If the server is
+	 * faster than the client, it can send another backward
+	 * direction request before the rpc_rqst is returned to the
+	 * list. The client rejects the request in this case.
+	 *
+	 * Twice as many rpc_rqsts are prepared to ensure there is
+	 * always an rpc_rqst available as soon as a reply is sent.
+	 */
+	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
+		goto out_err;
+
+	for (i = 0; i < (reqs << 1); i++) {
+		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
+		if (!rqst) {
+			pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
+			       __func__);
+			goto out_free;
+		}
+
+		rqst->rq_xprt = &r_xprt->rx_xprt;
+		INIT_LIST_HEAD(&rqst->rq_list);
+		INIT_LIST_HEAD(&rqst->rq_bc_list);
+
+		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
+			goto out_free;
+
+		spin_lock_bh(&xprt->bc_pa_lock);
+		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+		spin_unlock_bh(&xprt->bc_pa_lock);
+	}
+
+	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
+	if (rc)
+		goto out_free;
+
+	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
+	if (rc)
+		goto out_free;
+
+	buffer->rb_bc_srv_max_requests = reqs;
+	request_module("svcrdma");
+
+	return 0;
+
+out_free:
+	xprt_rdma_bc_destroy(xprt, reqs);
+
+out_err:
+	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
+	return -ENOMEM;
+}
+
+/**
+ * xprt_rdma_bc_up - Create transport endpoint for backchannel service
+ * @serv: server endpoint
+ * @net: network namespace
+ *
+ * The "xprt" is an implied argument: it supplies the name of the
+ * backchannel transport class.
+ *
+ * Returns zero on success, negative errno on failure
+ */
+int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
+{
+	int ret;
+
+	ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
+	if (ret < 0)
+		return ret;
+	return 0;
+}
+
+/**
+ * rpcrdma_bc_marshal_reply - Send backwards direction reply
+ * @rqst: buffer containing RPC reply data
+ *
+ * Returns zero on success.
+ */
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct rpcrdma_msg *headerp;
+	size_t rpclen;
+
+	headerp = rdmab_to_msg(req->rl_rdmabuf);
+	headerp->rm_xid = rqst->rq_xid;
+	headerp->rm_vers = rpcrdma_version;
+	headerp->rm_credit =
+			cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
+	headerp->rm_type = rdma_msg;
+	headerp->rm_body.rm_chunks[0] = xdr_zero;
+	headerp->rm_body.rm_chunks[1] = xdr_zero;
+	headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+	rpclen = rqst->rq_svec[0].iov_len;
+
+	pr_info("RPC:       %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
+		__func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
+	pr_info("RPC:       %s: RPC/RDMA: %*ph\n",
+		__func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
+	pr_info("RPC:       %s:      RPC: %*ph\n",
+		__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
+
+	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
+	req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
+	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
+
+	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
+	req->rl_send_iov[1].length = rpclen;
+	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
+
+	req->rl_niovs = 2;
+	return 0;
+}
+
+/**
+ * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of incoming requests to destroy; ignored
+ */
+void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpc_rqst *rqst, *tmp;
+
+	spin_lock_bh(&xprt->bc_pa_lock);
+	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
+		list_del(&rqst->rq_bc_pa_list);
+		spin_unlock_bh(&xprt->bc_pa_lock);
+
+		rpcrdma_bc_free_rqst(r_xprt, rqst);
+
+		spin_lock_bh(&xprt->bc_pa_lock);
+	}
+	spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * xprt_rdma_bc_free_rqst - Release a backchannel rqst
+ * @rqst: request to release
+ */
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+
+	smp_mb__before_atomic();
+	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
+	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+	smp_mb__after_atomic();
+
+	spin_lock_bh(&xprt->bc_pa_lock);
+	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+	spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * rpcrdma_bc_receive_call - Handle a backward direction call
+ * @xprt: transport receiving the call
+ * @rep: receive buffer containing the call
+ *
+ * Called in the RPC reply handler, which runs in a tasklet.
+ * Be quick about it.
+ *
+ * Operational assumptions:
+ *    o Backchannel credits are ignored, just as the NFS server
+ *      forechannel currently does
+ *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
+ *      No replay detection is done at the transport level
+ */
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
+			     struct rpcrdma_rep *rep)
+{
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+	struct rpcrdma_msg *headerp;
+	struct svc_serv *bc_serv;
+	struct rpcrdma_req *req;
+	struct rpc_rqst *rqst;
+	struct xdr_buf *buf;
+	size_t size;
+	__be32 *p;
+
+	headerp = rdmab_to_msg(rep->rr_rdmabuf);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+	pr_info("RPC:       %s: callback XID %08x, length=%u\n",
+		__func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
+	pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
+#endif
+
+	/* Sanity check:
+	 * Need at least enough bytes for RPC/RDMA header, as code
+	 * here references the header fields by array offset. Also,
+	 * backward calls are always inline, so ensure there
+	 * are some bytes beyond the RPC/RDMA header.
+	 */
+	if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
+		goto out_short;
+	p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
+	size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
+
+	/* Grab a free bc rqst */
+	spin_lock(&xprt->bc_pa_lock);
+	if (list_empty(&xprt->bc_pa_list)) {
+		spin_unlock(&xprt->bc_pa_lock);
+		goto out_overflow;
+	}
+	rqst = list_first_entry(&xprt->bc_pa_list,
+				struct rpc_rqst, rq_bc_pa_list);
+	list_del(&rqst->rq_bc_pa_list);
+	spin_unlock(&xprt->bc_pa_lock);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+	pr_info("RPC:       %s: using rqst %p\n", __func__, rqst);
+#endif
+
+	/* Prepare rqst */
+	rqst->rq_reply_bytes_recvd = 0;
+	rqst->rq_bytes_sent = 0;
+	rqst->rq_xid = headerp->rm_xid;
+	set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+
+	buf = &rqst->rq_rcv_buf;
+	memset(buf, 0, sizeof(*buf));
+	buf->head[0].iov_base = p;
+	buf->head[0].iov_len = size;
+	buf->len = size;
+
+	/* The receive buffer has to be hooked to the rpcrdma_req
+	 * so that it can be reposted after the server is done
+	 * parsing it but just before sending the backward
+	 * direction reply.
+	 */
+	req = rpcr_to_rdmar(rqst);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+	pr_info("RPC:       %s: attaching rep %p to req %p\n",
+		__func__, rep, req);
+#endif
+	req->rl_reply = rep;
+
+	/* Defeat the retransmit detection logic in send_request */
+	req->rl_connect_cookie = 0;
+
+	/* Queue rqst for ULP's callback service */
+	bc_serv = xprt->bc_serv;
+	spin_lock(&bc_serv->sv_cb_lock);
+	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
+	spin_unlock(&bc_serv->sv_cb_lock);
+
+	wake_up(&bc_serv->sv_cb_waitq);
+
+	r_xprt->rx_stats.bcall_count++;
+	return;
+
+out_overflow:
+	pr_warn("RPC/RDMA backchannel overflow\n");
+	xprt_disconnect_done(xprt);
+	/* This receive buffer gets reposted automatically
+	 * when the connection is re-established.
+	 */
+	return;
+
+out_short:
+	pr_warn("RPC/RDMA short backward direction call\n");
+
+	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+		xprt_disconnect_done(xprt);
+	else
+		pr_warn("RPC:       %s: reposting rep %p\n",
+			__func__, rep);
+}
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index a1434447b0d6..88cf9e7269c2 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -256,8 +256,11 @@ frwr_sendcompletion(struct ib_wc *wc)
 
 	/* WARNING: Only wr_id and status are reliable at this point */
 	r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-	pr_warn("RPC:       %s: frmr %p flushed, status %s (%d)\n",
-		__func__, r, ib_wc_status_msg(wc->status), wc->status);
+	if (wc->status == IB_WC_WR_FLUSH_ERR)
+		dprintk("RPC:       %s: frmr %p flushed\n", __func__, r);
+	else
+		pr_warn("RPC:       %s: frmr %p error, status %s (%d)\n",
+			__func__, r, ib_wc_status_msg(wc->status), wc->status);
 	r->r.frmr.fr_state = FRMR_IS_STALE;
 }
 
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index bc8bd6577467..c10d9699441c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
+		return rpcrdma_bc_marshal_reply(rqst);
+#endif
+
 	/*
 	 * rpclen gets amount of data in first buffer, which is the
 	 * pre-registered buffer.
@@ -711,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
 	spin_unlock_bh(&xprt->transport_lock);
 }
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+{
+	__be32 *p = (__be32 *)headerp;
+
+	if (headerp->rm_type != rdma_msg)
+		return false;
+	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+		return false;
+	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+		return false;
+	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+		return false;
+
+	/* sanity */
+	if (p[7] != headerp->rm_xid)
+		return false;
+	/* call direction */
+	if (p[8] != cpu_to_be32(RPC_CALL))
+		return false;
+
+	return true;
+}
+#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
+
 /*
  * This function is called when an async event is posted to
  * the connection which changes the connection state. All it
@@ -723,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
 	schedule_delayed_work(&ep->rep_connect_worker, 0);
 }
 
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
  */
@@ -741,52 +777,32 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	unsigned long cwnd;
 	u32 credits;
 
-	/* Check status. If bad, signal disconnect and return rep to pool */
-	if (rep->rr_len == ~0U) {
-		rpcrdma_recv_buffer_put(rep);
-		if (r_xprt->rx_ep.rep_connected == 1) {
-			r_xprt->rx_ep.rep_connected = -EIO;
-			rpcrdma_conn_func(&r_xprt->rx_ep);
-		}
-		return;
-	}
-	if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
-		dprintk("RPC:       %s: short/invalid reply\n", __func__);
-		goto repost;
-	}
+	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
+
+	if (rep->rr_len == RPCRDMA_BAD_LEN)
+		goto out_badstatus;
+	if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+		goto out_shortreply;
+
 	headerp = rdmab_to_msg(rep->rr_rdmabuf);
-	if (headerp->rm_vers != rpcrdma_version) {
-		dprintk("RPC:       %s: invalid version %d\n",
-			__func__, be32_to_cpu(headerp->rm_vers));
-		goto repost;
-	}
+	if (headerp->rm_vers != rpcrdma_version)
+		goto out_badversion;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	if (rpcrdma_is_bcall(headerp))
+		goto out_bcall;
+#endif
 
-	/* Get XID and try for a match. */
-	spin_lock(&xprt->transport_lock);
+	/* Match incoming rpcrdma_rep to an rpcrdma_req to
+	 * get context for handling any incoming chunks.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
 	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (rqst == NULL) {
-		spin_unlock(&xprt->transport_lock);
-		dprintk("RPC:       %s: reply 0x%p failed "
-			"to match any request xid 0x%08x len %d\n",
-			__func__, rep, be32_to_cpu(headerp->rm_xid),
-			rep->rr_len);
-repost:
-		r_xprt->rx_stats.bad_reply_count++;
-		if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
-			rpcrdma_recv_buffer_put(rep);
+	if (!rqst)
+		goto out_nomatch;
 
-		return;
-	}
-
-	/* get request object */
 	req = rpcr_to_rdmar(rqst);
-	if (req->rl_reply) {
-		spin_unlock(&xprt->transport_lock);
-		dprintk("RPC:       %s: duplicate reply 0x%p to RPC "
-			"request 0x%p: xid 0x%08x\n", __func__, rep, req,
-			be32_to_cpu(headerp->rm_xid));
-		goto repost;
-	}
+	if (req->rl_reply)
+		goto out_duplicate;
 
 	dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
 		"                   RPC request 0x%p xid 0x%08x\n",
@@ -883,8 +899,50 @@ badheader:
 	if (xprt->cwnd > cwnd)
 		xprt_release_rqst_cong(rqst->rq_task);
 
+	xprt_complete_rqst(rqst->rq_task, status);
+	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 			__func__, xprt, rqst, status);
-	xprt_complete_rqst(rqst->rq_task, status);
-	spin_unlock(&xprt->transport_lock);
+	return;
+
+out_badstatus:
+	rpcrdma_recv_buffer_put(rep);
+	if (r_xprt->rx_ep.rep_connected == 1) {
+		r_xprt->rx_ep.rep_connected = -EIO;
+		rpcrdma_conn_func(&r_xprt->rx_ep);
+	}
+	return;
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+out_bcall:
+	rpcrdma_bc_receive_call(r_xprt, rep);
+	return;
+#endif
+
+out_shortreply:
+	dprintk("RPC:       %s: short/invalid reply\n", __func__);
+	goto repost;
+
+out_badversion:
+	dprintk("RPC:       %s: invalid version %d\n",
+		__func__, be32_to_cpu(headerp->rm_vers));
+	goto repost;
+
+out_nomatch:
+	spin_unlock_bh(&xprt->transport_lock);
+	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
+		__func__, be32_to_cpu(headerp->rm_xid),
+		rep->rr_len);
+	goto repost;
+
+out_duplicate:
+	spin_unlock_bh(&xprt->transport_lock);
+	dprintk("RPC:       %s: "
+		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
+		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
+
+repost:
+	r_xprt->rx_stats.bad_reply_count++;
+	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+		rpcrdma_recv_buffer_put(rep);
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 2cd252f023a5..1b7051bdbdc8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
 		unregister_sysctl_table(svcrdma_table_header);
 		svcrdma_table_header = NULL;
 	}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	svc_unreg_xprt_class(&svc_rdma_bc_class);
+#endif
 	svc_unreg_xprt_class(&svc_rdma_class);
 	kmem_cache_destroy(svc_rdma_map_cachep);
 	kmem_cache_destroy(svc_rdma_ctxt_cachep);
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
 
 	/* Register RDMA with the SVC transport switch */
 	svc_reg_xprt_class(&svc_rdma_class);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	svc_reg_xprt_class(&svc_rdma_bc_class);
+#endif
 	return 0;
  err1:
 	kmem_cache_destroy(svc_rdma_map_cachep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index a266e870d870..b348b4adef29 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -56,6 +56,7 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 					struct net *net,
 					struct sockaddr *sa, int salen,
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
 	.xcl_ident = XPRT_TRANSPORT_RDMA,
 };
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
+					   struct sockaddr *, int, int);
+static void svc_rdma_bc_detach(struct svc_xprt *);
+static void svc_rdma_bc_free(struct svc_xprt *);
+
+static struct svc_xprt_ops svc_rdma_bc_ops = {
+	.xpo_create = svc_rdma_bc_create,
+	.xpo_detach = svc_rdma_bc_detach,
+	.xpo_free = svc_rdma_bc_free,
+	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+	.xpo_secure_port = svc_rdma_secure_port,
+};
+
+struct svc_xprt_class svc_rdma_bc_class = {
+	.xcl_name = "rdma-bc",
+	.xcl_owner = THIS_MODULE,
+	.xcl_ops = &svc_rdma_bc_ops,
+	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
+};
+
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
+					   struct net *net,
+					   struct sockaddr *sa, int salen,
+					   int flags)
+{
+	struct svcxprt_rdma *cma_xprt;
+	struct svc_xprt *xprt;
+
+	cma_xprt = rdma_create_xprt(serv, 0);
+	if (!cma_xprt)
+		return ERR_PTR(-ENOMEM);
+	xprt = &cma_xprt->sc_xprt;
+
+	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
+	serv->sv_bc_xprt = xprt;
+
+	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+	return xprt;
+}
+
+static void svc_rdma_bc_detach(struct svc_xprt *xprt)
+{
+	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+}
+
+static void svc_rdma_bc_free(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+	if (xprt)
+		kfree(rdma);
+}
+#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
+
 struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
 {
 	struct svc_rdma_op_ctxt *ctxt;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 41e452bc580c..8c545f7d7525 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 static int
 xprt_rdma_enable_swap(struct rpc_xprt *xprt)
 {
-	return -EINVAL;
+	return 0;
 }
 
 static void
@@ -705,7 +705,13 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
 	.print_stats		= xprt_rdma_print_stats,
 	.enable_swap		= xprt_rdma_enable_swap,
 	.disable_swap		= xprt_rdma_disable_swap,
-	.inject_disconnect	= xprt_rdma_inject_disconnect
+	.inject_disconnect	= xprt_rdma_inject_disconnect,
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	.bc_setup		= xprt_rdma_bc_setup,
+	.bc_up			= xprt_rdma_bc_up,
+	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
+	.bc_destroy		= xprt_rdma_bc_destroy,
+#endif
 };
 
 static struct xprt_class xprt_rdma = {
@@ -732,6 +738,7 @@ void xprt_rdma_cleanup(void)
 		dprintk("RPC:       %s: xprt_unregister returned %i\n",
 			__func__, rc);
 
+	rpcrdma_destroy_wq();
 	frwr_destroy_recovery_wq();
 }
 
@@ -743,8 +750,15 @@ int xprt_rdma_init(void)
 	if (rc)
 		return rc;
 
+	rc = rpcrdma_alloc_wq();
+	if (rc) {
+		frwr_destroy_recovery_wq();
+		return rc;
+	}
+
 	rc = xprt_register_transport(&xprt_rdma);
 	if (rc) {
+		rpcrdma_destroy_wq();
 		frwr_destroy_recovery_wq();
 		return rc;
 	}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f63369bd01c5..eadd1655145a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -68,47 +68,33 @@
  * internal functions
  */
 
-/*
- * handle replies in tasklet context, using a single, global list
- * rdma tasklet function -- just turn around and call the func
- * for all replies on the list
- */
-
-static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
-static LIST_HEAD(rpcrdma_tasklets_g);
+static struct workqueue_struct *rpcrdma_receive_wq;
 
-static void
-rpcrdma_run_tasklet(unsigned long data)
+int
+rpcrdma_alloc_wq(void)
 {
-	struct rpcrdma_rep *rep;
-	unsigned long flags;
-
-	data = data;
-	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-	while (!list_empty(&rpcrdma_tasklets_g)) {
-		rep = list_entry(rpcrdma_tasklets_g.next,
-				 struct rpcrdma_rep, rr_list);
-		list_del(&rep->rr_list);
-		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+	struct workqueue_struct *recv_wq;
 
-		rpcrdma_reply_handler(rep);
+	recv_wq = alloc_workqueue("xprtrdma_receive",
+				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+				  0);
+	if (!recv_wq)
+		return -ENOMEM;
 
-		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-	}
-	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+	rpcrdma_receive_wq = recv_wq;
+	return 0;
 }
 
-static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
-
-static void
-rpcrdma_schedule_tasklet(struct list_head *sched_list)
+void
+rpcrdma_destroy_wq(void)
 {
-	unsigned long flags;
+	struct workqueue_struct *wq;
 
-	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
-	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-	tasklet_schedule(&rpcrdma_tasklet_g);
+	if (rpcrdma_receive_wq) {
+		wq = rpcrdma_receive_wq;
+		rpcrdma_receive_wq = NULL;
+		destroy_workqueue(wq);
+	}
 }
 
 static void
@@ -158,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 	}
 }
 
-static int
-rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The common case is a single send completion is waiting. By
+ * passing two WC entries to ib_poll_cq, a return code of 1
+ * means there is exactly one WC waiting and no more. We don't
+ * have to invoke ib_poll_cq again to know that the CQ has been
+ * properly drained.
+ */
+static void
+rpcrdma_sendcq_poll(struct ib_cq *cq)
 {
-	struct ib_wc *wcs;
-	int budget, count, rc;
+	struct ib_wc *pos, wcs[2];
+	int count, rc;
 
-	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
 	do {
-		wcs = ep->rep_send_wcs;
+		pos = wcs;
 
-		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-		if (rc <= 0)
-			return rc;
+		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+		if (rc < 0)
+			break;
 
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_sendcq_process_wc(wcs++);
-	} while (rc == RPCRDMA_POLLSIZE && --budget);
-	return 0;
+			rpcrdma_sendcq_process_wc(pos++);
+	} while (rc == ARRAY_SIZE(wcs));
+	return;
 }
 
-/*
- * Handle send, fast_reg_mr, and local_inv completions.
- *
- * Send events are typically suppressed and thus do not result
- * in an upcall. Occasionally one is signaled, however. This
- * prevents the provider's completion queue from wrapping and
- * losing a completion.
+/* Handle provider send completion upcalls.
  */
 static void
 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 {
-	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-	int rc;
-
-	rc = rpcrdma_sendcq_poll(cq, ep);
-	if (rc) {
-		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-			__func__, rc);
-		return;
-	}
+	do {
+		rpcrdma_sendcq_poll(cq);
+	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
+}
 
-	rc = ib_req_notify_cq(cq,
-			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-	if (rc == 0)
-		return;
-	if (rc < 0) {
-		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-			__func__, rc);
-		return;
-	}
+static void
+rpcrdma_receive_worker(struct work_struct *work)
+{
+	struct rpcrdma_rep *rep =
+			container_of(work, struct rpcrdma_rep, rr_work);
 
-	rpcrdma_sendcq_poll(cq, ep);
+	rpcrdma_reply_handler(rep);
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
 {
 	struct rpcrdma_rep *rep =
 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -237,91 +214,60 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
 	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
-	list_add_tail(&rep->rr_list, sched_list);
+	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
+
 out_fail:
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("RPC:       %s: rep %p: %s\n",
 		       __func__, rep, ib_wc_status_msg(wc->status));
-	rep->rr_len = ~0U;
+	rep->rr_len = RPCRDMA_BAD_LEN;
 	goto out_schedule;
 }
 
-static int
-rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The wc array is on stack: automatic memory is always CPU-local.
+ *
+ * struct ib_wc is 64 bytes, making the poll array potentially
+ * large. But this is at the bottom of the call chain. Further
+ * substantial work is done in another thread.
+ */
+static void
+rpcrdma_recvcq_poll(struct ib_cq *cq)
 {
-	struct list_head sched_list;
-	struct ib_wc *wcs;
-	int budget, count, rc;
+	struct ib_wc *pos, wcs[4];
+	int count, rc;
 
-	INIT_LIST_HEAD(&sched_list);
-	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
 	do {
-		wcs = ep->rep_recv_wcs;
+		pos = wcs;
 
-		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-		if (rc <= 0)
-			goto out_schedule;
+		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+		if (rc < 0)
+			break;
 
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
-	} while (rc == RPCRDMA_POLLSIZE && --budget);
-	rc = 0;
-
-out_schedule:
-	rpcrdma_schedule_tasklet(&sched_list);
-	return rc;
+			rpcrdma_recvcq_process_wc(pos++);
+	} while (rc == ARRAY_SIZE(wcs));
 }
 
-/*
- * Handle receive completions.
- *
- * It is reentrant but processes single events in order to maintain
- * ordering of receives to keep server credits.
- *
- * It is the responsibility of the scheduled tasklet to return
- * recv buffers to the pool. NOTE: this affects synchronization of
- * connection shutdown. That is, the structures required for
- * the completion of the reply handler must remain intact until
- * all memory has been reclaimed.
+/* Handle provider receive completion upcalls.
  */
 static void
 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
 {
-	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-	int rc;
-
-	rc = rpcrdma_recvcq_poll(cq, ep);
-	if (rc) {
-		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-			__func__, rc);
-		return;
-	}
-
-	rc = ib_req_notify_cq(cq,
-			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-	if (rc == 0)
-		return;
-	if (rc < 0) {
-		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-			__func__, rc);
-		return;
-	}
-
-	rpcrdma_recvcq_poll(cq, ep);
+	do {
+		rpcrdma_recvcq_poll(cq);
+	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
 }
 
 static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
 	struct ib_wc wc;
-	LIST_HEAD(sched_list);
 
 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_recvcq_process_wc(&wc, &sched_list);
-	if (!list_empty(&sched_list))
-		rpcrdma_schedule_tasklet(&sched_list);
+		rpcrdma_recvcq_process_wc(&wc);
 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
 		rpcrdma_sendcq_process_wc(&wc);
 }
@@ -623,6 +569,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	struct ib_device_attr *devattr = &ia->ri_devattr;
 	struct ib_cq *sendcq, *recvcq;
 	struct ib_cq_init_attr cq_attr = {};
+	unsigned int max_qp_wr;
 	int rc, err;
 
 	if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -631,18 +578,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		return -ENOMEM;
 	}
 
+	if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
+		dprintk("RPC:       %s: insufficient wqe's available\n",
+			__func__);
+		return -ENOMEM;
+	}
+	max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
+
 	/* check provider's send/recv wr limits */
-	if (cdata->max_requests > devattr->max_qp_wr)
-		cdata->max_requests = devattr->max_qp_wr;
+	if (cdata->max_requests > max_qp_wr)
+		cdata->max_requests = max_qp_wr;
 
 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 	ep->rep_attr.qp_context = ep;
 	ep->rep_attr.srq = NULL;
 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
 	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 	if (rc)
 		return rc;
 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
 	ep->rep_attr.cap.max_recv_sge = 1;
 	ep->rep_attr.cap.max_inline_data = 0;
@@ -670,7 +626,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
 	cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
 	sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
-			      rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
 	if (IS_ERR(sendcq)) {
 		rc = PTR_ERR(sendcq);
 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
@@ -687,7 +643,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
 	cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
 	recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
-			      rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
 	if (IS_ERR(recvcq)) {
 		rc = PTR_ERR(recvcq);
 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -886,7 +842,21 @@ retry:
 		}
 		rc = ep->rep_connected;
 	} else {
+		struct rpcrdma_xprt *r_xprt;
+		unsigned int extras;
+
 		dprintk("RPC:       %s: connected\n", __func__);
+
+		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+		if (extras) {
+			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+			if (rc)
+				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
+					__func__, rc);
+				rc = 0;
+		}
 	}
 
 out:
@@ -923,20 +893,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	}
 }
 
-static struct rpcrdma_req *
+struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 	struct rpcrdma_req *req;
 
 	req = kzalloc(sizeof(*req), GFP_KERNEL);
 	if (req == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	INIT_LIST_HEAD(&req->rl_free);
+	spin_lock(&buffer->rb_reqslock);
+	list_add(&req->rl_all, &buffer->rb_allreqs);
+	spin_unlock(&buffer->rb_reqslock);
 	req->rl_buffer = &r_xprt->rx_buf;
 	return req;
 }
 
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -958,6 +933,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 
 	rep->rr_device = ia->ri_device;
 	rep->rr_rxprt = r_xprt;
+	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 	return rep;
 
 out_free:
@@ -971,44 +947,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	char *p;
-	size_t len;
 	int i, rc;
 
-	buf->rb_max_requests = cdata->max_requests;
+	buf->rb_max_requests = r_xprt->rx_data.max_requests;
+	buf->rb_bc_srv_max_requests = 0;
 	spin_lock_init(&buf->rb_lock);
 
-	/* Need to allocate:
-	 *   1.  arrays for send and recv pointers
-	 *   2.  arrays of struct rpcrdma_req to fill in pointers
-	 *   3.  array of struct rpcrdma_rep for replies
-	 * Send/recv buffers in req/rep need to be registered
-	 */
-	len = buf->rb_max_requests *
-		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-
-	p = kzalloc(len, GFP_KERNEL);
-	if (p == NULL) {
-		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
-			__func__, len);
-		rc = -ENOMEM;
-		goto out;
-	}
-	buf->rb_pool = p;	/* for freeing it later */
-
-	buf->rb_send_bufs = (struct rpcrdma_req **) p;
-	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
-	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
-	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
-
 	rc = ia->ri_ops->ro_init(r_xprt);
 	if (rc)
 		goto out;
 
+	INIT_LIST_HEAD(&buf->rb_send_bufs);
+	INIT_LIST_HEAD(&buf->rb_allreqs);
+	spin_lock_init(&buf->rb_reqslock);
 	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_req *req;
-		struct rpcrdma_rep *rep;
 
 		req = rpcrdma_create_req(r_xprt);
 		if (IS_ERR(req)) {
@@ -1017,7 +970,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 			rc = PTR_ERR(req);
 			goto out;
 		}
-		buf->rb_send_bufs[i] = req;
+		req->rl_backchannel = false;
+		list_add(&req->rl_free, &buf->rb_send_bufs);
+	}
+
+	INIT_LIST_HEAD(&buf->rb_recv_bufs);
+	for (i = 0; i < buf->rb_max_requests + 2; i++) {
+		struct rpcrdma_rep *rep;
 
 		rep = rpcrdma_create_rep(r_xprt);
 		if (IS_ERR(rep)) {
@@ -1026,7 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 			rc = PTR_ERR(rep);
 			goto out;
 		}
-		buf->rb_recv_bufs[i] = rep;
+		list_add(&rep->rr_list, &buf->rb_recv_bufs);
 	}
 
 	return 0;
@@ -1035,22 +994,38 @@ out:
 	return rc;
 }
 
+static struct rpcrdma_req *
+rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
+{
+	struct rpcrdma_req *req;
+
+	req = list_first_entry(&buf->rb_send_bufs,
+			       struct rpcrdma_req, rl_free);
+	list_del(&req->rl_free);
+	return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
+{
+	struct rpcrdma_rep *rep;
+
+	rep = list_first_entry(&buf->rb_recv_bufs,
+			       struct rpcrdma_rep, rr_list);
+	list_del(&rep->rr_list);
+	return rep;
+}
+
 static void
 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 {
-	if (!rep)
-		return;
-
 	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
 	kfree(rep);
 }
 
-static void
+void
 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
-	if (!req)
-		return;
-
 	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
 	kfree(req);
@@ -1060,25 +1035,29 @@ void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-	int i;
 
-	/* clean up in reverse order from create
-	 *   1.  recv mr memory (mr free, then kfree)
-	 *   2.  send mr memory (mr free, then kfree)
-	 *   3.  MWs
-	 */
-	dprintk("RPC:       %s: entering\n", __func__);
+	while (!list_empty(&buf->rb_recv_bufs)) {
+		struct rpcrdma_rep *rep;
 
-	for (i = 0; i < buf->rb_max_requests; i++) {
-		if (buf->rb_recv_bufs)
-			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
-		if (buf->rb_send_bufs)
-			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
+		rep = rpcrdma_buffer_get_rep_locked(buf);
+		rpcrdma_destroy_rep(ia, rep);
 	}
 
-	ia->ri_ops->ro_destroy(buf);
+	spin_lock(&buf->rb_reqslock);
+	while (!list_empty(&buf->rb_allreqs)) {
+		struct rpcrdma_req *req;
+
+		req = list_first_entry(&buf->rb_allreqs,
+				       struct rpcrdma_req, rl_all);
+		list_del(&req->rl_all);
+
+		spin_unlock(&buf->rb_reqslock);
+		rpcrdma_destroy_req(ia, req);
+		spin_lock(&buf->rb_reqslock);
+	}
+	spin_unlock(&buf->rb_reqslock);
 
-	kfree(buf->rb_pool);
+	ia->ri_ops->ro_destroy(buf);
 }
 
 struct rpcrdma_mw *
@@ -1110,53 +1089,34 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
 	spin_unlock(&buf->rb_mwlock);
 }
 
-static void
-rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
-	buf->rb_send_bufs[--buf->rb_send_index] = req;
-	req->rl_niovs = 0;
-	if (req->rl_reply) {
-		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
-		req->rl_reply = NULL;
-	}
-}
-
 /*
  * Get a set of request/reply buffers.
  *
- * Reply buffer (if needed) is attached to send buffer upon return.
- * Rule:
- *    rb_send_index and rb_recv_index MUST always be pointing to the
- *    *next* available buffer (non-NULL). They are incremented after
- *    removing buffers, and decremented *before* returning them.
+ * Reply buffer (if available) is attached to send buffer upon return.
  */
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 {
 	struct rpcrdma_req *req;
-	unsigned long flags;
-
-	spin_lock_irqsave(&buffers->rb_lock, flags);
 
-	if (buffers->rb_send_index == buffers->rb_max_requests) {
-		spin_unlock_irqrestore(&buffers->rb_lock, flags);
-		dprintk("RPC:       %s: out of request buffers\n", __func__);
-		return ((struct rpcrdma_req *)NULL);
-	}
-
-	req = buffers->rb_send_bufs[buffers->rb_send_index];
-	if (buffers->rb_send_index < buffers->rb_recv_index) {
-		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
-			__func__,
-			buffers->rb_recv_index - buffers->rb_send_index);
-		req->rl_reply = NULL;
-	} else {
-		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-	}
-	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+	spin_lock(&buffers->rb_lock);
+	if (list_empty(&buffers->rb_send_bufs))
+		goto out_reqbuf;
+	req = rpcrdma_buffer_get_req_locked(buffers);
+	if (list_empty(&buffers->rb_recv_bufs))
+		goto out_repbuf;
+	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+	spin_unlock(&buffers->rb_lock);
+	return req;
 
-	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+out_reqbuf:
+	spin_unlock(&buffers->rb_lock);
+	pr_warn("RPC:       %s: out of request buffers\n", __func__);
+	return NULL;
+out_repbuf:
+	spin_unlock(&buffers->rb_lock);
+	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
+	req->rl_reply = NULL;
 	return req;
 }
 
@@ -1168,30 +1128,31 @@ void
 rpcrdma_buffer_put(struct rpcrdma_req *req)
 {
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
-	unsigned long flags;
+	struct rpcrdma_rep *rep = req->rl_reply;
 
-	spin_lock_irqsave(&buffers->rb_lock, flags);
-	rpcrdma_buffer_put_sendbuf(req, buffers);
-	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	req->rl_niovs = 0;
+	req->rl_reply = NULL;
+
+	spin_lock(&buffers->rb_lock);
+	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+	if (rep)
+		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+	spin_unlock(&buffers->rb_lock);
 }
 
 /*
  * Recover reply buffers from pool.
- * This happens when recovering from error conditions.
- * Post-increment counter/array index.
+ * This happens when recovering from disconnect.
  */
 void
 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
 {
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
-	unsigned long flags;
 
-	spin_lock_irqsave(&buffers->rb_lock, flags);
-	if (buffers->rb_recv_index < buffers->rb_max_requests) {
-		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-	}
-	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	spin_lock(&buffers->rb_lock);
+	if (!list_empty(&buffers->rb_recv_bufs))
+		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+	spin_unlock(&buffers->rb_lock);
 }
 
 /*
@@ -1202,11 +1163,10 @@ void
 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 {
 	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-	unsigned long flags;
 
-	spin_lock_irqsave(&buffers->rb_lock, flags);
-	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
-	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	spin_lock(&buffers->rb_lock);
+	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+	spin_unlock(&buffers->rb_lock);
 }
 
 /*
@@ -1363,6 +1323,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
 	return rc;
 }
 
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @min_reqs: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc;
+
+	while (count--) {
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		if (list_empty(&buffers->rb_recv_bufs))
+			goto out_reqbuf;
+		rep = rpcrdma_buffer_get_rep_locked(buffers);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+
+		rc = rpcrdma_ep_post_recv(ia, ep, rep);
+		if (rc)
+			goto out_rc;
+	}
+
+	return 0;
+
+out_reqbuf:
+	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	pr_warn("%s: no extra receive buffers\n", __func__);
+	return -ENOMEM;
+
+out_rc:
+	rpcrdma_recv_buffer_put(rep);
+	return rc;
+}
+
 /* How many chunk list items fit within our inline buffers?
  */
 unsigned int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c82abf44e39d..ac7f8d4f632a 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
  * RDMA Endpoint -- one per transport instance
  */
 
-#define RPCRDMA_WC_BUDGET	(128)
-#define RPCRDMA_POLLSIZE	(16)
-
 struct rpcrdma_ep {
 	atomic_t		rep_cqcount;
 	int			rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
 	struct rdma_conn_param	rep_remote_cma;
 	struct sockaddr_storage	rep_remote_addr;
 	struct delayed_work	rep_connect_worker;
-	struct ib_wc		rep_send_wcs[RPCRDMA_POLLSIZE];
-	struct ib_wc		rep_recv_wcs[RPCRDMA_POLLSIZE];
 };
 
 /*
@@ -106,6 +101,16 @@ struct rpcrdma_ep {
  */
 #define RPCRDMA_IGNORE_COMPLETION	(0ULL)
 
+/* Pre-allocate extra Work Requests for handling backward receives
+ * and sends. This is a fixed value because the Work Queues are
+ * allocated when the forward channel is set up.
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+#define RPCRDMA_BACKWARD_WRS		(8)
+#else
+#define RPCRDMA_BACKWARD_WRS		(0)
+#endif
+
 /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
  *
  * The below structure appears at the front of a large region of kmalloc'd
@@ -169,10 +174,13 @@ struct rpcrdma_rep {
 	unsigned int		rr_len;
 	struct ib_device	*rr_device;
 	struct rpcrdma_xprt	*rr_rxprt;
+	struct work_struct	rr_work;
 	struct list_head	rr_list;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 };
 
+#define RPCRDMA_BAD_LEN		(~0U)
+
 /*
  * struct rpcrdma_mw - external memory region metadata
  *
@@ -256,6 +264,7 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */
 #define RPCRDMA_MAX_IOVS	(2)
 
 struct rpcrdma_req {
+	struct list_head	rl_free;
 	unsigned int		rl_niovs;
 	unsigned int		rl_nchunks;
 	unsigned int		rl_connect_cookie;
@@ -265,6 +274,9 @@ struct rpcrdma_req {
 	struct rpcrdma_regbuf	*rl_rdmabuf;
 	struct rpcrdma_regbuf	*rl_sendbuf;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
+
+	struct list_head	rl_all;
+	bool			rl_backchannel;
 };
 
 static inline struct rpcrdma_req *
@@ -289,12 +301,14 @@ struct rpcrdma_buffer {
 	struct list_head	rb_all;
 	char			*rb_pool;
 
-	spinlock_t		rb_lock;	/* protect buf arrays */
+	spinlock_t		rb_lock;	/* protect buf lists */
+	struct list_head	rb_send_bufs;
+	struct list_head	rb_recv_bufs;
 	u32			rb_max_requests;
-	int			rb_send_index;
-	int			rb_recv_index;
-	struct rpcrdma_req	**rb_send_bufs;
-	struct rpcrdma_rep	**rb_recv_bufs;
+
+	u32			rb_bc_srv_max_requests;
+	spinlock_t		rb_reqslock;	/* protect rb_allreqs */
+	struct list_head	rb_allreqs;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
@@ -340,6 +354,7 @@ struct rpcrdma_stats {
 	unsigned long		failed_marshal_count;
 	unsigned long		bad_reply_count;
 	unsigned long		nomsg_call_count;
+	unsigned long		bcall_count;
 };
 
 /*
@@ -415,6 +430,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
+struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
+void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
@@ -431,10 +449,14 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 			 struct rpcrdma_regbuf *);
 
 unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
 int frwr_alloc_recovery_wq(void);
 void frwr_destroy_recovery_wq(void);
 
+int rpcrdma_alloc_wq(void);
+void rpcrdma_destroy_wq(void);
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
@@ -495,6 +517,18 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
 int xprt_rdma_init(void);
 void xprt_rdma_cleanup(void);
 
+/* Backchannel calls - xprtrdma/backchannel.c
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
+int xprt_rdma_bc_up(struct svc_serv *, struct net *);
+int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
+void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
+#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
+
 /* Temporary NFS request map cache. Created in svc_rdma.c  */
 extern struct kmem_cache *svc_rdma_map_cachep;
 /* WR context cache. Created in svc_rdma.c  */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 1a85e0ed0b48..1d1a70498910 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
 		int flags = XS_SENDMSG_FLAGS;
 
 		remainder -= len;
-		if (remainder != 0 || more)
+		if (more)
 			flags |= MSG_MORE;
+		if (remainder != 0)
+			flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
 		err = do_sendpage(sock, *ppage, base, len, flags);
 		if (remainder == 0 || err != len)
 			break;
@@ -823,6 +825,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
 
 	kernel_sock_shutdown(sock, SHUT_RDWR);
 
+	mutex_lock(&transport->recv_mutex);
 	write_lock_bh(&sk->sk_callback_lock);
 	transport->inet = NULL;
 	transport->sock = NULL;
@@ -833,6 +836,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
 	xprt_clear_connected(xprt);
 	write_unlock_bh(&sk->sk_callback_lock);
 	xs_sock_reset_connection_flags(xprt);
+	mutex_unlock(&transport->recv_mutex);
 
 	trace_rpc_socket_close(xprt, sock);
 	sock_release(sock);
@@ -886,6 +890,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
 
 	cancel_delayed_work_sync(&transport->connect_worker);
 	xs_close(xprt);
+	cancel_work_sync(&transport->recv_worker);
 	xs_xprt_free(xprt);
 	module_put(THIS_MODULE);
 }
@@ -906,44 +911,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
 }
 
 /**
- * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
- * @sk: socket with data to read
+ * xs_local_data_read_skb
+ * @xprt: transport
+ * @sk: socket
+ * @skb: skbuff
  *
  * Currently this assumes we can read the whole reply in a single gulp.
  */
-static void xs_local_data_ready(struct sock *sk)
+static void xs_local_data_read_skb(struct rpc_xprt *xprt,
+		struct sock *sk,
+		struct sk_buff *skb)
 {
 	struct rpc_task *task;
-	struct rpc_xprt *xprt;
 	struct rpc_rqst *rovr;
-	struct sk_buff *skb;
-	int err, repsize, copied;
+	int repsize, copied;
 	u32 _xid;
 	__be32 *xp;
 
-	read_lock_bh(&sk->sk_callback_lock);
-	dprintk("RPC:       %s...\n", __func__);
-	xprt = xprt_from_sock(sk);
-	if (xprt == NULL)
-		goto out;
-
-	skb = skb_recv_datagram(sk, 0, 1, &err);
-	if (skb == NULL)
-		goto out;
-
 	repsize = skb->len - sizeof(rpc_fraghdr);
 	if (repsize < 4) {
 		dprintk("RPC:       impossible RPC reply size %d\n", repsize);
-		goto dropit;
+		return;
 	}
 
 	/* Copy the XID from the skb... */
 	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
 	if (xp == NULL)
-		goto dropit;
+		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock(&xprt->transport_lock);
+	spin_lock_bh(&xprt->transport_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
@@ -961,50 +958,68 @@ static void xs_local_data_ready(struct sock *sk)
 	xprt_complete_rqst(task, copied);
 
  out_unlock:
-	spin_unlock(&xprt->transport_lock);
- dropit:
-	skb_free_datagram(sk, skb);
- out:
-	read_unlock_bh(&sk->sk_callback_lock);
+	spin_unlock_bh(&xprt->transport_lock);
+}
+
+static void xs_local_data_receive(struct sock_xprt *transport)
+{
+	struct sk_buff *skb;
+	struct sock *sk;
+	int err;
+
+	mutex_lock(&transport->recv_mutex);
+	sk = transport->inet;
+	if (sk == NULL)
+		goto out;
+	for (;;) {
+		skb = skb_recv_datagram(sk, 0, 1, &err);
+		if (skb == NULL)
+			break;
+		xs_local_data_read_skb(&transport->xprt, sk, skb);
+		skb_free_datagram(sk, skb);
+	}
+out:
+	mutex_unlock(&transport->recv_mutex);
+}
+
+static void xs_local_data_receive_workfn(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, recv_worker);
+	xs_local_data_receive(transport);
 }
 
 /**
- * xs_udp_data_ready - "data ready" callback for UDP sockets
- * @sk: socket with data to read
+ * xs_udp_data_read_skb - receive callback for UDP sockets
+ * @xprt: transport
+ * @sk: socket
+ * @skb: skbuff
  *
  */
-static void xs_udp_data_ready(struct sock *sk)
+static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
+		struct sock *sk,
+		struct sk_buff *skb)
 {
 	struct rpc_task *task;
-	struct rpc_xprt *xprt;
 	struct rpc_rqst *rovr;
-	struct sk_buff *skb;
-	int err, repsize, copied;
+	int repsize, copied;
 	u32 _xid;
 	__be32 *xp;
 
-	read_lock_bh(&sk->sk_callback_lock);
-	dprintk("RPC:       xs_udp_data_ready...\n");
-	if (!(xprt = xprt_from_sock(sk)))
-		goto out;
-
-	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
-		goto out;
-
 	repsize = skb->len - sizeof(struct udphdr);
 	if (repsize < 4) {
 		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
-		goto dropit;
+		return;
 	}
 
 	/* Copy the XID from the skb... */
 	xp = skb_header_pointer(skb, sizeof(struct udphdr),
 				sizeof(_xid), &_xid);
 	if (xp == NULL)
-		goto dropit;
+		return;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock(&xprt->transport_lock);
+	spin_lock_bh(&xprt->transport_lock);
 	rovr = xprt_lookup_rqst(xprt, *xp);
 	if (!rovr)
 		goto out_unlock;
@@ -1025,10 +1040,54 @@ static void xs_udp_data_ready(struct sock *sk)
 	xprt_complete_rqst(task, copied);
 
  out_unlock:
-	spin_unlock(&xprt->transport_lock);
- dropit:
-	skb_free_datagram(sk, skb);
- out:
+	spin_unlock_bh(&xprt->transport_lock);
+}
+
+static void xs_udp_data_receive(struct sock_xprt *transport)
+{
+	struct sk_buff *skb;
+	struct sock *sk;
+	int err;
+
+	mutex_lock(&transport->recv_mutex);
+	sk = transport->inet;
+	if (sk == NULL)
+		goto out;
+	for (;;) {
+		skb = skb_recv_datagram(sk, 0, 1, &err);
+		if (skb == NULL)
+			break;
+		xs_udp_data_read_skb(&transport->xprt, sk, skb);
+		skb_free_datagram(sk, skb);
+	}
+out:
+	mutex_unlock(&transport->recv_mutex);
+}
+
+static void xs_udp_data_receive_workfn(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, recv_worker);
+	xs_udp_data_receive(transport);
+}
+
+/**
+ * xs_data_ready - "data ready" callback for UDP sockets
+ * @sk: socket with data to read
+ *
+ */
+static void xs_data_ready(struct sock *sk)
+{
+	struct rpc_xprt *xprt;
+
+	read_lock_bh(&sk->sk_callback_lock);
+	dprintk("RPC:       xs_data_ready...\n");
+	xprt = xprt_from_sock(sk);
+	if (xprt != NULL) {
+		struct sock_xprt *transport = container_of(xprt,
+				struct sock_xprt, xprt);
+		queue_work(rpciod_workqueue, &transport->recv_worker);
+	}
 	read_unlock_bh(&sk->sk_callback_lock);
 }
 
@@ -1243,12 +1302,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
 
 	/* Find and lock the request corresponding to this xid */
-	spin_lock(&xprt->transport_lock);
+	spin_lock_bh(&xprt->transport_lock);
 	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
 	if (!req) {
 		dprintk("RPC:       XID %08x request not found!\n",
 				ntohl(transport->tcp_xid));
-		spin_unlock(&xprt->transport_lock);
+		spin_unlock_bh(&xprt->transport_lock);
 		return -1;
 	}
 
@@ -1257,7 +1316,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
 
-	spin_unlock(&xprt->transport_lock);
+	spin_unlock_bh(&xprt->transport_lock);
 	return 0;
 }
 
@@ -1277,10 +1336,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
 	struct rpc_rqst *req;
 
 	/* Look up and lock the request corresponding to the given XID */
-	spin_lock(&xprt->transport_lock);
+	spin_lock_bh(&xprt->transport_lock);
 	req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
 	if (req == NULL) {
-		spin_unlock(&xprt->transport_lock);
+		spin_unlock_bh(&xprt->transport_lock);
 		printk(KERN_WARNING "Callback slot table overflowed\n");
 		xprt_force_disconnect(xprt);
 		return -1;
@@ -1291,7 +1350,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
 
 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
 		xprt_complete_bc_request(req, transport->tcp_copied);
-	spin_unlock(&xprt->transport_lock);
+	spin_unlock_bh(&xprt->transport_lock);
 
 	return 0;
 }
@@ -1306,6 +1365,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
 		xs_tcp_read_reply(xprt, desc) :
 		xs_tcp_read_callback(xprt, desc);
 }
+
+static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
+{
+	int ret;
+
+	ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
+			      SVC_SOCK_ANONYMOUS);
+	if (ret < 0)
+		return ret;
+	return 0;
+}
 #else
 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
 					struct xdr_skb_reader *desc)
@@ -1391,6 +1461,44 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
 	return len - desc.count;
 }
 
+static void xs_tcp_data_receive(struct sock_xprt *transport)
+{
+	struct rpc_xprt *xprt = &transport->xprt;
+	struct sock *sk;
+	read_descriptor_t rd_desc = {
+		.count = 2*1024*1024,
+		.arg.data = xprt,
+	};
+	unsigned long total = 0;
+	int read = 0;
+
+	mutex_lock(&transport->recv_mutex);
+	sk = transport->inet;
+	if (sk == NULL)
+		goto out;
+
+	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
+	for (;;) {
+		lock_sock(sk);
+		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+		release_sock(sk);
+		if (read <= 0)
+			break;
+		total += read;
+		rd_desc.count = 65536;
+	}
+out:
+	mutex_unlock(&transport->recv_mutex);
+	trace_xs_tcp_data_ready(xprt, read, total);
+}
+
+static void xs_tcp_data_receive_workfn(struct work_struct *work)
+{
+	struct sock_xprt *transport =
+		container_of(work, struct sock_xprt, recv_worker);
+	xs_tcp_data_receive(transport);
+}
+
 /**
  * xs_tcp_data_ready - "data ready" callback for TCP sockets
  * @sk: socket with data to read
@@ -1398,34 +1506,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
  */
 static void xs_tcp_data_ready(struct sock *sk)
 {
+	struct sock_xprt *transport;
 	struct rpc_xprt *xprt;
-	read_descriptor_t rd_desc;
-	int read;
-	unsigned long total = 0;
 
 	dprintk("RPC:       xs_tcp_data_ready...\n");
 
 	read_lock_bh(&sk->sk_callback_lock);
-	if (!(xprt = xprt_from_sock(sk))) {
-		read = 0;
+	if (!(xprt = xprt_from_sock(sk)))
 		goto out;
-	}
+	transport = container_of(xprt, struct sock_xprt, xprt);
+
 	/* Any data means we had a useful conversation, so
 	 * the we don't need to delay the next reconnect
 	 */
 	if (xprt->reestablish_timeout)
 		xprt->reestablish_timeout = 0;
+	queue_work(rpciod_workqueue, &transport->recv_worker);
 
-	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
-	rd_desc.arg.data = xprt;
-	do {
-		rd_desc.count = 65536;
-		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-		if (read > 0)
-			total += read;
-	} while (read > 0);
 out:
-	trace_xs_tcp_data_ready(xprt, read, total);
 	read_unlock_bh(&sk->sk_callback_lock);
 }
 
@@ -1873,7 +1971,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
 		xs_save_old_callbacks(transport, sk);
 
 		sk->sk_user_data = xprt;
-		sk->sk_data_ready = xs_local_data_ready;
+		sk->sk_data_ready = xs_data_ready;
 		sk->sk_write_space = xs_udp_write_space;
 		sk->sk_error_report = xs_error_report;
 		sk->sk_allocation = GFP_NOIO;
@@ -2059,7 +2157,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 		xs_save_old_callbacks(transport, sk);
 
 		sk->sk_user_data = xprt;
-		sk->sk_data_ready = xs_udp_data_ready;
+		sk->sk_data_ready = xs_data_ready;
 		sk->sk_write_space = xs_udp_write_space;
 		sk->sk_allocation = GFP_NOIO;
 
@@ -2472,7 +2570,7 @@ static int bc_send_request(struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct svc_xprt	*xprt;
-	u32                     len;
+	int len;
 
 	dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
 	/*
@@ -2580,6 +2678,12 @@ static struct rpc_xprt_ops xs_tcp_ops = {
 	.enable_swap		= xs_enable_swap,
 	.disable_swap		= xs_disable_swap,
 	.inject_disconnect	= xs_inject_disconnect,
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+	.bc_setup		= xprt_setup_bc,
+	.bc_up			= xs_tcp_bc_up,
+	.bc_free_rqst		= xprt_free_bc_rqst,
+	.bc_destroy		= xprt_destroy_bc,
+#endif
 };
 
 /*
@@ -2650,6 +2754,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
 	}
 
 	new = container_of(xprt, struct sock_xprt, xprt);
+	mutex_init(&new->recv_mutex);
 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
 	xprt->addrlen = args->addrlen;
 	if (args->srcaddr)
@@ -2703,6 +2808,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
 	xprt->ops = &xs_local_ops;
 	xprt->timeout = &xs_local_default_timeout;
 
+	INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
 	INIT_DELAYED_WORK(&transport->connect_worker,
 			xs_dummy_setup_socket);
 
@@ -2774,21 +2880,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
 
 	xprt->timeout = &xs_udp_default_timeout;
 
+	INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
+	INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
+
 	switch (addr->sa_family) {
 	case AF_INET:
 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
 			xprt_set_bound(xprt);
 
-		INIT_DELAYED_WORK(&transport->connect_worker,
-					xs_udp_setup_socket);
 		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
 		break;
 	case AF_INET6:
 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
 			xprt_set_bound(xprt);
 
-		INIT_DELAYED_WORK(&transport->connect_worker,
-					xs_udp_setup_socket);
 		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
 		break;
 	default:
@@ -2853,21 +2958,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
 	xprt->ops = &xs_tcp_ops;
 	xprt->timeout = &xs_tcp_default_timeout;
 
+	INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn);
+	INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
+
 	switch (addr->sa_family) {
 	case AF_INET:
 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
 			xprt_set_bound(xprt);
 
-		INIT_DELAYED_WORK(&transport->connect_worker,
-					xs_tcp_setup_socket);
 		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
 		break;
 	case AF_INET6:
 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
 			xprt_set_bound(xprt);
 
-		INIT_DELAYED_WORK(&transport->connect_worker,
-					xs_tcp_setup_socket);
 		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
 		break;
 	default: