-rw-r--r-- | fs/nfs/callback_proc.c | 9
-rw-r--r-- | fs/nfs/callback_xdr.c | 17
-rw-r--r-- | fs/nfs/delegation.c | 9
-rw-r--r-- | fs/nfs/delegation.h | 2
-rw-r--r-- | fs/nfs/direct.c | 17
-rw-r--r-- | fs/nfs/filelayout/filelayout.c | 6
-rw-r--r-- | fs/nfs/flexfilelayout/flexfilelayout.c | 200
-rw-r--r-- | fs/nfs/flexfilelayout/flexfilelayout.h | 17
-rw-r--r-- | fs/nfs/flexfilelayout/flexfilelayoutdev.c | 119
-rw-r--r-- | fs/nfs/internal.h | 1
-rw-r--r-- | fs/nfs/nfs42.h | 1
-rw-r--r-- | fs/nfs/nfs42proc.c | 107
-rw-r--r-- | fs/nfs/nfs42xdr.c | 146
-rw-r--r-- | fs/nfs/nfs4_fs.h | 12
-rw-r--r-- | fs/nfs/nfs4file.c | 23
-rw-r--r-- | fs/nfs/nfs4proc.c | 185
-rw-r--r-- | fs/nfs/nfs4state.c | 18
-rw-r--r-- | fs/nfs/nfs4trace.h | 10
-rw-r--r-- | fs/nfs/nfs4xdr.c | 43
-rw-r--r-- | fs/nfs/pagelist.c | 6
-rw-r--r-- | fs/nfs/pnfs.c | 349
-rw-r--r-- | fs/nfs/pnfs.h | 17
-rw-r--r-- | fs/nfs/pnfs_nfs.c | 60
-rw-r--r-- | fs/nfs/super.c | 9
-rw-r--r-- | fs/nfs/write.c | 64
-rw-r--r-- | include/linux/errno.h | 1
-rw-r--r-- | include/linux/nfs4.h | 28
-rw-r--r-- | include/linux/nfs_fs_sb.h | 1
-rw-r--r-- | include/linux/nfs_xdr.h | 30
-rw-r--r-- | include/linux/sunrpc/auth.h | 26
-rw-r--r-- | include/linux/sunrpc/clnt.h | 1
-rw-r--r-- | include/linux/sunrpc/msg_prot.h | 4
-rw-r--r-- | include/linux/sunrpc/xprt.h | 1
-rw-r--r-- | include/linux/sunrpc/xprtrdma.h | 4
-rw-r--r-- | net/sunrpc/auth.c | 9
-rw-r--r-- | net/sunrpc/auth_generic.c | 13
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 6
-rw-r--r-- | net/sunrpc/auth_unix.c | 6
-rw-r--r-- | net/sunrpc/clnt.c | 17
-rw-r--r-- | net/sunrpc/xdr.c | 2
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 16
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 134
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 214
-rw-r--r-- | net/sunrpc/xprtrdma/physical_ops.c | 39
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 517
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 16
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 78
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 47
-rw-r--r-- | net/sunrpc/xprtsock.c | 6
49 files changed, 1764 insertions, 899 deletions
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index 618ced381a14..aaa2e8d3df6f 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -217,7 +217,8 @@ static u32 initiate_file_draining(struct nfs_client *clp,
 	}
 
 	if (pnfs_mark_matching_lsegs_return(lo, &free_me_list,
-					&args->cbl_range)) {
+				&args->cbl_range,
+				be32_to_cpu(args->cbl_stateid.seqid))) {
 		rv = NFS4_OK;
 		goto unlock;
 	}
@@ -500,8 +501,10 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
 	cps->slot = slot;
 
 	/* The ca_maxresponsesize_cached is 0 with no DRC */
-	if (args->csa_cachethis != 0)
-		return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE);
+	if (args->csa_cachethis != 0) {
+		status = htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE);
+		goto out_unlock;
+	}
 
 	/*
 	 * Check for pending referring calls.  If a match is found, a
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index 976c90608e56..d81f96aacd51 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -146,10 +146,16 @@ static __be32 decode_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
 	p = read_buf(xdr, NFS4_STATEID_SIZE);
 	if (unlikely(p == NULL))
 		return htonl(NFS4ERR_RESOURCE);
-	memcpy(stateid, p, NFS4_STATEID_SIZE);
+	memcpy(stateid->data, p, NFS4_STATEID_SIZE);
 	return 0;
 }
 
+static __be32 decode_delegation_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
+{
+	stateid->type = NFS4_DELEGATION_STATEID_TYPE;
+	return decode_stateid(xdr, stateid);
+}
+
 static __be32 decode_compound_hdr_arg(struct xdr_stream *xdr, struct cb_compound_hdr_arg *hdr)
 {
 	__be32 *p;
@@ -211,7 +217,7 @@ static __be32 decode_recall_args(struct svc_rqst *rqstp, struct xdr_stream *xdr,
 	__be32 *p;
 	__be32 status;
 
-	status = decode_stateid(xdr, &args->stateid);
+	status = decode_delegation_stateid(xdr, &args->stateid);
 	if (unlikely(status != 0))
 		goto out;
 	p = read_buf(xdr, 4);
@@ -227,6 +233,11 @@ out:
 }
 
 #if defined(CONFIG_NFS_V4_1)
+static __be32 decode_layout_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
+{
+	stateid->type = NFS4_LAYOUT_STATEID_TYPE;
+	return decode_stateid(xdr, stateid);
+}
 
 static __be32 decode_layoutrecall_args(struct svc_rqst *rqstp,
 				       struct xdr_stream *xdr,
@@ -263,7 +274,7 @@ static __be32 decode_layoutrecall_args(struct svc_rqst *rqstp,
 		}
 		p = xdr_decode_hyper(p, &args->cbl_range.offset);
 		p = xdr_decode_hyper(p, &args->cbl_range.length);
-		status = decode_stateid(xdr, &args->cbl_stateid);
+		status = decode_layout_stateid(xdr, &args->cbl_stateid);
 		if (unlikely(status != 0))
 			goto out;
 	} else if (args->cbl_recall_type == RETURN_FSID) {
diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c
index 5166adcfc0fb..322c2585bc34 100644
--- a/fs/nfs/delegation.c
+++ b/fs/nfs/delegation.c
@@ -875,15 +875,16 @@ int nfs_delegations_present(struct nfs_client *clp)
 
 /**
  * nfs4_copy_delegation_stateid - Copy inode's state ID information
- * @dst: stateid data structure to fill in
  * @inode: inode to check
  * @flags: delegation type requirement
+ * @dst: stateid data structure to fill in
+ * @cred: optional argument to retrieve credential
  *
  * Returns "true" and fills in "dst->data" * if inode had a delegation,
  * otherwise "false" is returned.
  */
-bool nfs4_copy_delegation_stateid(nfs4_stateid *dst, struct inode *inode,
-		fmode_t flags)
+bool nfs4_copy_delegation_stateid(struct inode *inode, fmode_t flags,
+		nfs4_stateid *dst, struct rpc_cred **cred)
 {
 	struct nfs_inode *nfsi = NFS_I(inode);
 	struct nfs_delegation *delegation;
@@ -896,6 +897,8 @@ bool nfs4_copy_delegation_stateid(nfs4_stateid *dst, struct inode *inode,
 	if (ret) {
 		nfs4_stateid_copy(dst, &delegation->stateid);
 		nfs_mark_delegation_referenced(delegation);
+		if (cred)
+			*cred = get_rpccred(delegation->cred);
 	}
 	rcu_read_unlock();
 	return ret;
diff --git a/fs/nfs/delegation.h b/fs/nfs/delegation.h
index 333063e032f0..64724d252a79 100644
--- a/fs/nfs/delegation.h
+++ b/fs/nfs/delegation.h
@@ -56,7 +56,7 @@ void nfs_delegation_reap_unclaimed(struct nfs_client *clp);
 int nfs4_proc_delegreturn(struct inode *inode, struct rpc_cred *cred, const nfs4_stateid *stateid, int issync);
 int nfs4_open_delegation_recall(struct nfs_open_context *ctx, struct nfs4_state *state, const nfs4_stateid *stateid, fmode_t type);
 int nfs4_lock_delegation_recall(struct file_lock *fl, struct nfs4_state *state, const nfs4_stateid *stateid);
-bool nfs4_copy_delegation_stateid(nfs4_stateid *dst, struct inode *inode, fmode_t flags);
+bool nfs4_copy_delegation_stateid(struct inode *inode, fmode_t flags, nfs4_stateid *dst, struct rpc_cred **cred);
 
 void nfs_mark_delegation_referenced(struct nfs_delegation *delegation);
 int nfs4_have_delegation(struct inode *inode, fmode_t flags);
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 741a92c470bb..979b3c4dee6a 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -87,6 +87,7 @@ struct nfs_direct_req {
 	int			mirror_count;
 
 	ssize_t			count,		/* bytes actually processed */
+				max_count,	/* max expected count */
 				bytes_left,	/* bytes left to be sent */
 				io_start,	/* start of IO */
 				error;		/* any reported error */
@@ -123,6 +124,8 @@ nfs_direct_good_bytes(struct nfs_direct_req *dreq, struct nfs_pgio_header *hdr)
 	int i;
 	ssize_t count;
 
+	WARN_ON_ONCE(dreq->count >= dreq->max_count);
+
 	if (dreq->mirror_count == 1) {
 		dreq->mirrors[hdr->pgio_mirror_idx].count += hdr->good_bytes;
 		dreq->count += hdr->good_bytes;
@@ -275,7 +278,7 @@ static void nfs_direct_release_pages(struct page **pages, unsigned int npages)
 void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
 			      struct nfs_direct_req *dreq)
 {
-	cinfo->lock = &dreq->inode->i_lock;
+	cinfo->inode = dreq->inode;
 	cinfo->mds = &dreq->mds_cinfo;
 	cinfo->ds = &dreq->ds_cinfo;
 	cinfo->dreq = dreq;
@@ -591,7 +594,7 @@ ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter)
 		goto out_unlock;
 
 	dreq->inode = inode;
-	dreq->bytes_left = count;
+	dreq->bytes_left = dreq->max_count = count;
 	dreq->io_start = iocb->ki_pos;
 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 	l_ctx = nfs_get_lock_context(dreq->ctx);
@@ -630,13 +633,13 @@ nfs_direct_write_scan_commit_list(struct inode *inode,
 				  struct list_head *list,
 				  struct nfs_commit_info *cinfo)
 {
-	spin_lock(cinfo->lock);
+	spin_lock(&cinfo->inode->i_lock);
 #ifdef CONFIG_NFS_V4_1
 	if (cinfo->ds != NULL && cinfo->ds->nwritten != 0)
 		NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo);
 #endif
 	nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
-	spin_unlock(cinfo->lock);
+	spin_unlock(&cinfo->inode->i_lock);
 }
 
 static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
@@ -671,13 +674,13 @@ static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
 		if (!nfs_pageio_add_request(&desc, req)) {
 			nfs_list_remove_request(req);
 			nfs_list_add_request(req, &failed);
-			spin_lock(cinfo.lock);
+			spin_lock(&cinfo.inode->i_lock);
 			dreq->flags = 0;
 			if (desc.pg_error < 0)
 				dreq->error = desc.pg_error;
 			else
 				dreq->error = -EIO;
-			spin_unlock(cinfo.lock);
+			spin_unlock(&cinfo.inode->i_lock);
 		}
 		nfs_release_request(req);
 	}
@@ -1023,7 +1026,7 @@ ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter)
 		goto out_unlock;
 
 	dreq->inode = inode;
-	dreq->bytes_left = iov_iter_count(iter);
+	dreq->bytes_left = dreq->max_count = iov_iter_count(iter);
 	dreq->io_start = pos;
 	dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp));
 	l_ctx = nfs_get_lock_context(dreq->ctx);
diff --git a/fs/nfs/filelayout/filelayout.c b/fs/nfs/filelayout/filelayout.c
index 3384dc8e6683..aa59757389dc 100644
--- a/fs/nfs/filelayout/filelayout.c
+++ b/fs/nfs/filelayout/filelayout.c
@@ -795,7 +795,7 @@ filelayout_alloc_commit_info(struct pnfs_layout_segment *lseg,
 		buckets[i].direct_verf.committed = NFS_INVALID_STABLE_HOW;
 	}
 
-	spin_lock(cinfo->lock);
+	spin_lock(&cinfo->inode->i_lock);
 	if (cinfo->ds->nbuckets >= size)
 		goto out;
 	for (i = 0; i < cinfo->ds->nbuckets; i++) {
@@ -811,7 +811,7 @@ filelayout_alloc_commit_info(struct pnfs_layout_segment *lseg,
 	swap(cinfo->ds->buckets, buckets);
 	cinfo->ds->nbuckets = size;
 out:
-	spin_unlock(cinfo->lock);
+	spin_unlock(&cinfo->inode->i_lock);
 	kfree(buckets);
 	return 0;
 }
@@ -890,6 +890,7 @@ filelayout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 					   0,
 					   NFS4_MAX_UINT64,
 					   IOMODE_READ,
+					   false,
 					   GFP_KERNEL);
 		if (IS_ERR(pgio->pg_lseg)) {
 			pgio->pg_error = PTR_ERR(pgio->pg_lseg);
@@ -915,6 +916,7 @@ filelayout_pg_init_write(struct nfs_pageio_descriptor *pgio,
 					   0,
 					   NFS4_MAX_UINT64,
 					   IOMODE_RW,
+					   false,
 					   GFP_NOFS);
 		if (IS_ERR(pgio->pg_lseg)) {
 			pgio->pg_error = PTR_ERR(pgio->pg_lseg);
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index 0cb1abd535e3..0e8018bc9880 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -26,6 +26,8 @@
 
 #define FF_LAYOUT_POLL_RETRY_MAX     (15*HZ)
 
+static struct group_info	*ff_zero_group;
+
 static struct pnfs_layout_hdr *
 ff_layout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags)
 {
@@ -53,14 +55,15 @@ ff_layout_free_layout_hdr(struct pnfs_layout_hdr *lo)
 	kfree(FF_LAYOUT_FROM_HDR(lo));
 }
 
-static int decode_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
+static int decode_pnfs_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
 {
 	__be32 *p;
 
 	p = xdr_inline_decode(xdr, NFS4_STATEID_SIZE);
 	if (unlikely(p == NULL))
 		return -ENOBUFS;
-	memcpy(stateid, p, NFS4_STATEID_SIZE);
+	stateid->type = NFS4_PNFS_DS_STATEID_TYPE;
+	memcpy(stateid->data, p, NFS4_STATEID_SIZE);
 	dprintk("%s: stateid id= [%x%x%x%x]\n", __func__,
 		p[0], p[1], p[2], p[3]);
 	return 0;
@@ -211,10 +214,16 @@ static struct nfs4_ff_layout_mirror *ff_layout_alloc_mirror(gfp_t gfp_flags)
 
 static void ff_layout_free_mirror(struct nfs4_ff_layout_mirror *mirror)
 {
+	struct rpc_cred	*cred;
+
 	ff_layout_remove_mirror(mirror);
 	kfree(mirror->fh_versions);
-	if (mirror->cred)
-		put_rpccred(mirror->cred);
+	cred = rcu_access_pointer(mirror->ro_cred);
+	if (cred)
+		put_rpccred(cred);
+	cred = rcu_access_pointer(mirror->rw_cred);
+	if (cred)
+		put_rpccred(cred);
 	nfs4_ff_layout_put_deviceid(mirror->mirror_ds);
 	kfree(mirror);
 }
@@ -290,6 +299,8 @@ ff_lseg_merge(struct pnfs_layout_segment *new,
 {
 	u64 new_end, old_end;
 
+	if (test_bit(NFS_LSEG_LAYOUTRETURN, &old->pls_flags))
+		return false;
 	if (new->pls_range.iomode != old->pls_range.iomode)
 		return false;
 	old_end = pnfs_calc_offset_end(old->pls_range.offset,
@@ -310,8 +321,6 @@ ff_lseg_merge(struct pnfs_layout_segment *new,
 			new_end);
 	if (test_bit(NFS_LSEG_ROC, &old->pls_flags))
 		set_bit(NFS_LSEG_ROC, &new->pls_flags);
-	if (test_bit(NFS_LSEG_LAYOUTRETURN, &old->pls_flags))
-		set_bit(NFS_LSEG_LAYOUTRETURN, &new->pls_flags);
 	return true;
 }
 
@@ -407,8 +416,9 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
 		struct nfs4_ff_layout_mirror *mirror;
 		struct nfs4_deviceid devid;
 		struct nfs4_deviceid_node *idnode;
-		u32 ds_count;
-		u32 fh_count;
+		struct auth_cred acred = { .group_info = ff_zero_group };
+		struct rpc_cred	__rcu *cred;
+		u32 ds_count, fh_count, id;
 		int j;
 
 		rc = -EIO;
@@ -456,7 +466,7 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
 		fls->mirror_array[i]->efficiency = be32_to_cpup(p);
 
 		/* stateid */
-		rc = decode_stateid(&stream, &fls->mirror_array[i]->stateid);
+		rc = decode_pnfs_stateid(&stream, &fls->mirror_array[i]->stateid);
 		if (rc)
 			goto out_err_free;
 
@@ -484,24 +494,49 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
 		fls->mirror_array[i]->fh_versions_cnt = fh_count;
 
 		/* user */
-		rc = decode_name(&stream, &fls->mirror_array[i]->uid);
+		rc = decode_name(&stream, &id);
 		if (rc)
 			goto out_err_free;
 
+		acred.uid = make_kuid(&init_user_ns, id);
+
 		/* group */
-		rc = decode_name(&stream, &fls->mirror_array[i]->gid);
+		rc = decode_name(&stream, &id);
 		if (rc)
 			goto out_err_free;
 
+		acred.gid = make_kgid(&init_user_ns, id);
+
+		/* find the cred for it */
+		rcu_assign_pointer(cred, rpc_lookup_generic_cred(&acred, 0, gfp_flags));
+		if (IS_ERR(cred)) {
+			rc = PTR_ERR(cred);
+			goto out_err_free;
+		}
+
+		if (lgr->range.iomode == IOMODE_READ)
+			rcu_assign_pointer(fls->mirror_array[i]->ro_cred, cred);
+		else
+			rcu_assign_pointer(fls->mirror_array[i]->rw_cred, cred);
+
 		mirror = ff_layout_add_mirror(lh, fls->mirror_array[i]);
 		if (mirror != fls->mirror_array[i]) {
+			/* swap cred ptrs so free_mirror will clean up old */
+			if (lgr->range.iomode == IOMODE_READ) {
+				cred = xchg(&mirror->ro_cred, cred);
+				rcu_assign_pointer(fls->mirror_array[i]->ro_cred, cred);
+			} else {
+				cred = xchg(&mirror->rw_cred, cred);
+				rcu_assign_pointer(fls->mirror_array[i]->rw_cred, cred);
+			}
 			ff_layout_free_mirror(fls->mirror_array[i]);
 			fls->mirror_array[i] = mirror;
 		}
 
-		dprintk("%s: uid %d gid %d\n", __func__,
-			fls->mirror_array[i]->uid,
-			fls->mirror_array[i]->gid);
+		dprintk("%s: iomode %s uid %u gid %u\n", __func__,
+			lgr->range.iomode == IOMODE_READ ? "READ" : "RW",
+			from_kuid(&init_user_ns, acred.uid),
+			from_kgid(&init_user_ns, acred.gid));
 	}
 
 	p = xdr_inline_decode(&stream, 4);
@@ -745,7 +780,7 @@ ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg,
 	else {
 		int i;
 
-		spin_lock(cinfo->lock);
+		spin_lock(&cinfo->inode->i_lock);
 		if (cinfo->ds->nbuckets != 0)
 			kfree(buckets);
 		else {
@@ -759,7 +794,7 @@ ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg,
 					NFS_INVALID_STABLE_HOW;
 			}
 		}
-		spin_unlock(cinfo->lock);
+		spin_unlock(&cinfo->inode->i_lock);
 		return 0;
 	}
 }
@@ -786,6 +821,36 @@ ff_layout_choose_best_ds_for_read(struct pnfs_layout_segment *lseg,
 }
 
 static void
+ff_layout_pg_get_read(struct nfs_pageio_descriptor *pgio,
+		      struct nfs_page *req,
+		      bool strict_iomode)
+{
+retry_strict:
+	pnfs_put_lseg(pgio->pg_lseg);
+	pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
+					   req->wb_context,
+					   0,
+					   NFS4_MAX_UINT64,
+					   IOMODE_READ,
+					   strict_iomode,
+					   GFP_KERNEL);
+	if (IS_ERR(pgio->pg_lseg)) {
+		pgio->pg_error = PTR_ERR(pgio->pg_lseg);
+		pgio->pg_lseg = NULL;
+	}
+
+	/* If this was not a strict_iomode request, the segment we got
+	 * back is IOMODE_RW, and the server wants to avoid READs on
+	 * it, then retry with strict_iomode set.
+	 */
+	if (pgio->pg_lseg && !strict_iomode &&
+	    ff_layout_avoid_read_on_rw(pgio->pg_lseg)) {
+		strict_iomode = true;
+		goto retry_strict;
+	}
+}
+
+static void
 ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 			struct nfs_page *req)
 {
@@ -795,26 +860,23 @@ ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 	int ds_idx;
 
 	/* Use full layout for now */
-	if (!pgio->pg_lseg) {
-		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
-						   req->wb_context,
-						   0,
-						   NFS4_MAX_UINT64,
-						   IOMODE_READ,
-						   GFP_KERNEL);
-		if (IS_ERR(pgio->pg_lseg)) {
-			pgio->pg_error = PTR_ERR(pgio->pg_lseg);
-			pgio->pg_lseg = NULL;
-			return;
-		}
-	}
+	if (!pgio->pg_lseg)
+		ff_layout_pg_get_read(pgio, req, false);
+	else if (ff_layout_avoid_read_on_rw(pgio->pg_lseg))
+		ff_layout_pg_get_read(pgio, req, true);
+
 	/* If no lseg, fall back to read through mds */
 	if (pgio->pg_lseg == NULL)
 		goto out_mds;
 
 	ds = ff_layout_choose_best_ds_for_read(pgio->pg_lseg, 0, &ds_idx);
-	if (!ds)
-		goto out_mds;
+	if (!ds) {
+		if (ff_layout_no_fallback_to_mds(pgio->pg_lseg))
+			goto out_pnfs;
+		else
+			goto out_mds;
+	}
+
 	mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx);
 
 	pgio->pg_mirror_idx = ds_idx;
@@ -828,6 +890,12 @@ out_mds:
 	pnfs_put_lseg(pgio->pg_lseg);
 	pgio->pg_lseg = NULL;
 	nfs_pageio_reset_read_mds(pgio);
+	return;
+
+out_pnfs:
+	pnfs_set_lo_fail(pgio->pg_lseg);
+	pnfs_put_lseg(pgio->pg_lseg);
+	pgio->pg_lseg = NULL;
 }
 
 static void
@@ -847,6 +915,7 @@ ff_layout_pg_init_write(struct nfs_pageio_descriptor *pgio,
 						   0,
 						   NFS4_MAX_UINT64,
 						   IOMODE_RW,
+						   false,
 						   GFP_NOFS);
 		if (IS_ERR(pgio->pg_lseg)) {
 			pgio->pg_error = PTR_ERR(pgio->pg_lseg);
@@ -870,8 +939,12 @@ ff_layout_pg_init_write(struct nfs_pageio_descriptor *pgio,
 
 	for (i = 0; i < pgio->pg_mirror_count; i++) {
 		ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, i, true);
-		if (!ds)
-			goto out_mds;
+		if (!ds) {
+			if (ff_layout_no_fallback_to_mds(pgio->pg_lseg))
+				goto out_pnfs;
+			else
+				goto out_mds;
+		}
 		pgm = &pgio->pg_mirrors[i];
 		mirror = FF_LAYOUT_COMP(pgio->pg_lseg, i);
 		pgm->pg_bsize = mirror->mirror_ds->ds_versions[0].wsize;
@@ -883,6 +956,12 @@ out_mds:
 	pnfs_put_lseg(pgio->pg_lseg);
 	pgio->pg_lseg = NULL;
 	nfs_pageio_reset_write_mds(pgio);
+	return;
+
+out_pnfs:
+	pnfs_set_lo_fail(pgio->pg_lseg);
+	pnfs_put_lseg(pgio->pg_lseg);
+	pgio->pg_lseg = NULL;
 }
 
 static unsigned int
@@ -895,6 +974,7 @@ ff_layout_pg_get_mirror_count_write(struct nfs_pageio_descriptor *pgio,
 						   0,
 						   NFS4_MAX_UINT64,
 						   IOMODE_RW,
+						   false,
 						   GFP_NOFS);
 		if (IS_ERR(pgio->pg_lseg)) {
 			pgio->pg_error = PTR_ERR(pgio->pg_lseg);
@@ -1067,8 +1147,7 @@ static int ff_layout_async_handle_error_v4(struct rpc_task *task,
 		rpc_wake_up(&tbl->slot_tbl_waitq);
 		/* fall through */
 	default:
-		if (ff_layout_no_fallback_to_mds(lseg) ||
-		    ff_layout_has_available_ds(lseg))
+		if (ff_layout_avoid_mds_available_ds(lseg))
 			return -NFS4ERR_RESET_TO_PNFS;
 reset:
 		dprintk("%s Retry through MDS. Error %d\n", __func__,
@@ -1215,8 +1294,6 @@ static int ff_layout_read_done_cb(struct rpc_task *task,
 					hdr->pgio_mirror_idx + 1,
 					&hdr->pgio_mirror_idx))
 			goto out_eagain;
-		set_bit(NFS_LAYOUT_RETURN_REQUESTED,
-			&hdr->lseg->pls_layout->plh_flags);
 		pnfs_read_resend_pnfs(hdr);
 		return task->tk_status;
 	case -NFS4ERR_RESET_TO_MDS:
@@ -1260,7 +1337,7 @@ ff_layout_set_layoutcommit(struct nfs_pgio_header *hdr)
 }
 
 static bool
-ff_layout_reset_to_mds(struct pnfs_layout_segment *lseg, int idx)
+ff_layout_device_unavailable(struct pnfs_layout_segment *lseg, int idx)
 {
 	/* No mirroring for now */
 	struct nfs4_deviceid_node *node = FF_LAYOUT_DEVID_NODE(lseg, idx);
@@ -1297,16 +1374,10 @@ static int ff_layout_read_prepare_common(struct rpc_task *task,
 		rpc_exit(task, -EIO);
 		return -EIO;
 	}
-	if (ff_layout_reset_to_mds(hdr->lseg, hdr->pgio_mirror_idx)) {
-		dprintk("%s task %u reset io to MDS\n", __func__, task->tk_pid);
-		if (ff_layout_has_available_ds(hdr->lseg))
-			pnfs_read_resend_pnfs(hdr);
-		else
-			ff_layout_reset_read(hdr);
-		rpc_exit(task, 0);
+	if (ff_layout_device_unavailable(hdr->lseg, hdr->pgio_mirror_idx)) {
+		rpc_exit(task, -EHOSTDOWN);
 		return -EAGAIN;
 	}
-	hdr->pgio_done_cb = ff_layout_read_done_cb;
 
 	ff_layout_read_record_layoutstats_start(task, hdr);
 	return 0;
@@ -1496,14 +1567,8 @@ static int ff_layout_write_prepare_common(struct rpc_task *task,
 		return -EIO;
 	}
 
-	if (ff_layout_reset_to_mds(hdr->lseg, hdr->pgio_mirror_idx)) {
-		bool retry_pnfs;
-
-		retry_pnfs = ff_layout_has_available_ds(hdr->lseg);
-		dprintk("%s task %u reset io to %s\n", __func__,
-			task->tk_pid, retry_pnfs ? "pNFS" : "MDS");
-		ff_layout_reset_write(hdr, retry_pnfs);
-		rpc_exit(task, 0);
+	if (ff_layout_device_unavailable(hdr->lseg, hdr->pgio_mirror_idx)) {
+		rpc_exit(task, -EHOSTDOWN);
 		return -EAGAIN;
 	}
 
@@ -1712,7 +1777,7 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
 		goto out_failed;
 
 	ds_cred = ff_layout_get_ds_cred(lseg, idx, hdr->cred);
-	if (IS_ERR(ds_cred))
+	if (!ds_cred)
 		goto out_failed;
 
 	vers = nfs4_ff_layout_ds_version(lseg, idx);
@@ -1720,6 +1785,7 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
 	dprintk("%s USE DS: %s cl_count %d vers %d\n", __func__,
 		ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count), vers);
 
+	hdr->pgio_done_cb = ff_layout_read_done_cb;
 	atomic_inc(&ds->ds_clp->cl_count);
 	hdr->ds_clp = ds->ds_clp;
 	fh = nfs4_ff_layout_select_ds_fh(lseg, idx);
@@ -1737,11 +1803,11 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr)
 			  vers == 3 ? &ff_layout_read_call_ops_v3 :
 				      &ff_layout_read_call_ops_v4,
 			  0, RPC_TASK_SOFTCONN);
-
+	put_rpccred(ds_cred);
 	return PNFS_ATTEMPTED;
 
 out_failed:
-	if (ff_layout_has_available_ds(lseg))
+	if (ff_layout_avoid_mds_available_ds(lseg))
 		return PNFS_TRY_AGAIN;
 	return PNFS_NOT_ATTEMPTED;
 }
@@ -1769,7 +1835,7 @@ ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync)
 		return PNFS_NOT_ATTEMPTED;
 
 	ds_cred = ff_layout_get_ds_cred(lseg, idx, hdr->cred);
-	if (IS_ERR(ds_cred))
+	if (!ds_cred)
 		return PNFS_NOT_ATTEMPTED;
 
 	vers = nfs4_ff_layout_ds_version(lseg, idx);
@@ -1798,6 +1864,7 @@ ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync)
 			  vers == 3 ? &ff_layout_write_call_ops_v3 :
 				      &ff_layout_write_call_ops_v4,
 			  sync, RPC_TASK_SOFTCONN);
+	put_rpccred(ds_cred);
 	return PNFS_ATTEMPTED;
 }
 
@@ -1824,7 +1891,7 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
 	struct rpc_clnt *ds_clnt;
 	struct rpc_cred *ds_cred;
 	u32 idx;
-	int vers;
+	int vers, ret;
 	struct nfs_fh *fh;
 
 	idx = calc_ds_index_from_commit(lseg, data->ds_commit_index);
@@ -1838,7 +1905,7 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
 		goto out_err;
 
 	ds_cred = ff_layout_get_ds_cred(lseg, idx, data->cred);
-	if (IS_ERR(ds_cred))
+	if (!ds_cred)
 		goto out_err;
 
 	vers = nfs4_ff_layout_ds_version(lseg, idx);
@@ -1854,10 +1921,12 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how)
 	if (fh)
 		data->args.fh = fh;
 
-	return nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops,
+	ret = nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops,
 				   vers == 3 ? &ff_layout_commit_call_ops_v3 :
 					       &ff_layout_commit_call_ops_v4,
 				   how, RPC_TASK_SOFTCONN);
+	put_rpccred(ds_cred);
+	return ret;
 out_err:
 	pnfs_generic_prepare_to_resend_writes(data);
 	pnfs_generic_commit_release(data);
@@ -2223,6 +2292,11 @@ static int __init nfs4flexfilelayout_init(void)
 {
 	printk(KERN_INFO "%s: NFSv4 Flexfile Layout Driver Registering...\n",
 	       __func__);
+	if (!ff_zero_group) {
+		ff_zero_group = groups_alloc(0);
+		if (!ff_zero_group)
+			return -ENOMEM;
+	}
 	return pnfs_register_layoutdriver(&flexfilelayout_type);
 }
 
@@ -2231,6 +2305,10 @@ static void __exit nfs4flexfilelayout_exit(void)
 	printk(KERN_INFO "%s: NFSv4 Flexfile Layout Driver Unregistering...\n",
 	       __func__);
 	pnfs_unregister_layoutdriver(&flexfilelayout_type);
+	if (ff_zero_group) {
+		put_group_info(ff_zero_group);
+		ff_zero_group = NULL;
+	}
 }
 
 MODULE_ALIAS("nfs-layouttype4-4");
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.h b/fs/nfs/flexfilelayout/flexfilelayout.h
index dd353bb7dc0a..1bcdb15d0c41 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.h
+++ b/fs/nfs/flexfilelayout/flexfilelayout.h
@@ -10,7 +10,8 @@
 #define FS_NFS_NFS4FLEXFILELAYOUT_H
 
 #define FF_FLAGS_NO_LAYOUTCOMMIT 1
-#define FF_FLAGS_NO_IO_THRU_MDS 2
+#define FF_FLAGS_NO_IO_THRU_MDS  2
+#define FF_FLAGS_NO_READ_IO      4
 
 #include "../pnfs.h"
 
@@ -76,9 +77,8 @@ struct nfs4_ff_layout_mirror {
 	u32				fh_versions_cnt;
 	struct nfs_fh			*fh_versions;
 	nfs4_stateid			stateid;
-	u32				uid;
-	u32				gid;
-	struct rpc_cred			*cred;
+	struct rpc_cred	__rcu		*ro_cred;
+	struct rpc_cred	__rcu		*rw_cred;
 	atomic_t			ref;
 	spinlock_t			lock;
 	struct nfs4_ff_layoutstat	read_stat;
@@ -154,6 +154,12 @@ ff_layout_no_fallback_to_mds(struct pnfs_layout_segment *lseg)
 }
 
 static inline bool
+ff_layout_no_read_on_rw(struct pnfs_layout_segment *lseg)
+{
+	return FF_LAYOUT_LSEG(lseg)->flags & FF_FLAGS_NO_READ_IO;
+}
+
+static inline bool
 ff_layout_test_devid_unavailable(struct nfs4_deviceid_node *node)
 {
 	return nfs4_test_deviceid_unavailable(node);
@@ -192,4 +198,7 @@ nfs4_ff_find_or_create_ds_client(struct pnfs_layout_segment *lseg,
 struct rpc_cred *ff_layout_get_ds_cred(struct pnfs_layout_segment *lseg,
 				       u32 ds_idx, struct rpc_cred *mdscred);
 bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg);
+bool ff_layout_avoid_mds_available_ds(struct pnfs_layout_segment *lseg);
+bool ff_layout_avoid_read_on_rw(struct pnfs_layout_segment *lseg);
+
 #endif /* FS_NFS_NFS4FLEXFILELAYOUT_H */
diff --git a/fs/nfs/flexfilelayout/flexfilelayoutdev.c b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
index add0e5a70bd6..0aa36be71fce 100644
--- a/fs/nfs/flexfilelayout/flexfilelayoutdev.c
+++ b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
@@ -228,7 +228,8 @@ ff_ds_error_match(const struct nfs4_ff_layout_ds_err *e1,
 		return e1->opnum < e2->opnum ? -1 : 1;
 	if (e1->status != e2->status)
 		return e1->status < e2->status ? -1 : 1;
-	ret = memcmp(&e1->stateid, &e2->stateid, sizeof(e1->stateid));
+	ret = memcmp(e1->stateid.data, e2->stateid.data,
+			sizeof(e1->stateid.data));
 	if (ret != 0)
 		return ret;
 	ret = memcmp(&e1->deviceid, &e2->deviceid, sizeof(e1->deviceid));
@@ -302,40 +303,26 @@ int ff_layout_track_ds_error(struct nfs4_flexfile_layout *flo,
 	return 0;
 }
 
-/* currently we only support AUTH_NONE and AUTH_SYS */
-static rpc_authflavor_t
-nfs4_ff_layout_choose_authflavor(struct nfs4_ff_layout_mirror *mirror)
+static struct rpc_cred *
+ff_layout_get_mirror_cred(struct nfs4_ff_layout_mirror *mirror, u32 iomode)
 {
-	if (mirror->uid == (u32)-1)
-		return RPC_AUTH_NULL;
-	return RPC_AUTH_UNIX;
-}
+	struct rpc_cred *cred, __rcu **pcred;
 
-/* fetch cred for NFSv3 DS */
-static int ff_layout_update_mirror_cred(struct nfs4_ff_layout_mirror *mirror,
-				      struct nfs4_pnfs_ds *ds)
-{
-	if (ds->ds_clp && !mirror->cred &&
-	    mirror->mirror_ds->ds_versions[0].version == 3) {
-		struct rpc_auth *auth = ds->ds_clp->cl_rpcclient->cl_auth;
-		struct rpc_cred *cred;
-		struct auth_cred acred = {
-			.uid = make_kuid(&init_user_ns, mirror->uid),
-			.gid = make_kgid(&init_user_ns, mirror->gid),
-		};
-
-		/* AUTH_NULL ignores acred */
-		cred = auth->au_ops->lookup_cred(auth, &acred, 0);
-		if (IS_ERR(cred)) {
-			dprintk("%s: lookup_cred failed with %ld\n",
-				__func__, PTR_ERR(cred));
-			return PTR_ERR(cred);
-		} else {
-			if (cmpxchg(&mirror->cred, NULL, cred))
-				put_rpccred(cred);
-		}
-	}
-	return 0;
+	if (iomode == IOMODE_READ)
+		pcred = &mirror->ro_cred;
+	else
+		pcred = &mirror->rw_cred;
+
+	rcu_read_lock();
+	do {
+		cred = rcu_dereference(*pcred);
+		if (!cred)
+			break;
+
+		cred = get_rpccred_rcu(cred);
+	} while(!cred);
+	rcu_read_unlock();
+	return cred;
 }
 
 struct nfs_fh *
@@ -356,7 +343,23 @@ out:
 	return fh;
 }
 
-/* Upon return, either ds is connected, or ds is NULL */
+/**
+ * nfs4_ff_layout_prepare_ds - prepare a DS connection for an RPC call
+ * @lseg: the layout segment we're operating on
+ * @ds_idx: index of the DS to use
+ * @fail_return: return layout on connect failure?
+ *
+ * Try to prepare a DS connection to accept an RPC call. This involves
+ * selecting a mirror to use and connecting the client to it if it's not
+ * already connected.
+ *
+ * Since we only need a single functioning mirror to satisfy a read, we don't
+ * want to return the layout if there is one. For writes though, any down
+ * mirror should result in a LAYOUTRETURN. @fail_return is how we distinguish
+ * between the two cases.
+ *
+ * Returns a pointer to a connected DS object on success or NULL on failure.
+ */
 struct nfs4_pnfs_ds *
 nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
 			  bool fail_return)
@@ -367,7 +370,6 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
 	struct inode *ino = lseg->pls_layout->plh_inode;
 	struct nfs_server *s = NFS_SERVER(ino);
 	unsigned int max_payload;
-	rpc_authflavor_t flavor;
 
 	if (!ff_layout_mirror_valid(lseg, mirror)) {
 		pr_err_ratelimited("NFS: %s: No data server for offset index %d\n",
@@ -383,9 +385,7 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
 	/* matching smp_wmb() in _nfs4_pnfs_v3/4_ds_connect */
 	smp_rmb();
 	if (ds->ds_clp)
-		goto out_update_creds;
-
-	flavor = nfs4_ff_layout_choose_authflavor(mirror);
+		goto out;
 
 	/* FIXME: For now we assume the server sent only one version of NFS
 	 * to use for the DS.
@@ -394,7 +394,7 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
 			     dataserver_retrans,
 			     mirror->mirror_ds->ds_versions[0].version,
 			     mirror->mirror_ds->ds_versions[0].minor_version,
-			     flavor);
+			     RPC_AUTH_UNIX);
 
 	/* connect success, check rsize/wsize limit */
 	if (ds->ds_clp) {
@@ -410,20 +410,10 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
 					 mirror, lseg->pls_range.offset,
 					 lseg->pls_range.length, NFS4ERR_NXIO,
 					 OP_ILLEGAL, GFP_NOIO);
-		if (!fail_return) {
-			if (ff_layout_has_available_ds(lseg))
-				set_bit(NFS_LAYOUT_RETURN_REQUESTED,
-					&lseg->pls_layout->plh_flags);
-			else
-				pnfs_error_mark_layout_for_return(ino, lseg);
-		} else
+		if (fail_return || !ff_layout_has_available_ds(lseg))
 			pnfs_error_mark_layout_for_return(ino, lseg);
 		ds = NULL;
-		goto out;
 	}
-out_update_creds:
-	if (ff_layout_update_mirror_cred(mirror, ds))
-		ds = NULL;
 out:
 	return ds;
 }
@@ -433,16 +423,15 @@ ff_layout_get_ds_cred(struct pnfs_layout_segment *lseg, u32 ds_idx,
 		      struct rpc_cred *mdscred)
 {
 	struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
-	struct rpc_cred *cred = ERR_PTR(-EINVAL);
-
-	if (!nfs4_ff_layout_prepare_ds(lseg, ds_idx, true))
-		goto out;
+	struct rpc_cred *cred;
 
-	if (mirror && mirror->cred)
-		cred = mirror->cred;
-	else
-		cred = mdscred;
-out:
+	if (mirror) {
+		cred = ff_layout_get_mirror_cred(mirror, lseg->pls_range.iomode);
+		if (!cred)
+			cred = get_rpccred(mdscred);
+	} else {
+		cred = get_rpccred(mdscred);
+	}
 	return cred;
 }
 
@@ -562,6 +551,18 @@ bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg)
 	return ff_rw_layout_has_available_ds(lseg);
 }
 
+bool ff_layout_avoid_mds_available_ds(struct pnfs_layout_segment *lseg)
+{
+	return ff_layout_no_fallback_to_mds(lseg) ||
+	       ff_layout_has_available_ds(lseg);
+}
+
+bool ff_layout_avoid_read_on_rw(struct pnfs_layout_segment *lseg)
+{
+	return lseg->pls_range.iomode == IOMODE_RW &&
+	       ff_layout_no_read_on_rw(lseg);
+}
+
 module_param(dataserver_retrans, uint, 0644);
 MODULE_PARM_DESC(dataserver_retrans, "The  number of times the NFSv4.1 client "
 			"retries a request before it attempts further "
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index f1d1d2c472e9..5154fa65a2f2 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -477,6 +477,7 @@ void nfs_mark_request_commit(struct nfs_page *req,
 			     u32 ds_commit_idx);
 int nfs_write_need_commit(struct nfs_pgio_header *);
 void nfs_writeback_update_inode(struct nfs_pgio_header *hdr);
+int nfs_commit_file(struct file *file, struct nfs_write_verifier *verf);
 int nfs_generic_commit_list(struct inode *inode, struct list_head *head,
 			    int how, struct nfs_commit_info *cinfo);
 void nfs_retry_commit(struct list_head *page_list,
diff --git a/fs/nfs/nfs42.h b/fs/nfs/nfs42.h
index b587ccd31083..b6cd15314bab 100644
--- a/fs/nfs/nfs42.h
+++ b/fs/nfs/nfs42.h
@@ -13,6 +13,7 @@
 
 /* nfs4.2proc.c */
 int nfs42_proc_allocate(struct file *, loff_t, loff_t);
+ssize_t nfs42_proc_copy(struct file *, loff_t, struct file *, loff_t, size_t);
 int nfs42_proc_deallocate(struct file *, loff_t, loff_t);
 loff_t nfs42_proc_llseek(struct file *, loff_t, int);
 int nfs42_proc_layoutstats_generic(struct nfs_server *,
diff --git a/fs/nfs/nfs42proc.c b/fs/nfs/nfs42proc.c
index dff83460e5a6..aa03ed09ba06 100644
--- a/fs/nfs/nfs42proc.c
+++ b/fs/nfs/nfs42proc.c
@@ -126,6 +126,111 @@ int nfs42_proc_deallocate(struct file *filep, loff_t offset, loff_t len)
 	return err;
 }
 
+static ssize_t _nfs42_proc_copy(struct file *src, loff_t pos_src,
+				struct nfs_lock_context *src_lock,
+				struct file *dst, loff_t pos_dst,
+				struct nfs_lock_context *dst_lock,
+				size_t count)
+{
+	struct nfs42_copy_args args = {
+		.src_fh		= NFS_FH(file_inode(src)),
+		.src_pos	= pos_src,
+		.dst_fh		= NFS_FH(file_inode(dst)),
+		.dst_pos	= pos_dst,
+		.count		= count,
+	};
+	struct nfs42_copy_res res;
+	struct rpc_message msg = {
+		.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_COPY],
+		.rpc_argp = &args,
+		.rpc_resp = &res,
+	};
+	struct inode *dst_inode = file_inode(dst);
+	struct nfs_server *server = NFS_SERVER(dst_inode);
+	int status;
+
+	status = nfs4_set_rw_stateid(&args.src_stateid, src_lock->open_context,
+				     src_lock, FMODE_READ);
+	if (status)
+		return status;
+
+	status = nfs4_set_rw_stateid(&args.dst_stateid, dst_lock->open_context,
+				     dst_lock, FMODE_WRITE);
+	if (status)
+		return status;
+
+	status = nfs4_call_sync(server->client, server, &msg,
+				&args.seq_args, &res.seq_res, 0);
+	if (status == -ENOTSUPP)
+		server->caps &= ~NFS_CAP_COPY;
+	if (status)
+		return status;
+
+	if (res.write_res.verifier.committed != NFS_FILE_SYNC) {
+		status = nfs_commit_file(dst, &res.write_res.verifier.verifier);
+		if (status)
+			return status;
+	}
+
+	truncate_pagecache_range(dst_inode, pos_dst,
+				 pos_dst + res.write_res.count);
+
+	return res.write_res.count;
+}
+
+ssize_t nfs42_proc_copy(struct file *src, loff_t pos_src,
+			struct file *dst, loff_t pos_dst,
+			size_t count)
+{
+	struct nfs_server *server = NFS_SERVER(file_inode(dst));
+	struct nfs_lock_context *src_lock;
+	struct nfs_lock_context *dst_lock;
+	struct nfs4_exception src_exception = { };
+	struct nfs4_exception dst_exception = { };
+	ssize_t err, err2;
+
+	if (!nfs_server_capable(file_inode(dst), NFS_CAP_COPY))
+		return -EOPNOTSUPP;
+
+	src_lock = nfs_get_lock_context(nfs_file_open_context(src));
+	if (IS_ERR(src_lock))
+		return PTR_ERR(src_lock);
+
+	src_exception.inode = file_inode(src);
+	src_exception.state = src_lock->open_context->state;
+
+	dst_lock = nfs_get_lock_context(nfs_file_open_context(dst));
+	if (IS_ERR(dst_lock)) {
+		err = PTR_ERR(dst_lock);
+		goto out_put_src_lock;
+	}
+
+	dst_exception.inode = file_inode(dst);
+	dst_exception.state = dst_lock->open_context->state;
+
+	do {
+		inode_lock(file_inode(dst));
+		err = _nfs42_proc_copy(src, pos_src, src_lock,
+				       dst, pos_dst, dst_lock, count);
+		inode_unlock(file_inode(dst));
+
+		if (err == -ENOTSUPP) {
+			err = -EOPNOTSUPP;
+			break;
+		}
+
+		err2 = nfs4_handle_exception(server, err, &src_exception);
+		err  = nfs4_handle_exception(server, err, &dst_exception);
+		if (!err)
+			err = err2;
+	} while (src_exception.retry || dst_exception.retry);
+
+	nfs_put_lock_context(dst_lock);
+out_put_src_lock:
+	nfs_put_lock_context(src_lock);
+	return err;
+}
+
 static loff_t _nfs42_proc_llseek(struct file *filep,
 		struct nfs_lock_context *lock, loff_t offset, int whence)
 {
@@ -232,7 +337,7 @@ nfs42_layoutstat_done(struct rpc_task *task, void *calldata)
 			 * with the current stateid.
 			 */
 			set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-			pnfs_mark_matching_lsegs_invalid(lo, &head, NULL);
+			pnfs_mark_matching_lsegs_invalid(lo, &head, NULL, 0);
 			spin_unlock(&inode->i_lock);
 			pnfs_free_lseg_list(&head);
 		} else
diff --git a/fs/nfs/nfs42xdr.c b/fs/nfs/nfs42xdr.c
index 0ca482a51e53..6dc6f2aea0d6 100644
--- a/fs/nfs/nfs42xdr.c
+++ b/fs/nfs/nfs42xdr.c
@@ -9,9 +9,22 @@
 #define encode_fallocate_maxsz		(encode_stateid_maxsz + \
 					 2 /* offset */ + \
 					 2 /* length */)
+#define NFS42_WRITE_RES_SIZE		(1 /* wr_callback_id size */ +\
+					 XDR_QUADLEN(NFS4_STATEID_SIZE) + \
+					 2 /* wr_count */ + \
+					 1 /* wr_committed */ + \
+					 XDR_QUADLEN(NFS4_VERIFIER_SIZE))
 #define encode_allocate_maxsz		(op_encode_hdr_maxsz + \
 					 encode_fallocate_maxsz)
 #define decode_allocate_maxsz		(op_decode_hdr_maxsz)
+#define encode_copy_maxsz		(op_encode_hdr_maxsz +          \
+					 XDR_QUADLEN(NFS4_STATEID_SIZE) + \
+					 XDR_QUADLEN(NFS4_STATEID_SIZE) + \
+					 2 + 2 + 2 + 1 + 1 + 1)
+#define decode_copy_maxsz		(op_decode_hdr_maxsz + \
+					 NFS42_WRITE_RES_SIZE + \
+					 1 /* cr_consecutive */ + \
+					 1 /* cr_synchronous */)
 #define encode_deallocate_maxsz		(op_encode_hdr_maxsz + \
 					 encode_fallocate_maxsz)
 #define decode_deallocate_maxsz		(op_decode_hdr_maxsz)
@@ -49,6 +62,16 @@
 					 decode_putfh_maxsz + \
 					 decode_allocate_maxsz + \
 					 decode_getattr_maxsz)
+#define NFS4_enc_copy_sz		(compound_encode_hdr_maxsz + \
+					 encode_putfh_maxsz + \
+					 encode_savefh_maxsz + \
+					 encode_putfh_maxsz + \
+					 encode_copy_maxsz)
+#define NFS4_dec_copy_sz		(compound_decode_hdr_maxsz + \
+					 decode_putfh_maxsz + \
+					 decode_savefh_maxsz + \
+					 decode_putfh_maxsz + \
+					 decode_copy_maxsz)
 #define NFS4_enc_deallocate_sz		(compound_encode_hdr_maxsz + \
 					 encode_putfh_maxsz + \
 					 encode_deallocate_maxsz + \
@@ -102,6 +125,23 @@ static void encode_allocate(struct xdr_stream *xdr,
 	encode_fallocate(xdr, args);
 }
 
+static void encode_copy(struct xdr_stream *xdr,
+			struct nfs42_copy_args *args,
+			struct compound_hdr *hdr)
+{
+	encode_op_hdr(xdr, OP_COPY, decode_copy_maxsz, hdr);
+	encode_nfs4_stateid(xdr, &args->src_stateid);
+	encode_nfs4_stateid(xdr, &args->dst_stateid);
+
+	encode_uint64(xdr, args->src_pos);
+	encode_uint64(xdr, args->dst_pos);
+	encode_uint64(xdr, args->count);
+
+	encode_uint32(xdr, 1); /* consecutive = true */
+	encode_uint32(xdr, 1); /* synchronous = true */
+	encode_uint32(xdr, 0); /* src server list */
+}
+
 static void encode_deallocate(struct xdr_stream *xdr,
 			      struct nfs42_falloc_args *args,
 			      struct compound_hdr *hdr)
@@ -182,6 +222,26 @@ static void nfs4_xdr_enc_allocate(struct rpc_rqst *req,
 }
 
 /*
+ * Encode COPY request
+ */
+static void nfs4_xdr_enc_copy(struct rpc_rqst *req,
+			      struct xdr_stream *xdr,
+			      struct nfs42_copy_args *args)
+{
+	struct compound_hdr hdr = {
+		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
+	};
+
+	encode_compound_hdr(xdr, req, &hdr);
+	encode_sequence(xdr, &args->seq_args, &hdr);
+	encode_putfh(xdr, args->src_fh, &hdr);
+	encode_savefh(xdr, &hdr);
+	encode_putfh(xdr, args->dst_fh, &hdr);
+	encode_copy(xdr, args, &hdr);
+	encode_nops(&hdr);
+}
+
+/*
  * Encode DEALLOCATE request
  */
 static void nfs4_xdr_enc_deallocate(struct rpc_rqst *req,
@@ -266,6 +326,62 @@ static int decode_allocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res)
 	return decode_op_hdr(xdr, OP_ALLOCATE);
 }
 
+static int decode_write_response(struct xdr_stream *xdr,
+				 struct nfs42_write_res *res)
+{
+	__be32 *p;
+	int stateids;
+
+	p = xdr_inline_decode(xdr, 4 + 8 + 4);
+	if (unlikely(!p))
+		goto out_overflow;
+
+	stateids = be32_to_cpup(p++);
+	p = xdr_decode_hyper(p, &res->count);
+	res->verifier.committed = be32_to_cpup(p);
+	return decode_verifier(xdr, &res->verifier.verifier);
+
+out_overflow:
+	print_overflow_msg(__func__, xdr);
+	return -EIO;
+}
+
+static int decode_copy_requirements(struct xdr_stream *xdr,
+				    struct nfs42_copy_res *res) {
+	__be32 *p;
+
+	p = xdr_inline_decode(xdr, 4 + 4);
+	if (unlikely(!p))
+		goto out_overflow;
+
+	res->consecutive = be32_to_cpup(p++);
+	res->synchronous = be32_to_cpup(p++);
+	return 0;
+out_overflow:
+	print_overflow_msg(__func__, xdr);
+	return -EIO;
+}
+
+static int decode_copy(struct xdr_stream *xdr, struct nfs42_copy_res *res)
+{
+	int status;
+
+	status = decode_op_hdr(xdr, OP_COPY);
+	if (status == NFS4ERR_OFFLOAD_NO_REQS) {
+		status = decode_copy_requirements(xdr, res);
+		if (status)
+			return status;
+		return NFS4ERR_OFFLOAD_NO_REQS;
+	} else if (status)
+		return status;
+
+	status = decode_write_response(xdr, &res->write_res);
+	if (status)
+		return status;
+
+	return decode_copy_requirements(xdr, res);
+}
+
 static int decode_deallocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res)
 {
 	return decode_op_hdr(xdr, OP_DEALLOCATE);
@@ -331,6 +447,36 @@ out:
 }
 
 /*
+ * Decode COPY response
+ */
+static int nfs4_xdr_dec_copy(struct rpc_rqst *rqstp,
+			     struct xdr_stream *xdr,
+			     struct nfs42_copy_res *res)
+{
+	struct compound_hdr hdr;
+	int status;
+
+	status = decode_compound_hdr(xdr, &hdr);
+	if (status)
+		goto out;
+	status = decode_sequence(xdr, &res->seq_res, rqstp);
+	if (status)
+		goto out;
+	status = decode_putfh(xdr);
+	if (status)
+		goto out;
+	status = decode_savefh(xdr);
+	if (status)
+		goto out;
+	status = decode_putfh(xdr);
+	if (status)
+		goto out;
+	status = decode_copy(xdr, res);
+out:
+	return status;
+}
+
+/*
  * Decode DEALLOCATE request
  */
 static int nfs4_xdr_dec_deallocate(struct rpc_rqst *rqstp,
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 4afdee420d25..768456fa1b17 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -438,8 +438,9 @@ extern void nfs41_handle_server_scope(struct nfs_client *,
 				      struct nfs41_server_scope **);
 extern void nfs4_put_lock_state(struct nfs4_lock_state *lsp);
 extern int nfs4_set_lock_state(struct nfs4_state *state, struct file_lock *fl);
-extern int nfs4_select_rw_stateid(nfs4_stateid *, struct nfs4_state *,
-		fmode_t, const struct nfs_lockowner *);
+extern int nfs4_select_rw_stateid(struct nfs4_state *, fmode_t,
+		const struct nfs_lockowner *, nfs4_stateid *,
+		struct rpc_cred **);
 
 extern struct nfs_seqid *nfs_alloc_seqid(struct nfs_seqid_counter *counter, gfp_t gfp_mask);
 extern int nfs_wait_on_sequence(struct nfs_seqid *seqid, struct rpc_task *task);
@@ -496,12 +497,15 @@ extern struct svc_version nfs4_callback_version4;
 
 static inline void nfs4_stateid_copy(nfs4_stateid *dst, const nfs4_stateid *src)
 {
-	memcpy(dst, src, sizeof(*dst));
+	memcpy(dst->data, src->data, sizeof(dst->data));
+	dst->type = src->type;
 }
 
 static inline bool nfs4_stateid_match(const nfs4_stateid *dst, const nfs4_stateid *src)
 {
-	return memcmp(dst, src, sizeof(*dst)) == 0;
+	if (dst->type != src->type)
+		return false;
+	return memcmp(dst->data, src->data, sizeof(dst->data)) == 0;
 }
 
 static inline bool nfs4_stateid_match_other(const nfs4_stateid *dst, const nfs4_stateid *src)
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index d0390516467c..014b0e41ace5 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -129,6 +129,28 @@ nfs4_file_flush(struct file *file, fl_owner_t id)
 }
 
 #ifdef CONFIG_NFS_V4_2
+static ssize_t nfs4_copy_file_range(struct file *file_in, loff_t pos_in,
+				    struct file *file_out, loff_t pos_out,
+				    size_t count, unsigned int flags)
+{
+	struct inode *in_inode = file_inode(file_in);
+	struct inode *out_inode = file_inode(file_out);
+	int ret;
+
+	if (in_inode == out_inode)
+		return -EINVAL;
+
+	/* flush any pending writes */
+	ret = nfs_sync_inode(in_inode);
+	if (ret)
+		return ret;
+	ret = nfs_sync_inode(out_inode);
+	if (ret)
+		return ret;
+
+	return nfs42_proc_copy(file_in, pos_in, file_out, pos_out, count);
+}
+
 static loff_t nfs4_file_llseek(struct file *filep, loff_t offset, int whence)
 {
 	loff_t ret;
@@ -243,6 +265,7 @@ const struct file_operations nfs4_file_operations = {
 	.check_flags	= nfs_check_flags,
 	.setlease	= simple_nosetlease,
 #ifdef CONFIG_NFS_V4_2
+	.copy_file_range = nfs4_copy_file_range,
 	.llseek		= nfs4_file_llseek,
 	.fallocate	= nfs42_fallocate,
 	.clone_file_range = nfs42_clone_file_range,
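
Illustrative aside (not part of the patch): the nfs4file.c hunks above wire nfs4_copy_file_range() into nfs4_file_operations, so copy_file_range(2) on an NFSv4.2 mount can be serviced by a server-side COPY (nfs42_proc_copy) rather than a read/write loop through the client. Below is a minimal userspace sketch of exercising that path. It assumes a libc that exposes the copy_file_range() wrapper (otherwise invoke it via syscall(__NR_copy_file_range, ...)), and the /mnt/nfs paths are made up. Note that the hook above rejects a copy where source and destination are the same inode.

/*
 * Illustrative sketch only: copy src to dst on the same NFSv4.2 mount,
 * letting the kernel translate each call into an NFS COPY when supported.
 * Assumes a libc providing copy_file_range(); the paths are hypothetical.
 */
#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>

int main(void)
{
	int in = open("/mnt/nfs/src.img", O_RDONLY);
	int out = open("/mnt/nfs/dst.img", O_WRONLY | O_CREAT | O_TRUNC, 0644);
	struct stat st;
	off_t remaining;

	if (in < 0 || out < 0 || fstat(in, &st) < 0) {
		perror("setup");
		return 1;
	}

	for (remaining = st.st_size; remaining > 0; ) {
		/* NULL offsets: the kernel uses and advances both file offsets */
		ssize_t n = copy_file_range(in, NULL, out, NULL, remaining, 0);

		if (n < 0) {
			perror("copy_file_range");
			return 1;
		}
		if (n == 0)	/* unexpected EOF on the source */
			break;
		remaining -= n;
	}
	close(out);
	close(in);
	return 0;
}

Passing NULL offsets keeps the sketch short; explicit per-call offsets can be passed instead when copying subranges.
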
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 084e8570da18..223982eb38c9 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -74,6 +74,17 @@
 #define NFS4_POLL_RETRY_MIN	(HZ/10)
 #define NFS4_POLL_RETRY_MAX	(15*HZ)
 
+/* file attributes which can be mapped to nfs attributes */
+#define NFS4_VALID_ATTRS (ATTR_MODE \
+	| ATTR_UID \
+	| ATTR_GID \
+	| ATTR_SIZE \
+	| ATTR_ATIME \
+	| ATTR_MTIME \
+	| ATTR_CTIME \
+	| ATTR_ATIME_SET \
+	| ATTR_MTIME_SET)
+
 struct nfs4_opendata;
 static int _nfs4_proc_open(struct nfs4_opendata *data);
 static int _nfs4_recover_proc_open(struct nfs4_opendata *data);
@@ -416,6 +427,7 @@ static int nfs4_do_handle_exception(struct nfs_server *server,
 		case -NFS4ERR_DELAY:
 			nfs_inc_server_stats(server, NFSIOS_DELAY);
 		case -NFS4ERR_GRACE:
+		case -NFS4ERR_RECALLCONFLICT:
 			exception->delay = 1;
 			return 0;
 
@@ -2558,15 +2570,20 @@ static int _nfs4_do_open(struct inode *dir,
 	if ((opendata->o_arg.open_flags & (O_CREAT|O_EXCL)) == (O_CREAT|O_EXCL) &&
 	    (opendata->o_arg.createmode != NFS4_CREATE_GUARDED)) {
 		nfs4_exclusive_attrset(opendata, sattr, &label);
-
-		nfs_fattr_init(opendata->o_res.f_attr);
-		status = nfs4_do_setattr(state->inode, cred,
-				opendata->o_res.f_attr, sattr,
-				state, label, olabel);
-		if (status == 0) {
-			nfs_setattr_update_inode(state->inode, sattr,
-					opendata->o_res.f_attr);
-			nfs_setsecurity(state->inode, opendata->o_res.f_attr, olabel);
+		/*
+		 * send create attributes which was not set by open
+		 * with an extra setattr.
+		 */
+		if (sattr->ia_valid & NFS4_VALID_ATTRS) {
+			nfs_fattr_init(opendata->o_res.f_attr);
+			status = nfs4_do_setattr(state->inode, cred,
+					opendata->o_res.f_attr, sattr,
+					state, label, olabel);
+			if (status == 0) {
+				nfs_setattr_update_inode(state->inode, sattr,
+						opendata->o_res.f_attr);
+				nfs_setsecurity(state->inode, opendata->o_res.f_attr, olabel);
+			}
 		}
 	}
 	if (opened && opendata->file_created)
@@ -2676,6 +2693,7 @@ static int _nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred,
 		.rpc_resp	= &res,
 		.rpc_cred	= cred,
         };
+	struct rpc_cred *delegation_cred = NULL;
 	unsigned long timestamp = jiffies;
 	fmode_t fmode;
 	bool truncate;
@@ -2691,7 +2709,7 @@ static int _nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred,
 	truncate = (sattr->ia_valid & ATTR_SIZE) ? true : false;
 	fmode = truncate ? FMODE_WRITE : FMODE_READ;
 
-	if (nfs4_copy_delegation_stateid(&arg.stateid, inode, fmode)) {
+	if (nfs4_copy_delegation_stateid(inode, fmode, &arg.stateid, &delegation_cred)) {
 		/* Use that stateid */
 	} else if (truncate && state != NULL) {
 		struct nfs_lockowner lockowner = {
@@ -2700,13 +2718,17 @@ static int _nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred,
 		};
 		if (!nfs4_valid_open_stateid(state))
 			return -EBADF;
-		if (nfs4_select_rw_stateid(&arg.stateid, state, FMODE_WRITE,
-				&lockowner) == -EIO)
+		if (nfs4_select_rw_stateid(state, FMODE_WRITE, &lockowner,
+				&arg.stateid, &delegation_cred) == -EIO)
 			return -EBADF;
 	} else
 		nfs4_stateid_copy(&arg.stateid, &zero_stateid);
+	if (delegation_cred)
+		msg.rpc_cred = delegation_cred;
 
 	status = nfs4_call_sync(server->client, server, &msg, &arg.seq_args, &res.seq_res, 1);
+
+	put_rpccred(delegation_cred);
 	if (status == 0 && state != NULL)
 		renew_lease(server, timestamp);
 	trace_nfs4_setattr(inode, &arg.stateid, status);
@@ -4285,7 +4307,7 @@ int nfs4_set_rw_stateid(nfs4_stateid *stateid,
 
 	if (l_ctx != NULL)
 		lockowner = &l_ctx->lockowner;
-	return nfs4_select_rw_stateid(stateid, ctx->state, fmode, lockowner);
+	return nfs4_select_rw_stateid(ctx->state, fmode, lockowner, stateid, NULL);
 }
 EXPORT_SYMBOL_GPL(nfs4_set_rw_stateid);
 
@@ -6054,6 +6076,7 @@ static int nfs41_lock_expired(struct nfs4_state *state, struct file_lock *reques
 static int _nfs4_proc_setlk(struct nfs4_state *state, int cmd, struct file_lock *request)
 {
 	struct nfs_inode *nfsi = NFS_I(state->inode);
+	struct nfs4_state_owner *sp = state->owner;
 	unsigned char fl_flags = request->fl_flags;
 	int status = -ENOLCK;
 
@@ -6068,6 +6091,7 @@ static int _nfs4_proc_setlk(struct nfs4_state *state, int cmd, struct file_lock
 	status = do_vfs_lock(state->inode, request);
 	if (status < 0)
 		goto out;
+	mutex_lock(&sp->so_delegreturn_mutex);
 	down_read(&nfsi->rwsem);
 	if (test_bit(NFS_DELEGATED_STATE, &state->flags)) {
 		/* Yes: cache locks! */
@@ -6075,9 +6099,11 @@ static int _nfs4_proc_setlk(struct nfs4_state *state, int cmd, struct file_lock
 		request->fl_flags = fl_flags & ~FL_SLEEP;
 		status = do_vfs_lock(state->inode, request);
 		up_read(&nfsi->rwsem);
+		mutex_unlock(&sp->so_delegreturn_mutex);
 		goto out;
 	}
 	up_read(&nfsi->rwsem);
+	mutex_unlock(&sp->so_delegreturn_mutex);
 	status = _nfs4_do_setlk(state, cmd, request, NFS_LOCK_NEW);
 out:
 	request->fl_flags = fl_flags;
@@ -7351,9 +7377,11 @@ int nfs4_proc_get_lease_time(struct nfs_client *clp, struct nfs_fsinfo *fsinfo)
  * always set csa_cachethis to FALSE because the current implementation
  * of the back channel DRC only supports caching the CB_SEQUENCE operation.
  */
-static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args)
+static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
+				    struct rpc_clnt *clnt)
 {
 	unsigned int max_rqst_sz, max_resp_sz;
+	unsigned int max_bc_payload = rpc_max_bc_payload(clnt);
 
 	max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead;
 	max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead;
@@ -7371,8 +7399,8 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args)
 		args->fc_attrs.max_ops, args->fc_attrs.max_reqs);
 
 	/* Back channel attributes */
-	args->bc_attrs.max_rqst_sz = PAGE_SIZE;
-	args->bc_attrs.max_resp_sz = PAGE_SIZE;
+	args->bc_attrs.max_rqst_sz = max_bc_payload;
+	args->bc_attrs.max_resp_sz = max_bc_payload;
 	args->bc_attrs.max_resp_sz_cached = 0;
 	args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
 	args->bc_attrs.max_reqs = NFS41_BC_MAX_CALLBACKS;
@@ -7476,7 +7504,7 @@ static int _nfs4_proc_create_session(struct nfs_client *clp,
 	};
 	int status;
 
-	nfs4_init_channel_attrs(&args);
+	nfs4_init_channel_attrs(&args, clp->cl_rpcclient);
 	args.flags = (SESSION4_PERSIST | SESSION4_BACK_CHAN);
 
 	status = rpc_call_sync(session->clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
@@ -7820,40 +7848,34 @@ nfs4_layoutget_prepare(struct rpc_task *task, void *calldata)
 	struct nfs4_layoutget *lgp = calldata;
 	struct nfs_server *server = NFS_SERVER(lgp->args.inode);
 	struct nfs4_session *session = nfs4_get_session(server);
-	int ret;
 
 	dprintk("--> %s\n", __func__);
-	/* Note the is a race here, where a CB_LAYOUTRECALL can come in
-	 * right now covering the LAYOUTGET we are about to send.
-	 * However, that is not so catastrophic, and there seems
-	 * to be no way to prevent it completely.
-	 */
-	if (nfs41_setup_sequence(session, &lgp->args.seq_args,
-				&lgp->res.seq_res, task))
-		return;
-	ret = pnfs_choose_layoutget_stateid(&lgp->args.stateid,
-					  NFS_I(lgp->args.inode)->layout,
-					  &lgp->args.range,
-					  lgp->args.ctx->state);
-	if (ret < 0)
-		rpc_exit(task, ret);
+	nfs41_setup_sequence(session, &lgp->args.seq_args,
+				&lgp->res.seq_res, task);
+	dprintk("<-- %s\n", __func__);
 }
 
 static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
 {
 	struct nfs4_layoutget *lgp = calldata;
+
+	dprintk("--> %s\n", __func__);
+	nfs41_sequence_done(task, &lgp->res.seq_res);
+	dprintk("<-- %s\n", __func__);
+}
+
+static int
+nfs4_layoutget_handle_exception(struct rpc_task *task,
+		struct nfs4_layoutget *lgp, struct nfs4_exception *exception)
+{
 	struct inode *inode = lgp->args.inode;
 	struct nfs_server *server = NFS_SERVER(inode);
 	struct pnfs_layout_hdr *lo;
-	struct nfs4_state *state = NULL;
-	unsigned long timeo, now, giveup;
+	int status = task->tk_status;
 
 	dprintk("--> %s tk_status => %d\n", __func__, -task->tk_status);
 
-	if (!nfs41_sequence_done(task, &lgp->res.seq_res))
-		goto out;
-
-	switch (task->tk_status) {
+	switch (status) {
 	case 0:
 		goto out;
 
@@ -7863,57 +7885,43 @@ static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
 	 * retry go inband.
 	 */
 	case -NFS4ERR_LAYOUTUNAVAILABLE:
-		task->tk_status = -ENODATA;
+		status = -ENODATA;
 		goto out;
 	/*
 	 * NFS4ERR_BADLAYOUT means the MDS cannot return a layout of
 	 * length lgp->args.minlength != 0 (see RFC5661 section 18.43.3).
 	 */
 	case -NFS4ERR_BADLAYOUT:
-		goto out_overflow;
+		status = -EOVERFLOW;
+		goto out;
 	/*
 	 * NFS4ERR_LAYOUTTRYLATER is a conflict with another client
 	 * (or clients) writing to the same RAID stripe except when
 	 * the minlength argument is 0 (see RFC5661 section 18.43.3).
+	 *
+	 * Treat it like we would RECALLCONFLICT -- we retry for a little
+	 * while, and then eventually give up.
 	 */
 	case -NFS4ERR_LAYOUTTRYLATER:
-		if (lgp->args.minlength == 0)
-			goto out_overflow;
-	/*
-	 * NFS4ERR_RECALLCONFLICT is when conflict with self (must recall
-	 * existing layout before getting a new one).
-	 */
-	case -NFS4ERR_RECALLCONFLICT:
-		timeo = rpc_get_timeout(task->tk_client);
-		giveup = lgp->args.timestamp + timeo;
-		now = jiffies;
-		if (time_after(giveup, now)) {
-			unsigned long delay;
-
-			/* Delay for:
-			 * - Not less then NFS4_POLL_RETRY_MIN.
-			 * - One last time a jiffie before we give up
-			 * - exponential backoff (time_now minus start_attempt)
-			 */
-			delay = max_t(unsigned long, NFS4_POLL_RETRY_MIN,
-				    min((giveup - now - 1),
-					now - lgp->args.timestamp));
-
-			dprintk("%s: NFS4ERR_RECALLCONFLICT waiting %lu\n",
-				__func__, delay);
-			rpc_delay(task, delay);
-			/* Do not call nfs4_async_handle_error() */
-			goto out_restart;
+		if (lgp->args.minlength == 0) {
+			status = -EOVERFLOW;
+			goto out;
 		}
-		break;
+		/* Fallthrough */
+	case -NFS4ERR_RECALLCONFLICT:
+		nfs4_handle_exception(server, -NFS4ERR_RECALLCONFLICT,
+					exception);
+		status = -ERECALLCONFLICT;
+		goto out;
 	case -NFS4ERR_EXPIRED:
 	case -NFS4ERR_BAD_STATEID:
+		exception->timeout = 0;
 		spin_lock(&inode->i_lock);
 		if (nfs4_stateid_match(&lgp->args.stateid,
 					&lgp->args.ctx->state->stateid)) {
 			spin_unlock(&inode->i_lock);
 			/* If the open stateid was bad, then recover it. */
-			state = lgp->args.ctx->state;
+			exception->state = lgp->args.ctx->state;
 			break;
 		}
 		lo = NFS_I(inode)->layout;
@@ -7926,25 +7934,21 @@ static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
 			 * with the current stateid.
 			 */
 			set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-			pnfs_mark_matching_lsegs_invalid(lo, &head, NULL);
+			pnfs_mark_matching_lsegs_invalid(lo, &head, NULL, 0);
 			spin_unlock(&inode->i_lock);
 			pnfs_free_lseg_list(&head);
 		} else
 			spin_unlock(&inode->i_lock);
-		goto out_restart;
+		status = -EAGAIN;
+		goto out;
 	}
-	if (nfs4_async_handle_error(task, server, state, &lgp->timeout) == -EAGAIN)
-		goto out_restart;
+
+	status = nfs4_handle_exception(server, status, exception);
+	if (exception->retry)
+		status = -EAGAIN;
 out:
 	dprintk("<-- %s\n", __func__);
-	return;
-out_restart:
-	task->tk_status = 0;
-	rpc_restart_call_prepare(task);
-	return;
-out_overflow:
-	task->tk_status = -EOVERFLOW;
-	goto out;
+	return status;
 }
 
 static size_t max_response_pages(struct nfs_server *server)
@@ -8013,7 +8017,7 @@ static const struct rpc_call_ops nfs4_layoutget_call_ops = {
 };
 
 struct pnfs_layout_segment *
-nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags)
+nfs4_proc_layoutget(struct nfs4_layoutget *lgp, long *timeout, gfp_t gfp_flags)
 {
 	struct inode *inode = lgp->args.inode;
 	struct nfs_server *server = NFS_SERVER(inode);
@@ -8033,6 +8037,7 @@ nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags)
 		.flags = RPC_TASK_ASYNC,
 	};
 	struct pnfs_layout_segment *lseg = NULL;
+	struct nfs4_exception exception = { .timeout = *timeout };
 	int status = 0;
 
 	dprintk("--> %s\n", __func__);
@@ -8046,7 +8051,6 @@ nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags)
 		return ERR_PTR(-ENOMEM);
 	}
 	lgp->args.layout.pglen = max_pages * PAGE_SIZE;
-	lgp->args.timestamp = jiffies;
 
 	lgp->res.layoutp = &lgp->args.layout;
 	lgp->res.seq_res.sr_slot = NULL;
@@ -8056,13 +8060,17 @@ nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags)
 	if (IS_ERR(task))
 		return ERR_CAST(task);
 	status = nfs4_wait_for_completion_rpc_task(task);
-	if (status == 0)
-		status = task->tk_status;
+	if (status == 0) {
+		status = nfs4_layoutget_handle_exception(task, lgp, &exception);
+		*timeout = exception.timeout;
+	}
+
 	trace_nfs4_layoutget(lgp->args.ctx,
 			&lgp->args.range,
 			&lgp->res.range,
 			&lgp->res.stateid,
 			status);
+
 	/* if layoutp->len is 0, nfs4_layoutget_prepare called rpc_exit */
 	if (status == 0 && lgp->res.layoutp->len)
 		lseg = pnfs_layout_process(lgp);
@@ -8118,7 +8126,8 @@ static void nfs4_layoutreturn_release(void *calldata)
 
 	dprintk("--> %s\n", __func__);
 	spin_lock(&lo->plh_inode->i_lock);
-	pnfs_mark_matching_lsegs_invalid(lo, &freeme, &lrp->args.range);
+	pnfs_mark_matching_lsegs_invalid(lo, &freeme, &lrp->args.range,
+			be32_to_cpu(lrp->args.stateid.seqid));
 	pnfs_mark_layout_returned_if_empty(lo);
 	if (lrp->res.lrs_present)
 		pnfs_set_layout_stateid(lo, &lrp->res.stateid, true);
@@ -8653,6 +8662,9 @@ nfs41_free_lock_state(struct nfs_server *server, struct nfs4_lock_state *lsp)
 static bool nfs41_match_stateid(const nfs4_stateid *s1,
 		const nfs4_stateid *s2)
 {
+	if (s1->type != s2->type)
+		return false;
+
 	if (memcmp(s1->other, s2->other, sizeof(s1->other)) != 0)
 		return false;
 
@@ -8793,6 +8805,7 @@ static const struct nfs4_minor_version_ops nfs_v4_2_minor_ops = {
 		| NFS_CAP_STATEID_NFSV41
 		| NFS_CAP_ATOMIC_OPEN_V1
 		| NFS_CAP_ALLOCATE
+		| NFS_CAP_COPY
 		| NFS_CAP_DEALLOCATE
 		| NFS_CAP_SEEK
 		| NFS_CAP_LAYOUTSTATS
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index d854693a15b0..5075592df145 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -65,7 +65,10 @@
 
 #define OPENOWNER_POOL_SIZE	8
 
-const nfs4_stateid zero_stateid;
+const nfs4_stateid zero_stateid = {
+	.data = { 0 },
+	.type = NFS4_SPECIAL_STATEID_TYPE,
+};
 static DEFINE_MUTEX(nfs_clid_init_mutex);
 
 int nfs4_init_clientid(struct nfs_client *clp, struct rpc_cred *cred)
@@ -985,15 +988,20 @@ static void nfs4_copy_open_stateid(nfs4_stateid *dst, struct nfs4_state *state)
  * Byte-range lock aware utility to initialize the stateid of read/write
  * requests.
  */
-int nfs4_select_rw_stateid(nfs4_stateid *dst, struct nfs4_state *state,
-		fmode_t fmode, const struct nfs_lockowner *lockowner)
+int nfs4_select_rw_stateid(struct nfs4_state *state,
+		fmode_t fmode, const struct nfs_lockowner *lockowner,
+		nfs4_stateid *dst, struct rpc_cred **cred)
 {
-	int ret = nfs4_copy_lock_stateid(dst, state, lockowner);
+	int ret;
+
+	if (cred != NULL)
+		*cred = NULL;
+	ret = nfs4_copy_lock_stateid(dst, state, lockowner);
 	if (ret == -EIO)
 		/* A lost lock - don't even consider delegations */
 		goto out;
 	/* returns true if delegation stateid found and copied */
-	if (nfs4_copy_delegation_stateid(dst, state->inode, fmode)) {
+	if (nfs4_copy_delegation_stateid(state->inode, fmode, dst, cred)) {
 		ret = 0;
 		goto out;
 	}
diff --git a/fs/nfs/nfs4trace.h b/fs/nfs/nfs4trace.h
index 2c8d05dae5b1..9c150b153782 100644
--- a/fs/nfs/nfs4trace.h
+++ b/fs/nfs/nfs4trace.h
@@ -1520,6 +1520,8 @@ DEFINE_NFS4_INODE_EVENT(nfs4_layoutreturn_on_close);
 		{ PNFS_UPDATE_LAYOUT_FOUND_CACHED, "found cached" },	\
 		{ PNFS_UPDATE_LAYOUT_RETURN, "layoutreturn" },		\
 		{ PNFS_UPDATE_LAYOUT_BLOCKED, "layouts blocked" },	\
+		{ PNFS_UPDATE_LAYOUT_INVALID_OPEN, "invalid open" },	\
+		{ PNFS_UPDATE_LAYOUT_RETRY, "retrying" },	\
 		{ PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET, "sent layoutget" })
 
 TRACE_EVENT(pnfs_update_layout,
@@ -1528,9 +1530,10 @@ TRACE_EVENT(pnfs_update_layout,
 			u64 count,
 			enum pnfs_iomode iomode,
 			struct pnfs_layout_hdr *lo,
+			struct pnfs_layout_segment *lseg,
 			enum pnfs_update_layout_reason reason
 		),
-		TP_ARGS(inode, pos, count, iomode, lo, reason),
+		TP_ARGS(inode, pos, count, iomode, lo, lseg, reason),
 		TP_STRUCT__entry(
 			__field(dev_t, dev)
 			__field(u64, fileid)
@@ -1540,6 +1543,7 @@ TRACE_EVENT(pnfs_update_layout,
 			__field(enum pnfs_iomode, iomode)
 			__field(int, layoutstateid_seq)
 			__field(u32, layoutstateid_hash)
+			__field(long, lseg)
 			__field(enum pnfs_update_layout_reason, reason)
 		),
 		TP_fast_assign(
@@ -1559,11 +1563,12 @@ TRACE_EVENT(pnfs_update_layout,
 				__entry->layoutstateid_seq = 0;
 				__entry->layoutstateid_hash = 0;
 			}
+			__entry->lseg = (long)lseg;
 		),
 		TP_printk(
 			"fileid=%02x:%02x:%llu fhandle=0x%08x "
 			"iomode=%s pos=%llu count=%llu "
-			"layoutstateid=%d:0x%08x (%s)",
+			"layoutstateid=%d:0x%08x lseg=0x%lx (%s)",
 			MAJOR(__entry->dev), MINOR(__entry->dev),
 			(unsigned long long)__entry->fileid,
 			__entry->fhandle,
@@ -1571,6 +1576,7 @@ TRACE_EVENT(pnfs_update_layout,
 			(unsigned long long)__entry->pos,
 			(unsigned long long)__entry->count,
 			__entry->layoutstateid_seq, __entry->layoutstateid_hash,
+			__entry->lseg,
 			show_pnfs_update_layout_reason(__entry->reason)
 		)
 );
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 88474a4fc669..661e753fe1c9 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -4270,6 +4270,24 @@ static int decode_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
 	return decode_opaque_fixed(xdr, stateid, NFS4_STATEID_SIZE);
 }
 
+static int decode_open_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
+{
+	stateid->type = NFS4_OPEN_STATEID_TYPE;
+	return decode_stateid(xdr, stateid);
+}
+
+static int decode_lock_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
+{
+	stateid->type = NFS4_LOCK_STATEID_TYPE;
+	return decode_stateid(xdr, stateid);
+}
+
+static int decode_delegation_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
+{
+	stateid->type = NFS4_DELEGATION_STATEID_TYPE;
+	return decode_stateid(xdr, stateid);
+}
+
 static int decode_close(struct xdr_stream *xdr, struct nfs_closeres *res)
 {
 	int status;
@@ -4278,7 +4296,7 @@ static int decode_close(struct xdr_stream *xdr, struct nfs_closeres *res)
 	if (status != -EIO)
 		nfs_increment_open_seqid(status, res->seqid);
 	if (!status)
-		status = decode_stateid(xdr, &res->stateid);
+		status = decode_open_stateid(xdr, &res->stateid);
 	return status;
 }
 
@@ -4937,7 +4955,7 @@ static int decode_lock(struct xdr_stream *xdr, struct nfs_lock_res *res)
 	if (status == -EIO)
 		goto out;
 	if (status == 0) {
-		status = decode_stateid(xdr, &res->stateid);
+		status = decode_lock_stateid(xdr, &res->stateid);
 		if (unlikely(status))
 			goto out;
 	} else if (status == -NFS4ERR_DENIED)
@@ -4966,7 +4984,7 @@ static int decode_locku(struct xdr_stream *xdr, struct nfs_locku_res *res)
 	if (status != -EIO)
 		nfs_increment_lock_seqid(status, res->seqid);
 	if (status == 0)
-		status = decode_stateid(xdr, &res->stateid);
+		status = decode_lock_stateid(xdr, &res->stateid);
 	return status;
 }
 
@@ -5016,7 +5034,7 @@ static int decode_rw_delegation(struct xdr_stream *xdr,
 	__be32 *p;
 	int status;
 
-	status = decode_stateid(xdr, &res->delegation);
+	status = decode_delegation_stateid(xdr, &res->delegation);
 	if (unlikely(status))
 		return status;
 	p = xdr_inline_decode(xdr, 4);
@@ -5096,7 +5114,7 @@ static int decode_open(struct xdr_stream *xdr, struct nfs_openres *res)
 	nfs_increment_open_seqid(status, res->seqid);
 	if (status)
 		return status;
-	status = decode_stateid(xdr, &res->stateid);
+	status = decode_open_stateid(xdr, &res->stateid);
 	if (unlikely(status))
 		return status;
 
@@ -5136,7 +5154,7 @@ static int decode_open_confirm(struct xdr_stream *xdr, struct nfs_open_confirmre
 	if (status != -EIO)
 		nfs_increment_open_seqid(status, res->seqid);
 	if (!status)
-		status = decode_stateid(xdr, &res->stateid);
+		status = decode_open_stateid(xdr, &res->stateid);
 	return status;
 }
 
@@ -5148,7 +5166,7 @@ static int decode_open_downgrade(struct xdr_stream *xdr, struct nfs_closeres *re
 	if (status != -EIO)
 		nfs_increment_open_seqid(status, res->seqid);
 	if (!status)
-		status = decode_stateid(xdr, &res->stateid);
+		status = decode_open_stateid(xdr, &res->stateid);
 	return status;
 }
 
@@ -5838,6 +5856,12 @@ out_overflow:
 }
 
 #if defined(CONFIG_NFS_V4_1)
+static int decode_layout_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
+{
+	stateid->type = NFS4_LAYOUT_STATEID_TYPE;
+	return decode_stateid(xdr, stateid);
+}
+
 static int decode_getdeviceinfo(struct xdr_stream *xdr,
 				struct nfs4_getdeviceinfo_res *res)
 {
@@ -5919,7 +5943,7 @@ static int decode_layoutget(struct xdr_stream *xdr, struct rpc_rqst *req,
 	if (unlikely(!p))
 		goto out_overflow;
 	res->return_on_close = be32_to_cpup(p);
-	decode_stateid(xdr, &res->stateid);
+	decode_layout_stateid(xdr, &res->stateid);
 	p = xdr_inline_decode(xdr, 4);
 	if (unlikely(!p))
 		goto out_overflow;
@@ -5985,7 +6009,7 @@ static int decode_layoutreturn(struct xdr_stream *xdr,
 		goto out_overflow;
 	res->lrs_present = be32_to_cpup(p);
 	if (res->lrs_present)
-		status = decode_stateid(xdr, &res->stateid);
+		status = decode_layout_stateid(xdr, &res->stateid);
 	return status;
 out_overflow:
 	print_overflow_msg(__func__, xdr);
@@ -7515,6 +7539,7 @@ struct rpc_procinfo	nfs4_procedures[] = {
 	PROC(DEALLOCATE,	enc_deallocate,		dec_deallocate),
 	PROC(LAYOUTSTATS,	enc_layoutstats,	dec_layoutstats),
 	PROC(CLONE,		enc_clone,		dec_clone),
+	PROC(COPY,		enc_copy,		dec_copy),
 #endif /* CONFIG_NFS_V4_2 */
 };
 
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 1f6db4231057..174dd4cf5747 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -341,8 +341,10 @@ nfs_create_request(struct nfs_open_context *ctx, struct page *page,
 	 * long write-back delay. This will be adjusted in
 	 * update_nfs_request below if the region is not locked. */
 	req->wb_page    = page;
-	req->wb_index	= page_file_index(page);
-	get_page(page);
+	if (page) {
+		req->wb_index = page_file_index(page);
+		get_page(page);
+	}
 	req->wb_offset  = offset;
 	req->wb_pgbase	= offset;
 	req->wb_bytes   = count;
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 89a5ef4df08a..0c7e0d45a4de 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -270,7 +270,7 @@ pnfs_mark_layout_stateid_invalid(struct pnfs_layout_hdr *lo,
 	};
 
 	set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
-	return pnfs_mark_matching_lsegs_invalid(lo, lseg_list, &range);
+	return pnfs_mark_matching_lsegs_invalid(lo, lseg_list, &range, 0);
 }
 
 static int
@@ -308,7 +308,7 @@ pnfs_layout_io_set_failed(struct pnfs_layout_hdr *lo, u32 iomode)
 
 	spin_lock(&inode->i_lock);
 	pnfs_layout_set_fail_bit(lo, pnfs_iomode_to_fail_bit(iomode));
-	pnfs_mark_matching_lsegs_invalid(lo, &head, &range);
+	pnfs_mark_matching_lsegs_invalid(lo, &head, &range, 0);
 	spin_unlock(&inode->i_lock);
 	pnfs_free_lseg_list(&head);
 	dprintk("%s Setting layout IOMODE_%s fail bit\n", __func__,
@@ -522,13 +522,35 @@ static int mark_lseg_invalid(struct pnfs_layout_segment *lseg,
 	return rv;
 }
 
-/* Returns count of number of matching invalid lsegs remaining in list
- * after call.
+/*
+ * Compare 2 layout stateid sequence ids, to see which is newer,
+ * taking into account wraparound issues.
+ */
+static bool pnfs_seqid_is_newer(u32 s1, u32 s2)
+{
+	return (s32)(s1 - s2) > 0;
+}
+
+/**
+ * pnfs_mark_matching_lsegs_invalid - tear down matching lsegs, or mark them for later removal
+ * @lo: layout header containing the lsegs
+ * @tmp_list: list head where doomed lsegs should go
+ * @recall_range: optional recall range argument to match (may be NULL)
+ * @seq: only invalidate lsegs obtained at or before this sequence (may be 0)
+ *
+ * Walk the list of lsegs in the layout header, and tear down any that should
+ * be destroyed. If "recall_range" is specified, then the segment must match
+ * that range. If "seq" is non-zero, then only match segments that were handed
+ * out at or before that sequence.
+ *
+ * Returns the number of matching invalid lsegs that remain in the list after
+ * the scan has purged the rest.
  */
 int
 pnfs_mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo,
 			    struct list_head *tmp_list,
-			    const struct pnfs_layout_range *recall_range)
+			    const struct pnfs_layout_range *recall_range,
+			    u32 seq)
 {
 	struct pnfs_layout_segment *lseg, *next;
 	int remaining = 0;
@@ -540,10 +562,12 @@ pnfs_mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo,
 	list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list)
 		if (!recall_range ||
 		    should_free_lseg(&lseg->pls_range, recall_range)) {
-			dprintk("%s: freeing lseg %p iomode %d "
+			if (seq && pnfs_seqid_is_newer(lseg->pls_seq, seq))
+				continue;
+			dprintk("%s: freeing lseg %p iomode %d seq %u "
 				"offset %llu length %llu\n", __func__,
-				lseg, lseg->pls_range.iomode, lseg->pls_range.offset,
-				lseg->pls_range.length);
+				lseg, lseg->pls_range.iomode, lseg->pls_seq,
+				lseg->pls_range.offset, lseg->pls_range.length);
 			if (!mark_lseg_invalid(lseg, tmp_list))
 				remaining++;
 		}
@@ -730,15 +754,6 @@ pnfs_destroy_all_layouts(struct nfs_client *clp)
 	pnfs_destroy_layouts_byclid(clp, false);
 }
 
-/*
- * Compare 2 layout stateid sequence ids, to see which is newer,
- * taking into account wraparound issues.
- */
-static bool pnfs_seqid_is_newer(u32 s1, u32 s2)
-{
-	return (s32)(s1 - s2) > 0;
-}
-
 /* update lo->plh_stateid with new if is more recent */
 void
 pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new,
@@ -781,50 +796,22 @@ pnfs_layoutgets_blocked(const struct pnfs_layout_hdr *lo)
 		test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
 }
 
-int
-pnfs_choose_layoutget_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
-			      const struct pnfs_layout_range *range,
-			      struct nfs4_state *open_state)
-{
-	int status = 0;
-
-	dprintk("--> %s\n", __func__);
-	spin_lock(&lo->plh_inode->i_lock);
-	if (pnfs_layoutgets_blocked(lo)) {
-		status = -EAGAIN;
-	} else if (!nfs4_valid_open_stateid(open_state)) {
-		status = -EBADF;
-	} else if (list_empty(&lo->plh_segs) ||
-		   test_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags)) {
-		int seq;
-
-		do {
-			seq = read_seqbegin(&open_state->seqlock);
-			nfs4_stateid_copy(dst, &open_state->stateid);
-		} while (read_seqretry(&open_state->seqlock, seq));
-	} else
-		nfs4_stateid_copy(dst, &lo->plh_stateid);
-	spin_unlock(&lo->plh_inode->i_lock);
-	dprintk("<-- %s\n", __func__);
-	return status;
-}
-
 /*
-* Get layout from server.
-*    for now, assume that whole file layouts are requested.
-*    arg->offset: 0
-*    arg->length: all ones
-*/
+ * Get layout from server.
+ *    for now, assume that whole file layouts are requested.
+ *    arg->offset: 0
+ *    arg->length: all ones
+ */
 static struct pnfs_layout_segment *
 send_layoutget(struct pnfs_layout_hdr *lo,
 	   struct nfs_open_context *ctx,
+	   nfs4_stateid *stateid,
 	   const struct pnfs_layout_range *range,
-	   gfp_t gfp_flags)
+	   long *timeout, gfp_t gfp_flags)
 {
 	struct inode *ino = lo->plh_inode;
 	struct nfs_server *server = NFS_SERVER(ino);
 	struct nfs4_layoutget *lgp;
-	struct pnfs_layout_segment *lseg;
 	loff_t i_size;
 
 	dprintk("--> %s\n", __func__);
@@ -834,40 +821,31 @@ send_layoutget(struct pnfs_layout_hdr *lo,
 	 * store in lseg. If we race with a concurrent seqid morphing
 	 * op, then re-send the LAYOUTGET.
 	 */
-	do {
-		lgp = kzalloc(sizeof(*lgp), gfp_flags);
-		if (lgp == NULL)
-			return NULL;
-
-		i_size = i_size_read(ino);
-
-		lgp->args.minlength = PAGE_SIZE;
-		if (lgp->args.minlength > range->length)
-			lgp->args.minlength = range->length;
-		if (range->iomode == IOMODE_READ) {
-			if (range->offset >= i_size)
-				lgp->args.minlength = 0;
-			else if (i_size - range->offset < lgp->args.minlength)
-				lgp->args.minlength = i_size - range->offset;
-		}
-		lgp->args.maxcount = PNFS_LAYOUT_MAXSIZE;
-		pnfs_copy_range(&lgp->args.range, range);
-		lgp->args.type = server->pnfs_curr_ld->id;
-		lgp->args.inode = ino;
-		lgp->args.ctx = get_nfs_open_context(ctx);
-		lgp->gfp_flags = gfp_flags;
-		lgp->cred = lo->plh_lc_cred;
-
-		lseg = nfs4_proc_layoutget(lgp, gfp_flags);
-	} while (lseg == ERR_PTR(-EAGAIN));
-
-	if (IS_ERR(lseg) && !nfs_error_is_fatal(PTR_ERR(lseg)))
-		lseg = NULL;
-	else
-		pnfs_layout_clear_fail_bit(lo,
-				pnfs_iomode_to_fail_bit(range->iomode));
+	lgp = kzalloc(sizeof(*lgp), gfp_flags);
+	if (lgp == NULL)
+		return ERR_PTR(-ENOMEM);
 
-	return lseg;
+	i_size = i_size_read(ino);
+
+	lgp->args.minlength = PAGE_SIZE;
+	if (lgp->args.minlength > range->length)
+		lgp->args.minlength = range->length;
+	if (range->iomode == IOMODE_READ) {
+		if (range->offset >= i_size)
+			lgp->args.minlength = 0;
+		else if (i_size - range->offset < lgp->args.minlength)
+			lgp->args.minlength = i_size - range->offset;
+	}
+	lgp->args.maxcount = PNFS_LAYOUT_MAXSIZE;
+	pnfs_copy_range(&lgp->args.range, range);
+	lgp->args.type = server->pnfs_curr_ld->id;
+	lgp->args.inode = ino;
+	lgp->args.ctx = get_nfs_open_context(ctx);
+	nfs4_stateid_copy(&lgp->args.stateid, stateid);
+	lgp->gfp_flags = gfp_flags;
+	lgp->cred = lo->plh_lc_cred;
+
+	return nfs4_proc_layoutget(lgp, timeout, gfp_flags);
 }
 
 static void pnfs_clear_layoutcommit(struct inode *inode,
@@ -899,6 +877,7 @@ pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo)
 	if (test_and_set_bit(NFS_LAYOUT_RETURN, &lo->plh_flags))
 		return false;
 	lo->plh_return_iomode = 0;
+	lo->plh_return_seq = 0;
 	pnfs_get_layout_hdr(lo);
 	clear_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
 	return true;
@@ -969,6 +948,7 @@ static void pnfs_layoutreturn_before_put_layout_hdr(struct pnfs_layout_hdr *lo)
 		bool send;
 
 		nfs4_stateid_copy(&stateid, &lo->plh_stateid);
+		stateid.seqid = cpu_to_be32(lo->plh_return_seq);
 		iomode = lo->plh_return_iomode;
 		send = pnfs_prepare_layoutreturn(lo);
 		spin_unlock(&inode->i_lock);
@@ -1012,7 +992,7 @@ _pnfs_return_layout(struct inode *ino)
 	pnfs_get_layout_hdr(lo);
 	empty = list_empty(&lo->plh_segs);
 	pnfs_clear_layoutcommit(ino, &tmp_list);
-	pnfs_mark_matching_lsegs_invalid(lo, &tmp_list, NULL);
+	pnfs_mark_matching_lsegs_invalid(lo, &tmp_list, NULL, 0);
 
 	if (NFS_SERVER(ino)->pnfs_curr_ld->return_range) {
 		struct pnfs_layout_range range = {
@@ -1341,23 +1321,28 @@ out_existing:
 
 /*
  * iomode matching rules:
- * iomode	lseg	match
- * -----	-----	-----
- * ANY		READ	true
- * ANY		RW	true
- * RW		READ	false
- * RW		RW	true
- * READ		READ	true
- * READ		RW	true
+ * iomode	lseg	strict match
+ *                      iomode
+ * -----	-----	------ -----
+ * ANY		READ	N/A    true
+ * ANY		RW	N/A    true
+ * RW		READ	N/A    false
+ * RW		RW	N/A    true
+ * READ		READ	N/A    true
+ * READ		RW	true   false
+ * READ		RW	false  true
  */
 static bool
 pnfs_lseg_range_match(const struct pnfs_layout_range *ls_range,
-		 const struct pnfs_layout_range *range)
+		 const struct pnfs_layout_range *range,
+		 bool strict_iomode)
 {
 	struct pnfs_layout_range range1;
 
 	if ((range->iomode == IOMODE_RW &&
 	     ls_range->iomode != IOMODE_RW) ||
+	    (range->iomode != ls_range->iomode &&
+	     strict_iomode == true) ||
 	    !pnfs_lseg_range_intersecting(ls_range, range))
 		return 0;
 
@@ -1372,7 +1357,8 @@ pnfs_lseg_range_match(const struct pnfs_layout_range *ls_range,
  */
 static struct pnfs_layout_segment *
 pnfs_find_lseg(struct pnfs_layout_hdr *lo,
-		struct pnfs_layout_range *range)
+		struct pnfs_layout_range *range,
+		bool strict_iomode)
 {
 	struct pnfs_layout_segment *lseg, *ret = NULL;
 
@@ -1381,7 +1367,8 @@ pnfs_find_lseg(struct pnfs_layout_hdr *lo,
 	list_for_each_entry(lseg, &lo->plh_segs, pls_list) {
 		if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags) &&
 		    !test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags) &&
-		    pnfs_lseg_range_match(&lseg->pls_range, range)) {
+		    pnfs_lseg_range_match(&lseg->pls_range, range,
+					  strict_iomode)) {
 			ret = pnfs_get_lseg(lseg);
 			break;
 		}
@@ -1498,6 +1485,7 @@ pnfs_update_layout(struct inode *ino,
 		   loff_t pos,
 		   u64 count,
 		   enum pnfs_iomode iomode,
+		   bool strict_iomode,
 		   gfp_t gfp_flags)
 {
 	struct pnfs_layout_range arg = {
@@ -1505,27 +1493,30 @@ pnfs_update_layout(struct inode *ino,
 		.offset = pos,
 		.length = count,
 	};
-	unsigned pg_offset;
+	unsigned pg_offset, seq;
 	struct nfs_server *server = NFS_SERVER(ino);
 	struct nfs_client *clp = server->nfs_client;
-	struct pnfs_layout_hdr *lo;
+	struct pnfs_layout_hdr *lo = NULL;
 	struct pnfs_layout_segment *lseg = NULL;
+	nfs4_stateid stateid;
+	long timeout = 0;
+	unsigned long giveup = jiffies + rpc_get_timeout(server->client);
 	bool first;
 
 	if (!pnfs_enabled_sb(NFS_SERVER(ino))) {
-		trace_pnfs_update_layout(ino, pos, count, iomode, NULL,
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_NO_PNFS);
 		goto out;
 	}
 
 	if (iomode == IOMODE_READ && i_size_read(ino) == 0) {
-		trace_pnfs_update_layout(ino, pos, count, iomode, NULL,
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_RD_ZEROLEN);
 		goto out;
 	}
 
 	if (pnfs_within_mdsthreshold(ctx, ino, iomode)) {
-		trace_pnfs_update_layout(ino, pos, count, iomode, NULL,
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_MDSTHRESH);
 		goto out;
 	}
@@ -1536,14 +1527,14 @@ lookup_again:
 	lo = pnfs_find_alloc_layout(ino, ctx, gfp_flags);
 	if (lo == NULL) {
 		spin_unlock(&ino->i_lock);
-		trace_pnfs_update_layout(ino, pos, count, iomode, NULL,
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_NOMEM);
 		goto out;
 	}
 
 	/* Do we even need to bother with this? */
 	if (test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags)) {
-		trace_pnfs_update_layout(ino, pos, count, iomode, lo,
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_BULK_RECALL);
 		dprintk("%s matches recall, use MDS\n", __func__);
 		goto out_unlock;
@@ -1551,14 +1542,34 @@ lookup_again:
 
 	/* if LAYOUTGET already failed once we don't try again */
 	if (pnfs_layout_io_test_failed(lo, iomode)) {
-		trace_pnfs_update_layout(ino, pos, count, iomode, lo,
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_IO_TEST_FAIL);
 		goto out_unlock;
 	}
 
-	first = list_empty(&lo->plh_segs);
-	if (first) {
-		/* The first layoutget for the file. Need to serialize per
+	lseg = pnfs_find_lseg(lo, &arg, strict_iomode);
+	if (lseg) {
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
+				PNFS_UPDATE_LAYOUT_FOUND_CACHED);
+		goto out_unlock;
+	}
+
+	if (!nfs4_valid_open_stateid(ctx->state)) {
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
+				PNFS_UPDATE_LAYOUT_INVALID_OPEN);
+		goto out_unlock;
+	}
+
+	/*
+	 * Choose a stateid for the LAYOUTGET. If we don't have a layout
+	 * stateid, or it has been invalidated, then we must use the open
+	 * stateid.
+	 */
+	if (lo->plh_stateid.seqid == 0 ||
+	    test_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags)) {
+
+		/*
+		 * The first layoutget for the file. Need to serialize per
 		 * RFC 5661 Errata 3208.
 		 */
 		if (test_and_set_bit(NFS_LAYOUT_FIRST_LAYOUTGET,
@@ -1567,18 +1578,17 @@ lookup_again:
 			wait_on_bit(&lo->plh_flags, NFS_LAYOUT_FIRST_LAYOUTGET,
 				    TASK_UNINTERRUPTIBLE);
 			pnfs_put_layout_hdr(lo);
+			dprintk("%s retrying\n", __func__);
 			goto lookup_again;
 		}
+
+		first = true;
+		do {
+			seq = read_seqbegin(&ctx->state->seqlock);
+			nfs4_stateid_copy(&stateid, &ctx->state->stateid);
+		} while (read_seqretry(&ctx->state->seqlock, seq));
 	} else {
-		/* Check to see if the layout for the given range
-		 * already exists
-		 */
-		lseg = pnfs_find_lseg(lo, &arg);
-		if (lseg) {
-			trace_pnfs_update_layout(ino, pos, count, iomode, lo,
-					PNFS_UPDATE_LAYOUT_FOUND_CACHED);
-			goto out_unlock;
-		}
+		nfs4_stateid_copy(&stateid, &lo->plh_stateid);
 	}
 
 	/*
@@ -1593,15 +1603,17 @@ lookup_again:
 				pnfs_clear_first_layoutget(lo);
 			pnfs_put_layout_hdr(lo);
 			dprintk("%s retrying\n", __func__);
+			trace_pnfs_update_layout(ino, pos, count, iomode, lo,
+					lseg, PNFS_UPDATE_LAYOUT_RETRY);
 			goto lookup_again;
 		}
-		trace_pnfs_update_layout(ino, pos, count, iomode, lo,
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				PNFS_UPDATE_LAYOUT_RETURN);
 		goto out_put_layout_hdr;
 	}
 
 	if (pnfs_layoutgets_blocked(lo)) {
-		trace_pnfs_update_layout(ino, pos, count, iomode, lo,
+		trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				PNFS_UPDATE_LAYOUT_BLOCKED);
 		goto out_unlock;
 	}
@@ -1626,10 +1638,36 @@ lookup_again:
 	if (arg.length != NFS4_MAX_UINT64)
 		arg.length = PAGE_ALIGN(arg.length);
 
-	lseg = send_layoutget(lo, ctx, &arg, gfp_flags);
-	atomic_dec(&lo->plh_outstanding);
-	trace_pnfs_update_layout(ino, pos, count, iomode, lo,
+	lseg = send_layoutget(lo, ctx, &stateid, &arg, &timeout, gfp_flags);
+	trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg,
 				 PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET);
+	if (IS_ERR(lseg)) {
+		switch (PTR_ERR(lseg)) {
+		case -ERECALLCONFLICT:
+			if (time_after(jiffies, giveup))
+				lseg = NULL;
+			/* Fallthrough */
+		case -EAGAIN:
+			pnfs_put_layout_hdr(lo);
+			if (first)
+				pnfs_clear_first_layoutget(lo);
+			if (lseg) {
+				trace_pnfs_update_layout(ino, pos, count,
+					iomode, lo, lseg, PNFS_UPDATE_LAYOUT_RETRY);
+				goto lookup_again;
+			}
+			/* Fallthrough */
+		default:
+			if (!nfs_error_is_fatal(PTR_ERR(lseg))) {
+				pnfs_layout_clear_fail_bit(lo, pnfs_iomode_to_fail_bit(iomode));
+				lseg = NULL;
+			}
+		}
+	} else {
+		pnfs_layout_clear_fail_bit(lo, pnfs_iomode_to_fail_bit(iomode));
+	}
+
+	atomic_dec(&lo->plh_outstanding);
 out_put_layout_hdr:
 	if (first)
 		pnfs_clear_first_layoutget(lo);
@@ -1678,38 +1716,36 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 	struct pnfs_layout_segment *lseg;
 	struct inode *ino = lo->plh_inode;
 	LIST_HEAD(free_me);
-	int status = -EINVAL;
 
 	if (!pnfs_sanity_check_layout_range(&res->range))
-		goto out;
+		return ERR_PTR(-EINVAL);
 
 	/* Inject layout blob into I/O device driver */
 	lseg = NFS_SERVER(ino)->pnfs_curr_ld->alloc_lseg(lo, res, lgp->gfp_flags);
-	if (!lseg || IS_ERR(lseg)) {
+	if (IS_ERR_OR_NULL(lseg)) {
 		if (!lseg)
-			status = -ENOMEM;
-		else
-			status = PTR_ERR(lseg);
-		dprintk("%s: Could not allocate layout: error %d\n",
-		       __func__, status);
-		goto out;
+			lseg = ERR_PTR(-ENOMEM);
+
+		dprintk("%s: Could not allocate layout: error %ld\n",
+		       __func__, PTR_ERR(lseg));
+		return lseg;
 	}
 
 	init_lseg(lo, lseg);
 	lseg->pls_range = res->range;
+	lseg->pls_seq = be32_to_cpu(res->stateid.seqid);
 
 	spin_lock(&ino->i_lock);
 	if (pnfs_layoutgets_blocked(lo)) {
 		dprintk("%s forget reply due to state\n", __func__);
-		goto out_forget_reply;
+		goto out_forget;
 	}
 
 	if (nfs4_stateid_match_other(&lo->plh_stateid, &res->stateid)) {
 		/* existing state ID, make sure the sequence number matches. */
 		if (pnfs_layout_stateid_blocked(lo, &res->stateid)) {
 			dprintk("%s forget reply due to sequence\n", __func__);
-			status = -EAGAIN;
-			goto out_forget_reply;
+			goto out_forget;
 		}
 		pnfs_set_layout_stateid(lo, &res->stateid, false);
 	} else {
@@ -1718,7 +1754,7 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 		 * inode invalid, and don't bother validating the stateid
 		 * sequence number.
 		 */
-		pnfs_mark_matching_lsegs_invalid(lo, &free_me, NULL);
+		pnfs_mark_matching_lsegs_invalid(lo, &free_me, NULL, 0);
 
 		nfs4_stateid_copy(&lo->plh_stateid, &res->stateid);
 		lo->plh_barrier = be32_to_cpu(res->stateid.seqid);
@@ -1735,18 +1771,17 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 	spin_unlock(&ino->i_lock);
 	pnfs_free_lseg_list(&free_me);
 	return lseg;
-out:
-	return ERR_PTR(status);
 
-out_forget_reply:
+out_forget:
 	spin_unlock(&ino->i_lock);
 	lseg->pls_layout = lo;
 	NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
-	goto out;
+	return ERR_PTR(-EAGAIN);
 }
 
 static void
-pnfs_set_plh_return_iomode(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode)
+pnfs_set_plh_return_info(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode,
+			 u32 seq)
 {
 	if (lo->plh_return_iomode == iomode)
 		return;
@@ -1754,6 +1789,8 @@ pnfs_set_plh_return_iomode(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode)
 		iomode = IOMODE_ANY;
 	lo->plh_return_iomode = iomode;
 	set_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
+	if (!lo->plh_return_seq || pnfs_seqid_is_newer(seq, lo->plh_return_seq))
+		lo->plh_return_seq = seq;
 }
 
 /**
@@ -1769,7 +1806,8 @@ pnfs_set_plh_return_iomode(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode)
 int
 pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
 				struct list_head *tmp_list,
-				const struct pnfs_layout_range *return_range)
+				const struct pnfs_layout_range *return_range,
+				u32 seq)
 {
 	struct pnfs_layout_segment *lseg, *next;
 	int remaining = 0;
@@ -1792,8 +1830,11 @@ pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
 				continue;
 			remaining++;
 			set_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags);
-			pnfs_set_plh_return_iomode(lo, return_range->iomode);
 		}
+
+	if (remaining)
+		pnfs_set_plh_return_info(lo, return_range->iomode, seq);
+
 	return remaining;
 }
 
@@ -1810,13 +1851,14 @@ void pnfs_error_mark_layout_for_return(struct inode *inode,
 	bool return_now = false;
 
 	spin_lock(&inode->i_lock);
-	pnfs_set_plh_return_iomode(lo, range.iomode);
+	pnfs_set_plh_return_info(lo, range.iomode, lseg->pls_seq);
 	/*
 	 * mark all matching lsegs so that we are sure to have no live
 	 * segments at hand when sending layoutreturn. See pnfs_put_lseg()
 	 * for how it works.
 	 */
-	if (!pnfs_mark_matching_lsegs_return(lo, &free_me, &range)) {
+	if (!pnfs_mark_matching_lsegs_return(lo, &free_me,
+						&range, lseg->pls_seq)) {
 		nfs4_stateid stateid;
 		enum pnfs_iomode iomode = lo->plh_return_iomode;
 
@@ -1849,6 +1891,7 @@ pnfs_generic_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *r
 						   req_offset(req),
 						   rd_size,
 						   IOMODE_READ,
+						   false,
 						   GFP_KERNEL);
 		if (IS_ERR(pgio->pg_lseg)) {
 			pgio->pg_error = PTR_ERR(pgio->pg_lseg);
@@ -1873,6 +1916,7 @@ pnfs_generic_pg_init_write(struct nfs_pageio_descriptor *pgio,
 						   req_offset(req),
 						   wb_size,
 						   IOMODE_RW,
+						   false,
 						   GFP_NOFS);
 		if (IS_ERR(pgio->pg_lseg)) {
 			pgio->pg_error = PTR_ERR(pgio->pg_lseg);
@@ -2143,12 +2187,15 @@ pnfs_try_to_read_data(struct nfs_pgio_header *hdr,
 }
 
 /* Resend all requests through pnfs. */
-int pnfs_read_resend_pnfs(struct nfs_pgio_header *hdr)
+void pnfs_read_resend_pnfs(struct nfs_pgio_header *hdr)
 {
 	struct nfs_pageio_descriptor pgio;
 
-	nfs_pageio_init_read(&pgio, hdr->inode, false, hdr->completion_ops);
-	return nfs_pageio_resend(&pgio, hdr);
+	if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
+		nfs_pageio_init_read(&pgio, hdr->inode, false,
+					hdr->completion_ops);
+		hdr->task.tk_status = nfs_pageio_resend(&pgio, hdr);
+	}
 }
 EXPORT_SYMBOL_GPL(pnfs_read_resend_pnfs);
 
@@ -2158,12 +2205,11 @@ pnfs_do_read(struct nfs_pageio_descriptor *desc, struct nfs_pgio_header *hdr)
 	const struct rpc_call_ops *call_ops = desc->pg_rpc_callops;
 	struct pnfs_layout_segment *lseg = desc->pg_lseg;
 	enum pnfs_try_status trypnfs;
-	int err = 0;
 
 	trypnfs = pnfs_try_to_read_data(hdr, call_ops, lseg);
 	if (trypnfs == PNFS_TRY_AGAIN)
-		err = pnfs_read_resend_pnfs(hdr);
-	if (trypnfs == PNFS_NOT_ATTEMPTED || err)
+		pnfs_read_resend_pnfs(hdr);
+	if (trypnfs == PNFS_NOT_ATTEMPTED || hdr->task.tk_status)
 		pnfs_read_through_mds(desc, hdr);
 }
 
@@ -2405,7 +2451,7 @@ pnfs_report_layoutstat(struct inode *inode, gfp_t gfp_flags)
 	spin_lock(&inode->i_lock);
 	if (!NFS_I(inode)->layout) {
 		spin_unlock(&inode->i_lock);
-		goto out;
+		goto out_clear_layoutstats;
 	}
 	hdr = NFS_I(inode)->layout;
 	pnfs_get_layout_hdr(hdr);
@@ -2434,6 +2480,7 @@ out_free:
 	kfree(data);
 out_put:
 	pnfs_put_layout_hdr(hdr);
+out_clear_layoutstats:
 	smp_mb__before_atomic();
 	clear_bit(NFS_INO_LAYOUTSTATS, &nfsi->flags);
 	smp_mb__after_atomic();
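Side note on the sequence-id handling above: pnfs_seqid_is_newer() relies on signed 32-bit serial-number arithmetic, so the ordering it computes remains correct even when the layout stateid seqid wraps around. A minimal stand-alone sketch of the same idea (illustrative only, not part of this patch):

	#include <stdbool.h>
	#include <stdint.h>

	/* s1 is "newer" than s2 when the signed difference is positive.
	 * This keeps working across wraparound, e.g. seqid_is_newer(1, 0xffffffffu)
	 * is true because 1 - 0xffffffff == 2 modulo 2^32. */
	static bool seqid_is_newer(uint32_t s1, uint32_t s2)
	{
		return (int32_t)(s1 - s2) > 0;
	}
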
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 1ac1db5f6dad..b21bd0bee784 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -64,6 +64,7 @@ struct pnfs_layout_segment {
 	struct list_head pls_lc_list;
 	struct pnfs_layout_range pls_range;
 	atomic_t pls_refcount;
+	u32 pls_seq;
 	unsigned long pls_flags;
 	struct pnfs_layout_hdr *pls_layout;
 	struct work_struct pls_work;
@@ -194,6 +195,7 @@ struct pnfs_layout_hdr {
 	unsigned long		plh_flags;
 	nfs4_stateid		plh_stateid;
 	u32			plh_barrier; /* ignore lower seqids */
+	u32			plh_return_seq;
 	enum pnfs_iomode	plh_return_iomode;
 	loff_t			plh_lwb; /* last write byte for layoutcommit */
 	struct rpc_cred		*plh_lc_cred; /* layoutcommit cred */
@@ -226,7 +228,7 @@ extern void pnfs_unregister_layoutdriver(struct pnfs_layoutdriver_type *);
 extern int nfs4_proc_getdeviceinfo(struct nfs_server *server,
 				   struct pnfs_device *dev,
 				   struct rpc_cred *cred);
-extern struct pnfs_layout_segment* nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags);
+extern struct pnfs_layout_segment* nfs4_proc_layoutget(struct nfs4_layoutget *lgp, long *timeout, gfp_t gfp_flags);
 extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool sync);
 
 /* pnfs.c */
@@ -258,16 +260,14 @@ void pnfs_put_layout_hdr(struct pnfs_layout_hdr *lo);
 void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
 			     const nfs4_stateid *new,
 			     bool update_barrier);
-int pnfs_choose_layoutget_stateid(nfs4_stateid *dst,
-				  struct pnfs_layout_hdr *lo,
-				  const struct pnfs_layout_range *range,
-				  struct nfs4_state *open_state);
 int pnfs_mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo,
 				struct list_head *tmp_list,
-				const struct pnfs_layout_range *recall_range);
+				const struct pnfs_layout_range *recall_range,
+				u32 seq);
 int pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
 				struct list_head *tmp_list,
-				const struct pnfs_layout_range *recall_range);
+				const struct pnfs_layout_range *recall_range,
+				u32 seq);
 bool pnfs_roc(struct inode *ino);
 void pnfs_roc_release(struct inode *ino);
 void pnfs_roc_set_barrier(struct inode *ino, u32 barrier);
@@ -282,12 +282,13 @@ int _pnfs_return_layout(struct inode *);
 int pnfs_commit_and_return_layout(struct inode *);
 void pnfs_ld_write_done(struct nfs_pgio_header *);
 void pnfs_ld_read_done(struct nfs_pgio_header *);
-int pnfs_read_resend_pnfs(struct nfs_pgio_header *);
+void pnfs_read_resend_pnfs(struct nfs_pgio_header *);
 struct pnfs_layout_segment *pnfs_update_layout(struct inode *ino,
 					       struct nfs_open_context *ctx,
 					       loff_t pos,
 					       u64 count,
 					       enum pnfs_iomode iomode,
+					       bool strict_iomode,
 					       gfp_t gfp_flags);
 void pnfs_clear_layoutreturn_waitbit(struct pnfs_layout_hdr *lo);
 
diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c
index 4aaed890048f..0dfc476da3e1 100644
--- a/fs/nfs/pnfs_nfs.c
+++ b/fs/nfs/pnfs_nfs.c
@@ -61,7 +61,7 @@ EXPORT_SYMBOL_GPL(pnfs_generic_commit_release);
 
 /* The generic layer is about to remove the req from the commit list.
  * If this will make the bucket empty, it will need to put the lseg reference.
- * Note this must be called holding the inode (/cinfo) lock
+ * Note this must be called holding i_lock
  */
 void
 pnfs_generic_clear_request_commit(struct nfs_page *req,
@@ -98,7 +98,7 @@ pnfs_generic_transfer_commit_list(struct list_head *src, struct list_head *dst,
 		if (!nfs_lock_request(req))
 			continue;
 		kref_get(&req->wb_kref);
-		if (cond_resched_lock(cinfo->lock))
+		if (cond_resched_lock(&cinfo->inode->i_lock))
 			list_safe_reset_next(req, tmp, wb_list);
 		nfs_request_remove_commit_list(req, cinfo);
 		clear_bit(PG_COMMIT_TO_DS, &req->wb_flags);
@@ -119,7 +119,7 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 	struct list_head *dst = &bucket->committing;
 	int ret;
 
-	lockdep_assert_held(cinfo->lock);
+	lockdep_assert_held(&cinfo->inode->i_lock);
 	ret = pnfs_generic_transfer_commit_list(src, dst, cinfo, max);
 	if (ret) {
 		cinfo->ds->nwritten -= ret;
@@ -142,7 +142,7 @@ int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo,
 {
 	int i, rv = 0, cnt;
 
-	lockdep_assert_held(cinfo->lock);
+	lockdep_assert_held(&cinfo->inode->i_lock);
 	for (i = 0; i < cinfo->ds->nbuckets && max != 0; i++) {
 		cnt = pnfs_generic_scan_ds_commit_list(&cinfo->ds->buckets[i],
 						       cinfo, max);
@@ -161,16 +161,16 @@ void pnfs_generic_recover_commit_reqs(struct list_head *dst,
 	struct pnfs_layout_segment *freeme;
 	int i;
 
-	lockdep_assert_held(cinfo->lock);
+	lockdep_assert_held(&cinfo->inode->i_lock);
 restart:
 	for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
 		if (pnfs_generic_transfer_commit_list(&b->written, dst,
 						      cinfo, 0)) {
 			freeme = b->wlseg;
 			b->wlseg = NULL;
-			spin_unlock(cinfo->lock);
+			spin_unlock(&cinfo->inode->i_lock);
 			pnfs_put_lseg(freeme);
-			spin_lock(cinfo->lock);
+			spin_lock(&cinfo->inode->i_lock);
 			goto restart;
 		}
 	}
@@ -186,7 +186,7 @@ static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx)
 	LIST_HEAD(pages);
 	int i;
 
-	spin_lock(cinfo->lock);
+	spin_lock(&cinfo->inode->i_lock);
 	for (i = idx; i < fl_cinfo->nbuckets; i++) {
 		bucket = &fl_cinfo->buckets[i];
 		if (list_empty(&bucket->committing))
@@ -194,12 +194,12 @@ static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx)
 		freeme = bucket->clseg;
 		bucket->clseg = NULL;
 		list_splice_init(&bucket->committing, &pages);
-		spin_unlock(cinfo->lock);
+		spin_unlock(&cinfo->inode->i_lock);
 		nfs_retry_commit(&pages, freeme, cinfo, i);
 		pnfs_put_lseg(freeme);
-		spin_lock(cinfo->lock);
+		spin_lock(&cinfo->inode->i_lock);
 	}
-	spin_unlock(cinfo->lock);
+	spin_unlock(&cinfo->inode->i_lock);
 }
 
 static unsigned int
@@ -238,14 +238,31 @@ void pnfs_fetch_commit_bucket_list(struct list_head *pages,
 	struct pnfs_commit_bucket *bucket;
 
 	bucket = &cinfo->ds->buckets[data->ds_commit_index];
-	spin_lock(cinfo->lock);
+	spin_lock(&cinfo->inode->i_lock);
 	list_splice_init(&bucket->committing, pages);
 	data->lseg = bucket->clseg;
 	bucket->clseg = NULL;
-	spin_unlock(cinfo->lock);
+	spin_unlock(&cinfo->inode->i_lock);
 
 }
 
+/* Helper function for pnfs_generic_commit_pagelist to catch an empty
+ * page list. This can happen when two commits race. */
+static bool
+pnfs_generic_commit_cancel_empty_pagelist(struct list_head *pages,
+					  struct nfs_commit_data *data,
+					  struct nfs_commit_info *cinfo)
+{
+	if (list_empty(pages)) {
+		if (atomic_dec_and_test(&cinfo->mds->rpcs_out))
+			wake_up_atomic_t(&cinfo->mds->rpcs_out);
+		nfs_commitdata_release(data);
+		return true;
+	}
+
+	return false;
+}
+
 /* This follows nfs_commit_list pretty closely */
 int
 pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
@@ -280,6 +297,11 @@ pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
 	list_for_each_entry_safe(data, tmp, &list, pages) {
 		list_del_init(&data->pages);
 		if (data->ds_commit_index < 0) {
+			/* another commit raced with us */
+			if (pnfs_generic_commit_cancel_empty_pagelist(mds_pages,
+				data, cinfo))
+				continue;
+
 			nfs_init_commit(data, mds_pages, NULL, cinfo);
 			nfs_initiate_commit(NFS_CLIENT(inode), data,
 					    NFS_PROTO(data->inode),
@@ -288,6 +310,12 @@ pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
 			LIST_HEAD(pages);
 
 			pnfs_fetch_commit_bucket_list(&pages, data, cinfo);
+
+			/* another commit raced with us */
+			if (pnfs_generic_commit_cancel_empty_pagelist(&pages,
+				data, cinfo))
+				continue;
+
 			nfs_init_commit(data, &pages, data->lseg, cinfo);
 			initiate_commit(data, how);
 		}
@@ -874,12 +902,12 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
 	struct list_head *list;
 	struct pnfs_commit_bucket *buckets;
 
-	spin_lock(cinfo->lock);
+	spin_lock(&cinfo->inode->i_lock);
 	buckets = cinfo->ds->buckets;
 	list = &buckets[ds_commit_idx].written;
 	if (list_empty(list)) {
 		if (!pnfs_is_valid_lseg(lseg)) {
-			spin_unlock(cinfo->lock);
+			spin_unlock(&cinfo->inode->i_lock);
 			cinfo->completion_ops->resched_write(cinfo, req);
 			return;
 		}
@@ -896,7 +924,7 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
 	cinfo->ds->nwritten++;
 
 	nfs_request_add_commit_list_locked(req, list, cinfo);
-	spin_unlock(cinfo->lock);
+	spin_unlock(&cinfo->inode->i_lock);
 	nfs_mark_page_unstable(req->wb_page, cinfo);
 }
 EXPORT_SYMBOL_GPL(pnfs_layout_mark_request_commit);
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index f1268280244e..2137e0202f25 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -191,6 +191,7 @@ static const match_table_t nfs_mount_option_tokens = {
 
 enum {
 	Opt_xprt_udp, Opt_xprt_udp6, Opt_xprt_tcp, Opt_xprt_tcp6, Opt_xprt_rdma,
+	Opt_xprt_rdma6,
 
 	Opt_xprt_err
 };
@@ -201,6 +202,7 @@ static const match_table_t nfs_xprt_protocol_tokens = {
 	{ Opt_xprt_tcp, "tcp" },
 	{ Opt_xprt_tcp6, "tcp6" },
 	{ Opt_xprt_rdma, "rdma" },
+	{ Opt_xprt_rdma6, "rdma6" },
 
 	{ Opt_xprt_err, NULL }
 };
@@ -1456,6 +1458,8 @@ static int nfs_parse_mount_options(char *raw,
 				mnt->flags |= NFS_MOUNT_TCP;
 				mnt->nfs_server.protocol = XPRT_TRANSPORT_TCP;
 				break;
+			case Opt_xprt_rdma6:
+				protofamily = AF_INET6;
 			case Opt_xprt_rdma:
 				/* vector side protocols to TCP */
 				mnt->flags |= NFS_MOUNT_TCP;
@@ -2408,6 +2412,11 @@ static int nfs_compare_super_address(struct nfs_server *server1,
 				     struct nfs_server *server2)
 {
 	struct sockaddr *sap1, *sap2;
+	struct rpc_xprt *xprt1 = server1->client->cl_xprt;
+	struct rpc_xprt *xprt2 = server2->client->cl_xprt;
+
+	if (!net_eq(xprt1->xprt_net, xprt2->xprt_net))
+		return 0;
 
 	sap1 = (struct sockaddr *)&server1->nfs_client->cl_addr;
 	sap2 = (struct sockaddr *)&server2->nfs_client->cl_addr;
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 5f4fd53e5764..e1c74d3db64d 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -245,8 +245,7 @@ static void nfs_mark_uptodate(struct nfs_page *req)
 static int wb_priority(struct writeback_control *wbc)
 {
 	int ret = 0;
-	if (wbc->for_reclaim)
-		return FLUSH_HIGHPRI | FLUSH_COND_STABLE;
+
 	if (wbc->sync_mode == WB_SYNC_ALL)
 		ret = FLUSH_COND_STABLE;
 	return ret;
@@ -737,7 +736,7 @@ static void nfs_inode_remove_request(struct nfs_page *req)
 		head = req->wb_head;
 
 		spin_lock(&inode->i_lock);
-		if (likely(!PageSwapCache(head->wb_page))) {
+		if (likely(head->wb_page && !PageSwapCache(head->wb_page))) {
 			set_page_private(head->wb_page, 0);
 			ClearPagePrivate(head->wb_page);
 			smp_mb__after_atomic();
@@ -759,7 +758,8 @@ static void nfs_inode_remove_request(struct nfs_page *req)
 static void
 nfs_mark_request_dirty(struct nfs_page *req)
 {
-	__set_page_dirty_nobuffers(req->wb_page);
+	if (req->wb_page)
+		__set_page_dirty_nobuffers(req->wb_page);
 }
 
 /*
@@ -804,7 +804,7 @@ nfs_page_search_commits_for_head_request_locked(struct nfs_inode *nfsi,
  * number of outstanding requests requiring a commit as well as
  * the MM page stats.
  *
- * The caller must hold the cinfo->lock, and the nfs_page lock.
+ * The caller must hold cinfo->inode->i_lock, and the nfs_page lock.
  */
 void
 nfs_request_add_commit_list_locked(struct nfs_page *req, struct list_head *dst,
@@ -832,10 +832,11 @@ EXPORT_SYMBOL_GPL(nfs_request_add_commit_list_locked);
 void
 nfs_request_add_commit_list(struct nfs_page *req, struct nfs_commit_info *cinfo)
 {
-	spin_lock(cinfo->lock);
+	spin_lock(&cinfo->inode->i_lock);
 	nfs_request_add_commit_list_locked(req, &cinfo->mds->list, cinfo);
-	spin_unlock(cinfo->lock);
-	nfs_mark_page_unstable(req->wb_page, cinfo);
+	spin_unlock(&cinfo->inode->i_lock);
+	if (req->wb_page)
+		nfs_mark_page_unstable(req->wb_page, cinfo);
 }
 EXPORT_SYMBOL_GPL(nfs_request_add_commit_list);
 
@@ -864,7 +865,7 @@ EXPORT_SYMBOL_GPL(nfs_request_remove_commit_list);
 static void nfs_init_cinfo_from_inode(struct nfs_commit_info *cinfo,
 				      struct inode *inode)
 {
-	cinfo->lock = &inode->i_lock;
+	cinfo->inode = inode;
 	cinfo->mds = &NFS_I(inode)->commit_info;
 	cinfo->ds = pnfs_get_ds_info(inode);
 	cinfo->dreq = NULL;
@@ -967,7 +968,7 @@ nfs_reqs_to_commit(struct nfs_commit_info *cinfo)
 	return cinfo->mds->ncommit;
 }
 
-/* cinfo->lock held by caller */
+/* cinfo->inode->i_lock held by caller */
 int
 nfs_scan_commit_list(struct list_head *src, struct list_head *dst,
 		     struct nfs_commit_info *cinfo, int max)
@@ -979,7 +980,7 @@ nfs_scan_commit_list(struct list_head *src, struct list_head *dst,
 		if (!nfs_lock_request(req))
 			continue;
 		kref_get(&req->wb_kref);
-		if (cond_resched_lock(cinfo->lock))
+		if (cond_resched_lock(&cinfo->inode->i_lock))
 			list_safe_reset_next(req, tmp, wb_list);
 		nfs_request_remove_commit_list(req, cinfo);
 		nfs_list_add_request(req, dst);
@@ -1005,7 +1006,7 @@ nfs_scan_commit(struct inode *inode, struct list_head *dst,
 {
 	int ret = 0;
 
-	spin_lock(cinfo->lock);
+	spin_lock(&cinfo->inode->i_lock);
 	if (cinfo->mds->ncommit > 0) {
 		const int max = INT_MAX;
 
@@ -1013,7 +1014,7 @@ nfs_scan_commit(struct inode *inode, struct list_head *dst,
 					   cinfo, max);
 		ret += pnfs_scan_commit_lists(inode, cinfo, max - ret);
 	}
-	spin_unlock(cinfo->lock);
+	spin_unlock(&cinfo->inode->i_lock);
 	return ret;
 }
 
@@ -1709,6 +1710,10 @@ nfs_commit_list(struct inode *inode, struct list_head *head, int how,
 {
 	struct nfs_commit_data	*data;
 
+	/* another commit raced with us */
+	if (list_empty(head))
+		return 0;
+
 	data = nfs_commitdata_alloc();
 
 	if (!data)
@@ -1724,6 +1729,36 @@ nfs_commit_list(struct inode *inode, struct list_head *head, int how,
 	return -ENOMEM;
 }
 
+int nfs_commit_file(struct file *file, struct nfs_write_verifier *verf)
+{
+	struct inode *inode = file_inode(file);
+	struct nfs_open_context *open;
+	struct nfs_commit_info cinfo;
+	struct nfs_page *req;
+	int ret;
+
+	open = get_nfs_open_context(nfs_file_open_context(file));
+	req  = nfs_create_request(open, NULL, NULL, 0, i_size_read(inode));
+	if (IS_ERR(req)) {
+		ret = PTR_ERR(req);
+		goto out_put;
+	}
+
+	nfs_init_cinfo_from_inode(&cinfo, inode);
+
+	memcpy(&req->wb_verf, verf, sizeof(struct nfs_write_verifier));
+	nfs_request_add_commit_list(req, &cinfo);
+	ret = nfs_commit_inode(inode, FLUSH_SYNC);
+	if (ret > 0)
+		ret = 0;
+
+	nfs_free_request(req);
+out_put:
+	put_nfs_open_context(open);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(nfs_commit_file);
+
 /*
  * COMMIT call returned
  */
@@ -1748,7 +1783,8 @@ static void nfs_commit_release_pages(struct nfs_commit_data *data)
 	while (!list_empty(&data->pages)) {
 		req = nfs_list_entry(data->pages.next);
 		nfs_list_remove_request(req);
-		nfs_clear_page_commit(req->wb_page);
+		if (req->wb_page)
+			nfs_clear_page_commit(req->wb_page);
 
 		dprintk("NFS:       commit (%s/%llu %d@%lld)",
 			req->wb_context->dentry->d_sb->s_id,
diff --git a/include/linux/errno.h b/include/linux/errno.h
index 89627b9187f9..7ce9fb1b7d28 100644
--- a/include/linux/errno.h
+++ b/include/linux/errno.h
@@ -28,5 +28,6 @@
 #define EBADTYPE	527	/* Type not supported by server */
 #define EJUKEBOX	528	/* Request initiated, but will not complete before timeout */
 #define EIOCBQUEUED	529	/* iocb queued, will get completion event */
+#define ERECALLCONFLICT	530	/* conflict with recalled state */
 
 #endif
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index 011433478a14..bfed6b367350 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -50,12 +50,27 @@ struct nfs4_label {
 
 typedef struct { char data[NFS4_VERIFIER_SIZE]; } nfs4_verifier;
 
-struct nfs_stateid4 {
-	__be32 seqid;
-	char other[NFS4_STATEID_OTHER_SIZE];
-} __attribute__ ((packed));
+struct nfs4_stateid_struct {
+	union {
+		char data[NFS4_STATEID_SIZE];
+		struct {
+			__be32 seqid;
+			char other[NFS4_STATEID_OTHER_SIZE];
+		} __attribute__ ((packed));
+	};
+
+	enum {
+		NFS4_INVALID_STATEID_TYPE = 0,
+		NFS4_SPECIAL_STATEID_TYPE,
+		NFS4_OPEN_STATEID_TYPE,
+		NFS4_LOCK_STATEID_TYPE,
+		NFS4_DELEGATION_STATEID_TYPE,
+		NFS4_LAYOUT_STATEID_TYPE,
+		NFS4_PNFS_DS_STATEID_TYPE,
+	} type;
+};
 
-typedef struct nfs_stateid4 nfs4_stateid;
+typedef struct nfs4_stateid_struct nfs4_stateid;
 
 enum nfs_opnum4 {
 	OP_ACCESS = 3,
@@ -504,6 +519,7 @@ enum {
 	NFSPROC4_CLNT_DEALLOCATE,
 	NFSPROC4_CLNT_LAYOUTSTATS,
 	NFSPROC4_CLNT_CLONE,
+	NFSPROC4_CLNT_COPY,
 };
 
 /* nfs41 types */
@@ -621,7 +637,9 @@ enum pnfs_update_layout_reason {
 	PNFS_UPDATE_LAYOUT_IO_TEST_FAIL,
 	PNFS_UPDATE_LAYOUT_FOUND_CACHED,
 	PNFS_UPDATE_LAYOUT_RETURN,
+	PNFS_UPDATE_LAYOUT_RETRY,
 	PNFS_UPDATE_LAYOUT_BLOCKED,
+	PNFS_UPDATE_LAYOUT_INVALID_OPEN,
 	PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET,
 };
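A note on the nfs4_stateid change above: the anonymous union lets the same NFS4_STATEID_SIZE bytes be read either as an opaque blob (data) or as the structured seqid/other pair, which is why other hunks in this series can pull the sequence number straight out of a stateid with be32_to_cpu(). A minimal illustration, assuming kernel context (the helper name is hypothetical):

	static u32 example_stateid_seqid(const nfs4_stateid *st)
	{
		/* st->seqid aliases the first four bytes of st->data */
		return be32_to_cpu(st->seqid);
	}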
 
diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
index 7fcc13c8cf1f..14a762d2734d 100644
--- a/include/linux/nfs_fs_sb.h
+++ b/include/linux/nfs_fs_sb.h
@@ -246,5 +246,6 @@ struct nfs_server {
 #define NFS_CAP_DEALLOCATE	(1U << 21)
 #define NFS_CAP_LAYOUTSTATS	(1U << 22)
 #define NFS_CAP_CLONE		(1U << 23)
+#define NFS_CAP_COPY		(1U << 24)
 
 #endif
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index ee8491dadbf3..c304a11b5b1a 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -233,7 +233,6 @@ struct nfs4_layoutget_args {
 	struct inode *inode;
 	struct nfs_open_context *ctx;
 	nfs4_stateid stateid;
-	unsigned long timestamp;
 	struct nfs4_layoutdriver_data layout;
 };
 
@@ -251,7 +250,6 @@ struct nfs4_layoutget {
 	struct nfs4_layoutget_res res;
 	struct rpc_cred *cred;
 	gfp_t gfp_flags;
-	long timeout;
 };
 
 struct nfs4_getdeviceinfo_args {
@@ -1343,6 +1341,32 @@ struct nfs42_falloc_res {
 	const struct nfs_server		*falloc_server;
 };
 
+struct nfs42_copy_args {
+	struct nfs4_sequence_args	seq_args;
+
+	struct nfs_fh			*src_fh;
+	nfs4_stateid			src_stateid;
+	u64				src_pos;
+
+	struct nfs_fh			*dst_fh;
+	nfs4_stateid			dst_stateid;
+	u64				dst_pos;
+
+	u64				count;
+};
+
+struct nfs42_write_res {
+	u64			count;
+	struct nfs_writeverf	verifier;
+};
+
+struct nfs42_copy_res {
+	struct nfs4_sequence_res	seq_res;
+	struct nfs42_write_res		write_res;
+	bool				consecutive;
+	bool				synchronous;
+};
+
 struct nfs42_seek_args {
 	struct nfs4_sequence_args	seq_args;
 
@@ -1431,7 +1455,7 @@ struct nfs_commit_completion_ops {
 };
 
 struct nfs_commit_info {
-	spinlock_t			*lock;	/* inode->i_lock */
+	struct inode 			*inode;	/* Needed for inode->i_lock */
 	struct nfs_mds_commit_info	*mds;
 	struct pnfs_ds_commit_info	*ds;
 	struct nfs_direct_req		*dreq;	/* O_DIRECT request */
diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index 6a241a277249..899791573a40 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -127,7 +127,7 @@ struct rpc_authops {
 	void			(*destroy)(struct rpc_auth *);
 
 	struct rpc_cred *	(*lookup_cred)(struct rpc_auth *, struct auth_cred *, int);
-	struct rpc_cred *	(*crcreate)(struct rpc_auth*, struct auth_cred *, int);
+	struct rpc_cred *	(*crcreate)(struct rpc_auth*, struct auth_cred *, int, gfp_t);
 	int			(*list_pseudoflavors)(rpc_authflavor_t *, int);
 	rpc_authflavor_t	(*info2flavor)(struct rpcsec_gss_info *);
 	int			(*flavor2info)(rpc_authflavor_t,
@@ -167,6 +167,7 @@ void 			rpc_destroy_authunix(void);
 
 struct rpc_cred *	rpc_lookup_cred(void);
 struct rpc_cred *	rpc_lookup_cred_nonblock(void);
+struct rpc_cred *	rpc_lookup_generic_cred(struct auth_cred *, int, gfp_t);
 struct rpc_cred *	rpc_lookup_machine_cred(const char *service_name);
 int			rpcauth_register(const struct rpc_authops *);
 int			rpcauth_unregister(const struct rpc_authops *);
@@ -178,7 +179,7 @@ rpc_authflavor_t	rpcauth_get_pseudoflavor(rpc_authflavor_t,
 int			rpcauth_get_gssinfo(rpc_authflavor_t,
 				struct rpcsec_gss_info *);
 int			rpcauth_list_flavors(rpc_authflavor_t *, int);
-struct rpc_cred *	rpcauth_lookup_credcache(struct rpc_auth *, struct auth_cred *, int);
+struct rpc_cred *	rpcauth_lookup_credcache(struct rpc_auth *, struct auth_cred *, int, gfp_t);
 void			rpcauth_init_cred(struct rpc_cred *, const struct auth_cred *, struct rpc_auth *, const struct rpc_credops *);
 struct rpc_cred *	rpcauth_lookupcred(struct rpc_auth *, int);
 struct rpc_cred *	rpcauth_generic_bind_cred(struct rpc_task *, struct rpc_cred *, int);
@@ -201,9 +202,28 @@ char *			rpcauth_stringify_acceptor(struct rpc_cred *);
 static inline
 struct rpc_cred *	get_rpccred(struct rpc_cred *cred)
 {
-	atomic_inc(&cred->cr_count);
+	if (cred != NULL)
+		atomic_inc(&cred->cr_count);
 	return cred;
 }
 
+/**
+ * get_rpccred_rcu - get a reference to a cred using rcu-protected pointer
+ * @cred: cred of which to take a reference
+ *
+ * In some cases, we may have a pointer to a credential to which we
+ * want to take a reference, but don't already have one. Because these
+ * objects are freed using RCU, we can access the cr_count while it's
+ * on its way to destruction and only take a reference if it's not already
+ * zero.
+ */
+static inline struct rpc_cred *
+get_rpccred_rcu(struct rpc_cred *cred)
+{
+	if (atomic_inc_not_zero(&cred->cr_count))
+		return cred;
+	return NULL;
+}
+
 #endif /* __KERNEL__ */
 #endif /* _LINUX_SUNRPC_AUTH_H */
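To make the intended get_rpccred_rcu() usage concrete, a hypothetical caller could look like the sketch below; the containing structure and its __rcu field are invented for illustration and are not part of this patch:

	struct example_holder {
		struct rpc_cred __rcu *cred;
	};

	static struct rpc_cred *example_get_cred(struct example_holder *h)
	{
		struct rpc_cred *cred;

		rcu_read_lock();
		cred = rcu_dereference(h->cred);
		if (cred != NULL)
			/* returns NULL if the cred was already on its way to being freed */
			cred = get_rpccred_rcu(cred);
		rcu_read_unlock();
		return cred;
	}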
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 9a7ddbaf116e..19c659d1c0f8 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -176,6 +176,7 @@ void		rpc_setbufsize(struct rpc_clnt *, unsigned int, unsigned int);
 int		rpc_protocol(struct rpc_clnt *);
 struct net *	rpc_net_ns(struct rpc_clnt *);
 size_t		rpc_max_payload(struct rpc_clnt *);
+size_t		rpc_max_bc_payload(struct rpc_clnt *);
 unsigned long	rpc_get_timeout(struct rpc_clnt *clnt);
 void		rpc_force_rebind(struct rpc_clnt *);
 size_t		rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t);
diff --git a/include/linux/sunrpc/msg_prot.h b/include/linux/sunrpc/msg_prot.h
index 807371357160..59cbf16eaeb5 100644
--- a/include/linux/sunrpc/msg_prot.h
+++ b/include/linux/sunrpc/msg_prot.h
@@ -158,9 +158,9 @@ typedef __be32	rpc_fraghdr;
 
 /*
  * Note that RFC 1833 does not put any size restrictions on the
- * netid string, but all currently defined netid's fit in 4 bytes.
+ * netid string, but all currently defined netids fit in 5 bytes.
  */
-#define RPCBIND_MAXNETIDLEN	(4u)
+#define RPCBIND_MAXNETIDLEN	(5u)
 
 /*
  * Universal addresses are introduced in RFC 1833 and further spelled
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index fb0d212e0d3a..5aa3834619a8 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -142,6 +142,7 @@ struct rpc_xprt_ops {
 	int		(*bc_setup)(struct rpc_xprt *xprt,
 				    unsigned int min_reqs);
 	int		(*bc_up)(struct svc_serv *serv, struct net *net);
+	size_t		(*bc_maxpayload)(struct rpc_xprt *xprt);
 	void		(*bc_free_rqst)(struct rpc_rqst *rqst);
 	void		(*bc_destroy)(struct rpc_xprt *xprt,
 				      unsigned int max_reqs);
diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index 767190b01363..39267dc3486a 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -52,7 +52,9 @@
 #define RPCRDMA_DEF_SLOT_TABLE	(128U)
 #define RPCRDMA_MAX_SLOT_TABLE	(256U)
 
-#define RPCRDMA_DEF_INLINE  (1024)	/* default inline max */
+#define RPCRDMA_MIN_INLINE  (1024)	/* min inline thresh */
+#define RPCRDMA_DEF_INLINE  (1024)	/* default inline thresh */
+#define RPCRDMA_MAX_INLINE  (3068)	/* max inline thresh */
 
 /* Memory registration strategies, by number.
  * This is part of a kernel / user space API. Do not remove. */
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 02f53674dc39..040ff627c18a 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -543,7 +543,7 @@ rpcauth_cache_enforce_limit(void)
  */
 struct rpc_cred *
 rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
-		int flags)
+		int flags, gfp_t gfp)
 {
 	LIST_HEAD(free);
 	struct rpc_cred_cache *cache = auth->au_credcache;
@@ -580,7 +580,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred,
 	if (flags & RPCAUTH_LOOKUP_RCU)
 		return ERR_PTR(-ECHILD);
 
-	new = auth->au_ops->crcreate(auth, acred, flags);
+	new = auth->au_ops->crcreate(auth, acred, flags, gfp);
 	if (IS_ERR(new)) {
 		cred = new;
 		goto out;
@@ -703,8 +703,7 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
 		new = rpcauth_bind_new_cred(task, lookupflags);
 	if (IS_ERR(new))
 		return PTR_ERR(new);
-	if (req->rq_cred != NULL)
-		put_rpccred(req->rq_cred);
+	put_rpccred(req->rq_cred);
 	req->rq_cred = new;
 	return 0;
 }
@@ -712,6 +711,8 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
 void
 put_rpccred(struct rpc_cred *cred)
 {
+	if (cred == NULL)
+		return;
 	/* Fast path for unhashed credentials */
 	if (test_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags) == 0) {
 		if (atomic_dec_and_test(&cred->cr_count))
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index 41248b1820c7..54dd3fdead54 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -38,6 +38,13 @@ struct rpc_cred *rpc_lookup_cred(void)
 }
 EXPORT_SYMBOL_GPL(rpc_lookup_cred);
 
+struct rpc_cred *
+rpc_lookup_generic_cred(struct auth_cred *acred, int flags, gfp_t gfp)
+{
+	return rpcauth_lookup_credcache(&generic_auth, acred, flags, gfp);
+}
+EXPORT_SYMBOL_GPL(rpc_lookup_generic_cred);
+
 struct rpc_cred *rpc_lookup_cred_nonblock(void)
 {
 	return rpcauth_lookupcred(&generic_auth, RPCAUTH_LOOKUP_RCU);
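Illustration (not part of the patch): a hypothetical caller showing why the gfp argument is plumbed through. A path that must not recurse into the filesystem can pass GFP_NOFS to the generic credcache lookup.

static struct rpc_cred *example_lookup_nofs_cred(struct auth_cred *acred)
{
	/* flags == 0: no RPCAUTH_LOOKUP_* behavior requested */
	return rpc_lookup_generic_cred(acred, 0, GFP_NOFS);
}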
@@ -77,15 +84,15 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task,
 static struct rpc_cred *
 generic_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 {
-	return rpcauth_lookup_credcache(&generic_auth, acred, flags);
+	return rpcauth_lookup_credcache(&generic_auth, acred, flags, GFP_KERNEL);
 }
 
 static struct rpc_cred *
-generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
 {
 	struct generic_cred *gcred;
 
-	gcred = kmalloc(sizeof(*gcred), GFP_KERNEL);
+	gcred = kmalloc(sizeof(*gcred), gfp);
 	if (gcred == NULL)
 		return ERR_PTR(-ENOMEM);
 
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 15612ffa8d57..e64ae93d5b4f 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1299,11 +1299,11 @@ gss_destroy_cred(struct rpc_cred *cred)
 static struct rpc_cred *
 gss_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 {
-	return rpcauth_lookup_credcache(auth, acred, flags);
+	return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS);
 }
 
 static struct rpc_cred *
-gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
 {
 	struct gss_auth *gss_auth = container_of(auth, struct gss_auth, rpc_auth);
 	struct gss_cred	*cred = NULL;
@@ -1313,7 +1313,7 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 		__func__, from_kuid(&init_user_ns, acred->uid),
 		auth->au_flavor);
 
-	if (!(cred = kzalloc(sizeof(*cred), GFP_NOFS)))
+	if (!(cred = kzalloc(sizeof(*cred), gfp)))
 		goto out_err;
 
 	rpcauth_init_cred(&cred->gc_base, acred, auth, &gss_credops);
diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c
index 0d3dd364c22f..9f65452b7cbc 100644
--- a/net/sunrpc/auth_unix.c
+++ b/net/sunrpc/auth_unix.c
@@ -52,11 +52,11 @@ unx_destroy(struct rpc_auth *auth)
 static struct rpc_cred *
 unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 {
-	return rpcauth_lookup_credcache(auth, acred, flags);
+	return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS);
 }
 
 static struct rpc_cred *
-unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
+unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp)
 {
 	struct unx_cred	*cred;
 	unsigned int groups = 0;
@@ -66,7 +66,7 @@ unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags)
 			from_kuid(&init_user_ns, acred->uid),
 			from_kgid(&init_user_ns, acred->gid));
 
-	if (!(cred = kmalloc(sizeof(*cred), GFP_NOFS)))
+	if (!(cred = kmalloc(sizeof(*cred), gfp)))
 		return ERR_PTR(-ENOMEM);
 
 	rpcauth_init_cred(&cred->uc_base, acred, auth, &unix_credops);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 7e0c9bf22df8..06b4df9faaa1 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1414,6 +1414,23 @@ size_t rpc_max_payload(struct rpc_clnt *clnt)
 EXPORT_SYMBOL_GPL(rpc_max_payload);
 
 /**
+ * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
+ * @clnt: RPC client to query
+ */
+size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
+{
+	struct rpc_xprt *xprt;
+	size_t ret;
+
+	rcu_read_lock();
+	xprt = rcu_dereference(clnt->cl_xprt);
+	ret = xprt->ops->bc_maxpayload(xprt);
+	rcu_read_unlock();
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
+
+/**
  * rpc_get_timeout - Get timeout for transport in units of HZ
  * @clnt: RPC client to query
  */
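Illustration (not part of the patch): one way an upper layer might consume the new query; the helper name and bufsize parameter are invented. The backchannel request buffer is clamped to what the transport reports it can carry.

static size_t example_bound_bc_bufsize(struct rpc_clnt *clnt, size_t bufsize)
{
	size_t bc_max = rpc_max_bc_payload(clnt);

	return bufsize < bc_max ? bufsize : bc_max;
}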
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 6bdb3865212d..c4f3cc0c0775 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -797,6 +797,8 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
 		xdr_set_iov(xdr, buf->head, buf->len);
 	else if (buf->page_len != 0)
 		xdr_set_page_base(xdr, 0, buf->len);
+	else
+		xdr_set_iov(xdr, buf->head, buf->len);
 	if (p != NULL && p > xdr->p && xdr->end >= p) {
 		xdr->nwords -= p - xdr->p;
 		xdr->p = p;
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 2dcd7640eeb5..87762d976b63 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -192,6 +192,22 @@ int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
 }
 
 /**
+ * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
+ * @xprt: transport
+ *
+ * Returns maximum size, in bytes, of a backchannel message
+ */
+size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+	size_t maxmsg;
+
+	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
+	return maxmsg - RPCRDMA_HDRLEN_MIN;
+}
+
+/**
  * rpcrdma_bc_marshal_reply - Send backwards direction reply
  * @rqst: buffer containing RPC reply data
  *
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index b289e106540b..6326ebe8b595 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -35,10 +35,71 @@
 /* Maximum scatter/gather per FMR */
 #define RPCRDMA_MAX_FMR_SGES	(64)
 
+static struct workqueue_struct *fmr_recovery_wq;
+
+#define FMR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND)
+
+int
+fmr_alloc_recovery_wq(void)
+{
+	fmr_recovery_wq = alloc_workqueue("fmr_recovery",
+					  FMR_RECOVERY_WQ_FLAGS, 0);
+	return !fmr_recovery_wq ? -ENOMEM : 0;
+}
+
+void
+fmr_destroy_recovery_wq(void)
+{
+	struct workqueue_struct *wq;
+
+	if (!fmr_recovery_wq)
+		return;
+
+	wq = fmr_recovery_wq;
+	fmr_recovery_wq = NULL;
+	destroy_workqueue(wq);
+}
+
+static int
+__fmr_unmap(struct rpcrdma_mw *mw)
+{
+	LIST_HEAD(l);
+
+	list_add(&mw->fmr.fmr->list, &l);
+	return ib_unmap_fmr(&l);
+}
+
+/* Deferred reset of a single FMR. Generate a fresh rkey by
+ * replacing the MR. There's no recovery if this fails.
+ */
+static void
+__fmr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
+					    mw_work);
+	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+
+	__fmr_unmap(mw);
+	rpcrdma_put_mw(r_xprt, mw);
+	return;
+}
+
+/* A broken MR was discovered in a context that can't sleep.
+ * Defer recovery to the recovery worker.
+ */
+static void
+__fmr_queue_recovery(struct rpcrdma_mw *mw)
+{
+	INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
+	queue_work(fmr_recovery_wq, &mw->mw_work);
+}
+
 static int
 fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 	    struct rpcrdma_create_data_internal *cdata)
 {
+	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
+						      RPCRDMA_MAX_DATA_SEGS /
+						      RPCRDMA_MAX_FMR_SGES));
 	return 0;
 }
 
@@ -48,7 +109,7 @@ static size_t
 fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 {
 	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES);
+		     RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES);
 }
 
 static int
@@ -89,6 +150,7 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
 		if (IS_ERR(r->fmr.fmr))
 			goto out_fmr_err;
 
+		r->mw_xprt = r_xprt;
 		list_add(&r->mw_list, &buf->rb_mws);
 		list_add(&r->mw_all, &buf->rb_all);
 	}
@@ -104,15 +166,6 @@ out:
 	return rc;
 }
 
-static int
-__fmr_unmap(struct rpcrdma_mw *r)
-{
-	LIST_HEAD(l);
-
-	list_add(&r->fmr.fmr->list, &l);
-	return ib_unmap_fmr(&l);
-}
-
 /* Use the ib_map_phys_fmr() verb to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
@@ -183,15 +236,10 @@ static void
 __fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
 {
 	struct ib_device *device = r_xprt->rx_ia.ri_device;
-	struct rpcrdma_mw *mw = seg->rl_mw;
 	int nsegs = seg->mr_nsegs;
 
-	seg->rl_mw = NULL;
-
 	while (nsegs--)
 		rpcrdma_unmap_one(device, seg++);
-
-	rpcrdma_put_mw(r_xprt, mw);
 }
 
 /* Invalidate all memory regions that were registered for "req".
@@ -234,42 +282,50 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 		seg = &req->rl_segments[i];
 
 		__fmr_dma_unmap(r_xprt, seg);
+		rpcrdma_put_mw(r_xprt, seg->rl_mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
+		seg->rl_mw = NULL;
 	}
 
 	req->rl_nchunks = 0;
 }
 
-/* Use the ib_unmap_fmr() verb to prevent further remote
- * access via RDMA READ or RDMA WRITE.
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ *
+ * In the asynchronous case, DMA unmapping occurs first here
+ * because the rpcrdma_mr_seg is released immediately after this
+ * call. Its contents won't be available in __fmr_dma_unmap later.
+ * FIXME.
  */
-static int
-fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
+static void
+fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		  bool sync)
 {
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_mr_seg *seg1 = seg;
-	struct rpcrdma_mw *mw = seg1->rl_mw;
-	int rc, nsegs = seg->mr_nsegs;
+	struct rpcrdma_mr_seg *seg;
+	struct rpcrdma_mw *mw;
+	unsigned int i;
 
-	dprintk("RPC:       %s: FMR %p\n", __func__, mw);
+	for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
+		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
 
-	seg1->rl_mw = NULL;
-	while (seg1->mr_nsegs--)
-		rpcrdma_unmap_one(ia->ri_device, seg++);
-	rc = __fmr_unmap(mw);
-	if (rc)
-		goto out_err;
-	rpcrdma_put_mw(r_xprt, mw);
-	return nsegs;
+		if (sync) {
+			/* ORDER: invalidate the MR before DMA unmapping it */
+			__fmr_unmap(mw);
+			__fmr_dma_unmap(r_xprt, seg);
+			rpcrdma_put_mw(r_xprt, mw);
+		} else {
+			__fmr_dma_unmap(r_xprt, seg);
+			__fmr_queue_recovery(mw);
+		}
 
-out_err:
-	/* The FMR is abandoned, but remains in rb_all. fmr_op_destroy
-	 * will attempt to release it when the transport is destroyed.
-	 */
-	dprintk("RPC:       %s: ib_unmap_fmr status %i\n", __func__, rc);
-	return nsegs;
+		i += seg->mr_nsegs;
+		seg->mr_nsegs = 0;
+		seg->rl_mw = NULL;
+	}
 }
 
 static void
@@ -295,7 +351,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
 const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
 	.ro_map				= fmr_op_map,
 	.ro_unmap_sync			= fmr_op_unmap_sync,
-	.ro_unmap			= fmr_op_unmap,
+	.ro_unmap_safe			= fmr_op_unmap_safe,
 	.ro_open			= fmr_op_open,
 	.ro_maxpages			= fmr_op_maxpages,
 	.ro_init			= fmr_op_init,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 94c3fa910b85..c0947544babe 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -98,6 +98,47 @@ frwr_destroy_recovery_wq(void)
 	destroy_workqueue(wq);
 }
 
+static int
+__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
+{
+	struct rpcrdma_frmr *f = &r->frmr;
+	int rc;
+
+	rc = ib_dereg_mr(f->fr_mr);
+	if (rc) {
+		pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n",
+			rc, r);
+		return rc;
+	}
+
+	f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG,
+			       ia->ri_max_frmr_depth);
+	if (IS_ERR(f->fr_mr)) {
+		pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
+			PTR_ERR(f->fr_mr), r);
+		return PTR_ERR(f->fr_mr);
+	}
+
+	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
+	f->fr_state = FRMR_IS_INVALID;
+	return 0;
+}
+
+static void
+__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_frmr *f = &mw->frmr;
+	int rc;
+
+	rc = __frwr_reset_mr(ia, mw);
+	ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir);
+	if (rc)
+		return;
+
+	rpcrdma_put_mw(r_xprt, mw);
+}
+
 /* Deferred reset of a single FRMR. Generate a fresh rkey by
  * replacing the MR.
  *
@@ -109,26 +150,10 @@ static void
 __frwr_recovery_worker(struct work_struct *work)
 {
 	struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-					    frmr.fr_work);
-	struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
-	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
-	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
-
-	if (ib_dereg_mr(r->frmr.fr_mr))
-		goto out_fail;
+					    mw_work);
 
-	r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
-	if (IS_ERR(r->frmr.fr_mr))
-		goto out_fail;
-
-	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
-	r->frmr.fr_state = FRMR_IS_INVALID;
-	rpcrdma_put_mw(r_xprt, r);
+	__frwr_reset_and_unmap(r->mw_xprt, r);
 	return;
-
-out_fail:
-	pr_warn("RPC:       %s: FRMR %p unrecovered\n",
-		__func__, r);
 }
 
 /* A broken MR was discovered in a context that can't sleep.
@@ -137,8 +162,8 @@ out_fail:
 static void
 __frwr_queue_recovery(struct rpcrdma_mw *r)
 {
-	INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
-	queue_work(frwr_recovery_wq, &r->frmr.fr_work);
+	INIT_WORK(&r->mw_work, __frwr_recovery_worker);
+	queue_work(frwr_recovery_wq, &r->mw_work);
 }
 
 static int
@@ -152,11 +177,11 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
 	if (IS_ERR(f->fr_mr))
 		goto out_mr_err;
 
-	f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL);
-	if (!f->sg)
+	f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL);
+	if (!f->fr_sg)
 		goto out_list_err;
 
-	sg_init_table(f->sg, depth);
+	sg_init_table(f->fr_sg, depth);
 
 	init_completion(&f->fr_linv_done);
 
@@ -185,7 +210,7 @@ __frwr_release(struct rpcrdma_mw *r)
 	if (rc)
 		dprintk("RPC:       %s: ib_dereg_mr status %i\n",
 			__func__, rc);
-	kfree(r->frmr.sg);
+	kfree(r->frmr.fr_sg);
 }
 
 static int
@@ -231,6 +256,9 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 					       depth;
 	}
 
+	rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1,
+						      RPCRDMA_MAX_DATA_SEGS /
+						      ia->ri_max_frmr_depth));
 	return 0;
 }
 
@@ -243,7 +271,7 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
 	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
+		     RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth);
 }
 
 static void
@@ -350,9 +378,9 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
 			return rc;
 		}
 
+		r->mw_xprt = r_xprt;
 		list_add(&r->mw_list, &buf->rb_mws);
 		list_add(&r->mw_all, &buf->rb_all);
-		r->frmr.fr_xprt = r_xprt;
 	}
 
 	return 0;
@@ -396,12 +424,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
 	for (i = 0; i < nsegs;) {
 		if (seg->mr_page)
-			sg_set_page(&frmr->sg[i],
+			sg_set_page(&frmr->fr_sg[i],
 				    seg->mr_page,
 				    seg->mr_len,
 				    offset_in_page(seg->mr_offset));
 		else
-			sg_set_buf(&frmr->sg[i], seg->mr_offset,
+			sg_set_buf(&frmr->fr_sg[i], seg->mr_offset,
 				   seg->mr_len);
 
 		++seg;
@@ -412,25 +440,26 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 			break;
 	}
-	frmr->sg_nents = i;
+	frmr->fr_nents = i;
+	frmr->fr_dir = direction;
 
-	dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction);
+	dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction);
 	if (!dma_nents) {
 		pr_err("RPC:       %s: failed to dma map sg %p sg_nents %u\n",
-		       __func__, frmr->sg, frmr->sg_nents);
+		       __func__, frmr->fr_sg, frmr->fr_nents);
 		return -ENOMEM;
 	}
 
-	n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE);
-	if (unlikely(n != frmr->sg_nents)) {
+	n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE);
+	if (unlikely(n != frmr->fr_nents)) {
 		pr_err("RPC:       %s: failed to map mr %p (%u/%u)\n",
-		       __func__, frmr->fr_mr, n, frmr->sg_nents);
+		       __func__, frmr->fr_mr, n, frmr->fr_nents);
 		rc = n < 0 ? n : -EINVAL;
 		goto out_senderr;
 	}
 
 	dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
-		__func__, mw, frmr->sg_nents, mr->length);
+		__func__, mw, frmr->fr_nents, mr->length);
 
 	key = (u8)(mr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(mr, ++key);
@@ -452,18 +481,16 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	if (rc)
 		goto out_senderr;
 
-	seg1->mr_dir = direction;
 	seg1->rl_mw = mw;
 	seg1->mr_rkey = mr->rkey;
 	seg1->mr_base = mr->iova;
-	seg1->mr_nsegs = frmr->sg_nents;
+	seg1->mr_nsegs = frmr->fr_nents;
 	seg1->mr_len = mr->length;
 
-	return frmr->sg_nents;
+	return frmr->fr_nents;
 
 out_senderr:
 	dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
-	ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction);
 	__frwr_queue_recovery(mw);
 	return rc;
 }
@@ -487,24 +514,6 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
 	return invalidate_wr;
 }
 
-static void
-__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
-		 int rc)
-{
-	struct ib_device *device = r_xprt->rx_ia.ri_device;
-	struct rpcrdma_mw *mw = seg->rl_mw;
-	struct rpcrdma_frmr *f = &mw->frmr;
-
-	seg->rl_mw = NULL;
-
-	ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir);
-
-	if (!rc)
-		rpcrdma_put_mw(r_xprt, mw);
-	else
-		__frwr_queue_recovery(mw);
-}
-
 /* Invalidate all memory regions that were registered for "req".
  *
  * Sleeps until it is safe for the host CPU to access the
@@ -518,6 +527,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	struct rpcrdma_mr_seg *seg;
 	unsigned int i, nchunks;
 	struct rpcrdma_frmr *f;
+	struct rpcrdma_mw *mw;
 	int rc;
 
 	dprintk("RPC:       %s: req %p\n", __func__, req);
@@ -558,11 +568,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * unless ri_id->qp is a valid pointer.
 	 */
 	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
-	if (rc) {
-		pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
-		rdma_disconnect(ia->ri_id);
-		goto unmap;
-	}
+	if (rc)
+		goto reset_mrs;
 
 	wait_for_completion(&f->fr_linv_done);
 
@@ -572,56 +579,65 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 unmap:
 	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
 		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
+		seg->rl_mw = NULL;
 
-		__frwr_dma_unmap(r_xprt, seg, rc);
+		f = &mw->frmr;
+		ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents,
+				f->fr_dir);
+		rpcrdma_put_mw(r_xprt, mw);
 
 		i += seg->mr_nsegs;
 		seg->mr_nsegs = 0;
 	}
 
 	req->rl_nchunks = 0;
-}
+	return;
 
-/* Post a LOCAL_INV Work Request to prevent further remote access
- * via RDMA READ or RDMA WRITE.
- */
-static int
-frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
-{
-	struct rpcrdma_mr_seg *seg1 = seg;
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_mw *mw = seg1->rl_mw;
-	struct rpcrdma_frmr *frmr = &mw->frmr;
-	struct ib_send_wr *invalidate_wr, *bad_wr;
-	int rc, nsegs = seg->mr_nsegs;
+reset_mrs:
+	pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
 
-	dprintk("RPC:       %s: FRMR %p\n", __func__, mw);
+	/* Find and reset the MRs in the LOCAL_INV WRs that did not
+	 * get posted. This is synchronous, and slow.
+	 */
+	for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
+		f = &mw->frmr;
 
-	seg1->rl_mw = NULL;
-	frmr->fr_state = FRMR_IS_INVALID;
-	invalidate_wr = &mw->frmr.fr_invwr;
+		if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
+			__frwr_reset_mr(ia, mw);
+			bad_wr = bad_wr->next;
+		}
 
-	memset(invalidate_wr, 0, sizeof(*invalidate_wr));
-	frmr->fr_cqe.done = frwr_wc_localinv;
-	invalidate_wr->wr_cqe = &frmr->fr_cqe;
-	invalidate_wr->opcode = IB_WR_LOCAL_INV;
-	invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
-	DECR_CQCOUNT(&r_xprt->rx_ep);
+		i += seg->mr_nsegs;
+	}
+	goto unmap;
+}
 
-	ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir);
-	read_lock(&ia->ri_qplock);
-	rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr);
-	read_unlock(&ia->ri_qplock);
-	if (rc)
-		goto out_err;
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ */
+static void
+frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		   bool sync)
+{
+	struct rpcrdma_mr_seg *seg;
+	struct rpcrdma_mw *mw;
+	unsigned int i;
 
-	rpcrdma_put_mw(r_xprt, mw);
-	return nsegs;
+	for (i = 0; req->rl_nchunks; req->rl_nchunks--) {
+		seg = &req->rl_segments[i];
+		mw = seg->rl_mw;
 
-out_err:
-	dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
-	__frwr_queue_recovery(mw);
-	return nsegs;
+		if (sync)
+			__frwr_reset_and_unmap(r_xprt, mw);
+		else
+			__frwr_queue_recovery(mw);
+
+		i += seg->mr_nsegs;
+		seg->mr_nsegs = 0;
+		seg->rl_mw = NULL;
+	}
 }
 
 static void
@@ -643,7 +659,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
 const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
 	.ro_map				= frwr_op_map,
 	.ro_unmap_sync			= frwr_op_unmap_sync,
-	.ro_unmap			= frwr_op_unmap,
+	.ro_unmap_safe			= frwr_op_unmap_safe,
 	.ro_open			= frwr_op_open,
 	.ro_maxpages			= frwr_op_maxpages,
 	.ro_init			= frwr_op_init,
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index 481b9b6f4a15..3750596cc432 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -36,8 +36,11 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 		       __func__, PTR_ERR(mr));
 		return -ENOMEM;
 	}
-
 	ia->ri_dma_mr = mr;
+
+	rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int,
+						      RPCRDMA_MAX_DATA_SEGS,
+						      RPCRDMA_MAX_HDR_SEGS));
 	return 0;
 }
 
@@ -47,7 +50,7 @@ static size_t
 physical_op_maxpages(struct rpcrdma_xprt *r_xprt)
 {
 	return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-		     rpcrdma_max_segments(r_xprt));
+		     RPCRDMA_MAX_HDR_SEGS);
 }
 
 static int
@@ -71,17 +74,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	return 1;
 }
 
-/* Unmap a memory region, but leave it registered.
- */
-static int
-physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
-{
-	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-
-	rpcrdma_unmap_one(ia->ri_device, seg);
-	return 1;
-}
-
 /* DMA unmap all memory regions that were mapped for "req".
  */
 static void
@@ -94,6 +86,25 @@ physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 		rpcrdma_unmap_one(device, &req->rl_segments[i++]);
 }
 
+/* Use a slow, safe mechanism to invalidate all memory regions
+ * that were registered for "req".
+ *
+ * For physical memory registration, there is no good way to
+ * fence a single MR that has been advertised to the server. The
+ * client has already handed the server an R_key that cannot be
+ * invalidated and is shared by all MRs on this connection.
+ * Tearing down the PD might be the only safe choice, but it's
+ * not clear that a freshly acquired DMA R_key would be different
+ * than the one used by the PD that was just destroyed.
+ * FIXME.
+ */
+static void
+physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+		       bool sync)
+{
+	physical_op_unmap_sync(r_xprt, req);
+}
+
 static void
 physical_op_destroy(struct rpcrdma_buffer *buf)
 {
@@ -102,7 +113,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf)
 const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
 	.ro_map				= physical_op_map,
 	.ro_unmap_sync			= physical_op_unmap_sync,
-	.ro_unmap			= physical_op_unmap,
+	.ro_unmap_safe			= physical_op_unmap_safe,
 	.ro_open			= physical_op_open,
 	.ro_maxpages			= physical_op_maxpages,
 	.ro_init			= physical_op_init,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 888823bb6dae..35a81096e83d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -61,26 +61,84 @@ enum rpcrdma_chunktype {
 	rpcrdma_replych
 };
 
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 static const char transfertypes[][12] = {
-	"pure inline",	/* no chunks */
-	" read chunk",	/* some argument via rdma read */
-	"*read chunk",	/* entire request via rdma read */
-	"write chunk",	/* some result via rdma write */
+	"inline",	/* no chunks */
+	"read list",	/* some argument via rdma read */
+	"*read list",	/* entire request via rdma read */
+	"write list",	/* some result via rdma write */
 	"reply chunk"	/* entire reply via rdma write */
 };
-#endif
+
+/* Returns size of largest RPC-over-RDMA header in a Call message
+ *
+ * The largest Call header contains a full-size Read list and a
+ * minimal Reply chunk.
+ */
+static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
+{
+	unsigned int size;
+
+	/* Fixed header fields and list discriminators */
+	size = RPCRDMA_HDRLEN_MIN;
+
+	/* Maximum Read list size */
+	maxsegs += 2;	/* segment for head and tail buffers */
+	size += maxsegs * sizeof(struct rpcrdma_read_chunk);
+
+	/* Minimal Reply chunk size */
+	size += sizeof(__be32);	/* segment count */
+	size += sizeof(struct rpcrdma_segment);
+	size += sizeof(__be32);	/* list discriminator */
+
+	dprintk("RPC:       %s: max call header size = %u\n",
+		__func__, size);
+	return size;
+}
+
+/* Returns size of largest RPC-over-RDMA header in a Reply message
+ *
+ * There is only one Write list or one Reply chunk per Reply
+ * message.  The larger list is the Write list.
+ */
+static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
+{
+	unsigned int size;
+
+	/* Fixed header fields and list discriminators */
+	size = RPCRDMA_HDRLEN_MIN;
+
+	/* Maximum Write list size */
+	maxsegs += 2;	/* segment for head and tail buffers */
+	size += sizeof(__be32);		/* segment count */
+	size += maxsegs * sizeof(struct rpcrdma_segment);
+	size += sizeof(__be32);	/* list discriminator */
+
+	dprintk("RPC:       %s: max reply header size = %u\n",
+		__func__, size);
+	return size;
+}
+
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
+				  struct rpcrdma_create_data_internal *cdata,
+				  unsigned int maxsegs)
+{
+	ia->ri_max_inline_write = cdata->inline_wsize -
+				  rpcrdma_max_call_header_size(maxsegs);
+	ia->ri_max_inline_read = cdata->inline_rsize -
+				 rpcrdma_max_reply_header_size(maxsegs);
+}
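Illustration (not part of the patch): a worked example of the two bounds for maxsegs == 8, assuming 4-byte XDR words, a 16-byte struct rpcrdma_segment, and a 24-byte struct rpcrdma_read_chunk.

/* Call header:  RPCRDMA_HDRLEN_MIN + (8 + 2) * 24 (Read list)
 *               + 4 + 16 + 4 (minimal Reply chunk)        = RPCRDMA_HDRLEN_MIN + 264
 * Reply header: RPCRDMA_HDRLEN_MIN + 4 + (8 + 2) * 16 + 4 = RPCRDMA_HDRLEN_MIN + 168
 * so ri_max_inline_write = inline_wsize - (RPCRDMA_HDRLEN_MIN + 264)
 * and ri_max_inline_read = inline_rsize - (RPCRDMA_HDRLEN_MIN + 168).
 */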
 
 /* The client can send a request inline as long as the RPCRDMA header
  * plus the RPC call fit under the transport's inline limit. If the
  * combined call message size exceeds that limit, the client must use
  * the read chunk list for this operation.
  */
-static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
+static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
+				struct rpc_rqst *rqst)
 {
-	unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
 }
 
 /* The client can't know how large the actual reply will be. Thus it
@@ -89,11 +147,12 @@ static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
  * limit, the client must provide a write list or a reply chunk for
  * this request.
  */
-static bool rpcrdma_results_inline(struct rpc_rqst *rqst)
+static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
+				   struct rpc_rqst *rqst)
 {
-	unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
 }
 
 static int
@@ -226,23 +285,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 	return n;
 }
 
-/*
- * Create read/write chunk lists, and reply chunks, for RDMA
- *
- *   Assume check against THRESHOLD has been done, and chunks are required.
- *   Assume only encoding one list entry for read|write chunks. The NFSv3
- *     protocol is simple enough to allow this as it only has a single "bulk
- *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
- *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
- *
- * When used for a single reply chunk (which is a special write
- * chunk used for the entire reply, rather than just the data), it
- * is used primarily for READDIR and READLINK which would otherwise
- * be severely size-limited by a small rdma inline read max. The server
- * response will come back as an RDMA Write, followed by a message
- * of type RDMA_NOMSG carrying the xid and length. As a result, reply
- * chunks do not provide data alignment, however they do not require
- * "fixup" (moving the response to the upper layer buffer) either.
+static inline __be32 *
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+{
+	*iptr++ = cpu_to_be32(seg->mr_rkey);
+	*iptr++ = cpu_to_be32(seg->mr_len);
+	return xdr_encode_hyper(iptr, seg->mr_base);
+}
+
+/* XDR-encode the Read list. Supports encoding a list of read
+ * segments that belong to a single read chunk.
  *
  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
  *
@@ -250,131 +302,190 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
  *   N elements, position P (same P for all chunks of same arg!):
  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
  *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Read list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
+			 __be32 *iptr, enum rpcrdma_chunktype rtype)
+{
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	unsigned int pos;
+	int n, nsegs;
+
+	if (rtype == rpcrdma_noch) {
+		*iptr++ = xdr_zero;	/* item not present */
+		return iptr;
+	}
+
+	pos = rqst->rq_snd_buf.head[0].iov_len;
+	if (rtype == rpcrdma_areadch)
+		pos = 0;
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	if (nsegs < 0)
+		return ERR_PTR(nsegs);
+
+	do {
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+		if (n <= 0)
+			return ERR_PTR(n);
+
+		*iptr++ = xdr_one;	/* item present */
+
+		/* All read segments in this chunk
+		 * have the same "position".
+		 */
+		*iptr++ = cpu_to_be32(pos);
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: read segment pos %u "
+			"%d@0x%016llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__, pos,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.read_chunk_count++;
+		req->rl_nchunks++;
+		seg += n;
+		nsegs -= n;
+	} while (nsegs);
+	req->rl_nextseg = seg;
+
+	/* Finish Read list */
+	*iptr++ = xdr_zero;	/* Next item not present */
+	return iptr;
+}
+
+/* XDR-encode the Write list. Supports encoding a list containing
+ * one array of plain segments that belong to a single write chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
  *  Write chunklist (a list of (one) counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO - 0
  *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Write list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+			  struct rpc_rqst *rqst, __be32 *iptr,
+			  enum rpcrdma_chunktype wtype)
+{
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	int n, nsegs, nchunks;
+	__be32 *segcount;
+
+	if (wtype != rpcrdma_writech) {
+		*iptr++ = xdr_zero;	/* no Write list present */
+		return iptr;
+	}
+
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
+				     rqst->rq_rcv_buf.head[0].iov_len,
+				     wtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
+	if (nsegs < 0)
+		return ERR_PTR(nsegs);
+
+	*iptr++ = xdr_one;	/* Write list present */
+	segcount = iptr++;	/* save location of segment count */
+
+	nchunks = 0;
+	do {
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+		if (n <= 0)
+			return ERR_PTR(n);
+
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: write segment "
+			"%d@0x%016llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.write_chunk_count++;
+		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+		req->rl_nchunks++;
+		nchunks++;
+		seg   += n;
+		nsegs -= n;
+	} while (nsegs);
+	req->rl_nextseg = seg;
+
+	/* Update count of segments in this Write chunk */
+	*segcount = cpu_to_be32(nchunks);
+
+	/* Finish Write list */
+	*iptr++ = xdr_zero;	/* Next item not present */
+	return iptr;
+}
+
+/* XDR-encode the Reply chunk. Supports encoding an array of plain
+ * segments that belong to a single write (reply) chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
  *  Reply chunk (a counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
  *
- * Returns positive RPC/RDMA header size, or negative errno.
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Reply chunk, or an error pointer.
  */
-
-static ssize_t
-rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
-		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
+static __be32 *
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
+			   __be32 *iptr, enum rpcrdma_chunktype wtype)
 {
-	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-	int n, nsegs, nchunks = 0;
-	unsigned int pos;
-	struct rpcrdma_mr_seg *seg = req->rl_segments;
-	struct rpcrdma_read_chunk *cur_rchunk = NULL;
-	struct rpcrdma_write_array *warray = NULL;
-	struct rpcrdma_write_chunk *cur_wchunk = NULL;
-	__be32 *iptr = headerp->rm_body.rm_chunks;
-	int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool);
-
-	if (type == rpcrdma_readch || type == rpcrdma_areadch) {
-		/* a read chunk - server will RDMA Read our memory */
-		cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
-	} else {
-		/* a write or reply chunk - server will RDMA Write our memory */
-		*iptr++ = xdr_zero;	/* encode a NULL read chunk list */
-		if (type == rpcrdma_replych)
-			*iptr++ = xdr_zero;	/* a NULL write chunk list */
-		warray = (struct rpcrdma_write_array *) iptr;
-		cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
-	}
+	struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+	int n, nsegs, nchunks;
+	__be32 *segcount;
 
-	if (type == rpcrdma_replych || type == rpcrdma_areadch)
-		pos = 0;
-	else
-		pos = target->head[0].iov_len;
+	if (wtype != rpcrdma_replych) {
+		*iptr++ = xdr_zero;	/* no Reply chunk present */
+		return iptr;
+	}
 
-	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
+	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+				     RPCRDMA_MAX_SEGS - req->rl_nchunks);
 	if (nsegs < 0)
-		return nsegs;
+		return ERR_PTR(nsegs);
 
-	map = r_xprt->rx_ia.ri_ops->ro_map;
+	*iptr++ = xdr_one;	/* Reply chunk present */
+	segcount = iptr++;	/* save location of segment count */
+
+	nchunks = 0;
 	do {
-		n = map(r_xprt, seg, nsegs, cur_wchunk != NULL);
+		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
 		if (n <= 0)
-			goto out;
-		if (cur_rchunk) {	/* read */
-			cur_rchunk->rc_discrim = xdr_one;
-			/* all read chunks have the same "position" */
-			cur_rchunk->rc_position = cpu_to_be32(pos);
-			cur_rchunk->rc_target.rs_handle =
-						cpu_to_be32(seg->mr_rkey);
-			cur_rchunk->rc_target.rs_length =
-						cpu_to_be32(seg->mr_len);
-			xdr_encode_hyper(
-					(__be32 *)&cur_rchunk->rc_target.rs_offset,
-					seg->mr_base);
-			dprintk("RPC:       %s: read chunk "
-				"elem %d@0x%llx:0x%x pos %u (%s)\n", __func__,
-				seg->mr_len, (unsigned long long)seg->mr_base,
-				seg->mr_rkey, pos, n < nsegs ? "more" : "last");
-			cur_rchunk++;
-			r_xprt->rx_stats.read_chunk_count++;
-		} else {		/* write/reply */
-			cur_wchunk->wc_target.rs_handle =
-						cpu_to_be32(seg->mr_rkey);
-			cur_wchunk->wc_target.rs_length =
-						cpu_to_be32(seg->mr_len);
-			xdr_encode_hyper(
-					(__be32 *)&cur_wchunk->wc_target.rs_offset,
-					seg->mr_base);
-			dprintk("RPC:       %s: %s chunk "
-				"elem %d@0x%llx:0x%x (%s)\n", __func__,
-				(type == rpcrdma_replych) ? "reply" : "write",
-				seg->mr_len, (unsigned long long)seg->mr_base,
-				seg->mr_rkey, n < nsegs ? "more" : "last");
-			cur_wchunk++;
-			if (type == rpcrdma_replych)
-				r_xprt->rx_stats.reply_chunk_count++;
-			else
-				r_xprt->rx_stats.write_chunk_count++;
-			r_xprt->rx_stats.total_rdma_request += seg->mr_len;
-		}
+			return ERR_PTR(n);
+
+		iptr = xdr_encode_rdma_segment(iptr, seg);
+
+		dprintk("RPC: %5u %s: reply segment "
+			"%d@0x%016llx:0x%08x (%s)\n",
+			rqst->rq_task->tk_pid, __func__,
+			seg->mr_len, (unsigned long long)seg->mr_base,
+			seg->mr_rkey, n < nsegs ? "more" : "last");
+
+		r_xprt->rx_stats.reply_chunk_count++;
+		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+		req->rl_nchunks++;
 		nchunks++;
 		seg   += n;
 		nsegs -= n;
 	} while (nsegs);
+	req->rl_nextseg = seg;
 
-	/* success. all failures return above */
-	req->rl_nchunks = nchunks;
-
-	/*
-	 * finish off header. If write, marshal discrim and nchunks.
-	 */
-	if (cur_rchunk) {
-		iptr = (__be32 *) cur_rchunk;
-		*iptr++ = xdr_zero;	/* finish the read chunk list */
-		*iptr++ = xdr_zero;	/* encode a NULL write chunk list */
-		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
-	} else {
-		warray->wc_discrim = xdr_one;
-		warray->wc_nchunks = cpu_to_be32(nchunks);
-		iptr = (__be32 *) cur_wchunk;
-		if (type == rpcrdma_writech) {
-			*iptr++ = xdr_zero; /* finish the write chunk list */
-			*iptr++ = xdr_zero; /* encode a NULL reply chunk */
-		}
-	}
-
-	/*
-	 * Return header size.
-	 */
-	return (unsigned char *)iptr - (unsigned char *)headerp;
+	/* Update count of segments in the Reply chunk */
+	*segcount = cpu_to_be32(nchunks);
 
-out:
-	for (pos = 0; nchunks--;)
-		pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
-						      &req->rl_segments[pos]);
-	return n;
+	return iptr;
 }
 
 /*
@@ -440,13 +551,10 @@ static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
  * Marshal a request: the primary job of this routine is to choose
  * the transfer modes. See comments below.
  *
- * Uses multiple RDMA IOVs for a request:
- *  [0] -- RPC RDMA header, which uses memory from the *start* of the
- *         preregistered buffer that already holds the RPC data in
- *         its middle.
- *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
- *  [2] -- optional padding.
- *  [3] -- if padded, header only in [1] and data here.
+ * Prepares up to two IOVs per Call message:
+ *
+ *  [0] -- RPC RDMA header
+ *  [1] -- the RPC header/data
  *
  * Returns zero on success, otherwise a negative errno.
  */
@@ -457,24 +565,17 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-	char *base;
-	size_t rpclen;
-	ssize_t hdrlen;
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
+	ssize_t hdrlen;
+	size_t rpclen;
+	__be32 *iptr;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
 		return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-	/*
-	 * rpclen gets amount of data in first buffer, which is the
-	 * pre-registered buffer.
-	 */
-	base = rqst->rq_svec[0].iov_base;
-	rpclen = rqst->rq_svec[0].iov_len;
-
 	headerp = rdmab_to_msg(req->rl_rdmabuf);
 	/* don't byte-swap XID, it's already done in request */
 	headerp->rm_xid = rqst->rq_xid;
@@ -485,15 +586,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	/*
 	 * Chunks needed for results?
 	 *
-	 * o Read ops return data as write chunk(s), header as inline.
 	 * o If the expected result is under the inline threshold, all ops
 	 *   return as inline.
+	 * o Large read ops return data as write chunk(s), header as
+	 *   inline.
 	 * o Large non-read ops return as a single reply chunk.
 	 */
-	if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
-		wtype = rpcrdma_writech;
-	else if (rpcrdma_results_inline(rqst))
+	if (rpcrdma_results_inline(r_xprt, rqst))
 		wtype = rpcrdma_noch;
+	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+		wtype = rpcrdma_writech;
 	else
 		wtype = rpcrdma_replych;
 
@@ -511,10 +613,14 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * that both has a data payload, and whose non-data arguments
 	 * by themselves are larger than the inline threshold.
 	 */
-	if (rpcrdma_args_inline(rqst)) {
+	if (rpcrdma_args_inline(r_xprt, rqst)) {
 		rtype = rpcrdma_noch;
+		rpcrdma_inline_pullup(rqst);
+		rpclen = rqst->rq_svec[0].iov_len;
 	} else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
 		rtype = rpcrdma_readch;
+		rpclen = rqst->rq_svec[0].iov_len;
+		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
 	} else {
 		r_xprt->rx_stats.nomsg_call_count++;
 		headerp->rm_type = htonl(RDMA_NOMSG);
@@ -522,57 +628,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		rpclen = 0;
 	}
 
-	/* The following simplification is not true forever */
-	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
-		wtype = rpcrdma_noch;
-	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
-		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
-			__func__);
-		return -EIO;
-	}
-
-	hdrlen = RPCRDMA_HDRLEN_MIN;
-
-	/*
-	 * Pull up any extra send data into the preregistered buffer.
-	 * When padding is in use and applies to the transfer, insert
-	 * it and change the message type.
+	/* This implementation supports the following combinations
+	 * of chunk lists in one RPC-over-RDMA Call message:
+	 *
+	 *   - Read list
+	 *   - Write list
+	 *   - Reply chunk
+	 *   - Read list + Reply chunk
+	 *
+	 * It might not yet support the following combinations:
+	 *
+	 *   - Read list + Write list
+	 *
+	 * It does not support the following combinations:
+	 *
+	 *   - Write list + Reply chunk
+	 *   - Read list + Write list + Reply chunk
+	 *
+	 * This implementation supports only a single chunk in each
+	 * Read or Write list. Thus for example the client cannot
+	 * send a Call message with a Position Zero Read chunk and a
+	 * regular Read chunk at the same time.
 	 */
-	if (rtype == rpcrdma_noch) {
-
-		rpcrdma_inline_pullup(rqst);
-
-		headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
-		headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
-		headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
-		/* new length after pullup */
-		rpclen = rqst->rq_svec[0].iov_len;
-	} else if (rtype == rpcrdma_readch)
-		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
-	if (rtype != rpcrdma_noch) {
-		hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
-					       headerp, rtype);
-		wtype = rtype;	/* simplify dprintk */
-
-	} else if (wtype != rpcrdma_noch) {
-		hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
-					       headerp, wtype);
-	}
-	if (hdrlen < 0)
-		return hdrlen;
+	req->rl_nchunks = 0;
+	req->rl_nextseg = req->rl_segments;
+	iptr = headerp->rm_body.rm_chunks;
+	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
+	if (IS_ERR(iptr))
+		goto out_unmap;
+	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
+
+	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+		goto out_overflow;
+
+	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+		rqst->rq_task->tk_pid, __func__,
+		transfertypes[rtype], transfertypes[wtype],
+		hdrlen, rpclen);
 
-	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd"
-		" headerp 0x%p base 0x%p lkey 0x%x\n",
-		__func__, transfertypes[wtype], hdrlen, rpclen,
-		headerp, base, rdmab_lkey(req->rl_rdmabuf));
-
-	/*
-	 * initialize send_iov's - normally only two: rdma chunk header and
-	 * single preregistered RPC header buffer, but if padding is present,
-	 * then use a preregistered (and zeroed) pad buffer between the RPC
-	 * header and any write data. In all non-rdma cases, any following
-	 * data has been copied into the RPC header buffer.
-	 */
 	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
 	req->rl_send_iov[0].length = hdrlen;
 	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
@@ -587,6 +686,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 
 	req->rl_niovs = 2;
 	return 0;
+
+out_overflow:
+	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
+		hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
+	/* Terminate this RPC. Chunks registered above will be
+	 * released by xprt_release -> xprt_rdma_free().
+	 */
+	return -EIO;
+
+out_unmap:
+	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
+	return PTR_ERR(iptr);
 }
 
 /*
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index b1b009f10ea3..99d2e5b72726 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -73,6 +73,8 @@ static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
 
 static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
 static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
+static unsigned int min_inline_size = RPCRDMA_MIN_INLINE;
+static unsigned int max_inline_size = RPCRDMA_MAX_INLINE;
 static unsigned int zero;
 static unsigned int max_padding = PAGE_SIZE;
 static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
@@ -96,6 +98,8 @@ static struct ctl_table xr_tunables_table[] = {
 		.maxlen		= sizeof(unsigned int),
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec,
+		.extra1		= &min_inline_size,
+		.extra2		= &max_inline_size,
 	},
 	{
 		.procname	= "rdma_max_inline_write",
@@ -103,6 +107,8 @@ static struct ctl_table xr_tunables_table[] = {
 		.maxlen		= sizeof(unsigned int),
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec,
+		.extra1		= &min_inline_size,
+		.extra2		= &max_inline_size,
 	},
 	{
 		.procname	= "rdma_inline_write_padding",
@@ -508,6 +514,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 out:
 	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
 	req->rl_connect_cookie = 0;	/* our reserved value */
+	req->rl_task = task;
 	return req->rl_sendbuf->rg_base;
 
 out_rdmabuf:
@@ -564,7 +571,6 @@ xprt_rdma_free(void *buffer)
 	struct rpcrdma_req *req;
 	struct rpcrdma_xprt *r_xprt;
 	struct rpcrdma_regbuf *rb;
-	int i;
 
 	if (buffer == NULL)
 		return;
@@ -578,11 +584,8 @@ xprt_rdma_free(void *buffer)
 
 	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	for (i = 0; req->rl_nchunks;) {
-		--req->rl_nchunks;
-		i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
-						    &req->rl_segments[i]);
-	}
+	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req,
+					    !RPC_IS_ASYNC(req->rl_task));
 
 	rpcrdma_buffer_put(req);
 }
@@ -707,6 +710,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	.bc_setup		= xprt_rdma_bc_setup,
 	.bc_up			= xprt_rdma_bc_up,
+	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
 	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
 	.bc_destroy		= xprt_rdma_bc_destroy,
 #endif
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f5ed9f982cd7..b044d98a1370 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -203,15 +203,6 @@ out_fail:
 	goto out_schedule;
 }
 
-static void
-rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
-{
-	struct ib_wc wc;
-
-	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_receive_wc(NULL, &wc);
-}
-
 static int
 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 {
@@ -374,23 +365,6 @@ out:
 }
 
 /*
- * Drain any cq, prior to teardown.
- */
-static void
-rpcrdma_clean_cq(struct ib_cq *cq)
-{
-	struct ib_wc wc;
-	int count = 0;
-
-	while (1 == ib_poll_cq(cq, 1, &wc))
-		++count;
-
-	if (count)
-		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
-			__func__, count, wc.opcode);
-}
-
-/*
  * Exported functions.
  */
 
@@ -459,7 +433,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
 		__func__, ia->ri_ops->ro_displayname);
 
-	rwlock_init(&ia->ri_qplock);
 	return 0;
 
 out3:
@@ -515,7 +488,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 			__func__);
 		return -ENOMEM;
 	}
-	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS;
+	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;
 
 	/* check provider's send/recv wr limits */
 	if (cdata->max_requests > max_qp_wr)
@@ -526,11 +499,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.srq = NULL;
 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
+	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
 	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 	if (rc)
 		return rc;
 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
 	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
+	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
 	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
 	ep->rep_attr.cap.max_recv_sge = 1;
 	ep->rep_attr.cap.max_inline_data = 0;
@@ -578,6 +553,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.recv_cq = recvcq;
 
 	/* Initialize cma parameters */
+	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
 
 	/* RPC/RDMA does not use private data */
 	ep->rep_remote_cma.private_data = NULL;
@@ -591,7 +567,16 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		ep->rep_remote_cma.responder_resources =
 						ia->ri_device->attrs.max_qp_rd_atom;
 
-	ep->rep_remote_cma.retry_count = 7;
+	/* Limit transport retries so client can detect server
+	 * GID changes quickly. RPC layer handles re-establishing
+	 * transport connection and retransmission.
+	 */
+	ep->rep_remote_cma.retry_count = 6;
+
+	/* RPC-over-RDMA handles its own flow control. In addition,
+	 * make all RNR NAKs visible so we know that RPC-over-RDMA
+	 * flow control is working correctly (no NAKs should be seen).
+	 */
 	ep->rep_remote_cma.flow_control = 0;
 	ep->rep_remote_cma.rnr_retry_count = 0;
 
@@ -622,13 +607,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
 	cancel_delayed_work_sync(&ep->rep_connect_worker);
 
-	if (ia->ri_id->qp)
-		rpcrdma_ep_disconnect(ep, ia);
-
-	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
-	rpcrdma_clean_cq(ep->rep_attr.send_cq);
-
 	if (ia->ri_id->qp) {
+		rpcrdma_ep_disconnect(ep, ia);
 		rdma_destroy_qp(ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
@@ -659,7 +639,6 @@ retry:
 		dprintk("RPC:       %s: reconnecting...\n", __func__);
 
 		rpcrdma_ep_disconnect(ep, ia);
-		rpcrdma_flush_cqs(ep);
 
 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
 		id = rpcrdma_create_id(xprt, ia,
@@ -692,10 +671,8 @@ retry:
 			goto out;
 		}
 
-		write_lock(&ia->ri_qplock);
 		old = ia->ri_id;
 		ia->ri_id = id;
-		write_unlock(&ia->ri_qplock);
 
 		rdma_destroy_qp(old);
 		rpcrdma_destroy_id(old);
@@ -785,7 +762,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
 	int rc;
 
-	rpcrdma_flush_cqs(ep);
 	rc = rdma_disconnect(ia->ri_id);
 	if (!rc) {
 		/* returns without wait if not connected */
@@ -797,6 +773,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
 		ep->rep_connected = rc;
 	}
+
+	ib_drain_qp(ia->ri_id->qp);
 }
 
 struct rpcrdma_req *
@@ -1271,25 +1249,3 @@ out_rc:
 	rpcrdma_recv_buffer_put(rep);
 	return rc;
 }
-
-/* How many chunk list items fit within our inline buffers?
- */
-unsigned int
-rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
-{
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	int bytes, segments;
-
-	bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
-	bytes -= RPCRDMA_HDRLEN_MIN;
-	if (bytes < sizeof(struct rpcrdma_segment) * 2) {
-		pr_warn("RPC:       %s: inline threshold too small\n",
-			__func__);
-		return 0;
-	}
-
-	segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
-	dprintk("RPC:       %s: max chunk list size = %d segments\n",
-		__func__, segments);
-	return segments;
-}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2ebc743cb96f..95cdc66225ee 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -65,7 +65,6 @@
  */
 struct rpcrdma_ia {
 	const struct rpcrdma_memreg_ops	*ri_ops;
-	rwlock_t		ri_qplock;
 	struct ib_device	*ri_device;
 	struct rdma_cm_id 	*ri_id;
 	struct ib_pd		*ri_pd;
@@ -73,6 +72,8 @@ struct rpcrdma_ia {
 	struct completion	ri_done;
 	int			ri_async_rc;
 	unsigned int		ri_max_frmr_depth;
+	unsigned int		ri_max_inline_write;
+	unsigned int		ri_max_inline_read;
 	struct ib_qp_attr	ri_qp_attr;
 	struct ib_qp_init_attr	ri_qp_init_attr;
 };
@@ -144,6 +145,26 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
 
 #define RPCRDMA_DEF_GFP		(GFP_NOIO | __GFP_NOWARN)
 
+/* To ensure a transport can always make forward progress,
+ * the number of RDMA segments allowed in header chunk lists
+ * is capped at 8. This prevents less-capable devices and
+ * memory registrations from overrunning the Send buffer
+ * while building chunk lists.
+ *
+ * Elements of the Read list take up more room than the
+ * Write list or Reply chunk. 8 read segments means the Read
+ * list (or Write list or Reply chunk) cannot consume more
+ * than
+ *
+ * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes.
+ *
+ * And the fixed part of the header is another 24 bytes.
+ *
+ * The smallest inline threshold is 1024 bytes, ensuring that
+ * at least 750 bytes are available for RPC messages.
+ */
+#define RPCRDMA_MAX_HDR_SEGS	(8)
+
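Illustration (not part of the patch): how this cap interacts with the per-mode ro_maxpages calculations earlier in the series; the numbers are examples only and the FRWR depth is device-dependent.

/* Example, assuming 4 KB pages so RPCRDMA_MAX_DATA_SEGS == 256:
 *   FMR:  min(256, RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES) = min(256, 8 * 64)  = 256
 *   FRWR: min(256, RPCRDMA_MAX_HDR_SEGS * ri_max_frmr_depth), e.g. min(256, 8 * 128) = 256
 * so for FMR and FRWR the cap normally does not shrink the maximum payload.
 */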
 /*
  * struct rpcrdma_rep -- this structure encapsulates state required to recv
  * and complete a reply, asynchronously. It needs several pieces of
@@ -162,7 +183,9 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
  */
 
 #define RPCRDMA_MAX_DATA_SEGS	((1 * 1024 * 1024) / PAGE_SIZE)
-#define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
+
+/* data segments + head/tail for Call + head/tail for Reply */
+#define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 4)
 
 struct rpcrdma_buffer;
 
@@ -198,14 +221,13 @@ enum rpcrdma_frmr_state {
 };
 
 struct rpcrdma_frmr {
-	struct scatterlist		*sg;
-	int				sg_nents;
+	struct scatterlist		*fr_sg;
+	int				fr_nents;
+	enum dma_data_direction		fr_dir;
 	struct ib_mr			*fr_mr;
 	struct ib_cqe			fr_cqe;
 	enum rpcrdma_frmr_state		fr_state;
 	struct completion		fr_linv_done;
-	struct work_struct		fr_work;
-	struct rpcrdma_xprt		*fr_xprt;
 	union {
 		struct ib_reg_wr	fr_regwr;
 		struct ib_send_wr	fr_invwr;
@@ -222,6 +244,8 @@ struct rpcrdma_mw {
 		struct rpcrdma_fmr	fmr;
 		struct rpcrdma_frmr	frmr;
 	};
+	struct work_struct	mw_work;
+	struct rpcrdma_xprt	*mw_xprt;
 	struct list_head	mw_list;
 	struct list_head	mw_all;
 };
@@ -270,12 +294,14 @@ struct rpcrdma_req {
 	unsigned int		rl_niovs;
 	unsigned int		rl_nchunks;
 	unsigned int		rl_connect_cookie;
+	struct rpc_task		*rl_task;
 	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;/* holder for reply buffer */
 	struct ib_sge		rl_send_iov[RPCRDMA_MAX_IOVS];
 	struct rpcrdma_regbuf	*rl_rdmabuf;
 	struct rpcrdma_regbuf	*rl_sendbuf;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
+	struct rpcrdma_mr_seg	*rl_nextseg;
 
 	struct ib_cqe		rl_cqe;
 	struct list_head	rl_all;
@@ -372,8 +398,8 @@ struct rpcrdma_memreg_ops {
 				  struct rpcrdma_mr_seg *, int, bool);
 	void		(*ro_unmap_sync)(struct rpcrdma_xprt *,
 					 struct rpcrdma_req *);
-	int		(*ro_unmap)(struct rpcrdma_xprt *,
-				    struct rpcrdma_mr_seg *);
+	void		(*ro_unmap_safe)(struct rpcrdma_xprt *,
+					 struct rpcrdma_req *, bool);
 	int		(*ro_open)(struct rpcrdma_ia *,
 				   struct rpcrdma_ep *,
 				   struct rpcrdma_create_data_internal *);
@@ -456,7 +482,6 @@ struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
 void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 			 struct rpcrdma_regbuf *);
 
-unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
 int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
 int frwr_alloc_recovery_wq(void);
@@ -519,6 +544,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
  */
 int rpcrdma_marshal_req(struct rpc_rqst *);
+void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *,
+				  struct rpcrdma_create_data_internal *,
+				  unsigned int);
 
 /* RPC/RDMA module init - xprtrdma/transport.c
  */
@@ -534,6 +562,7 @@ void xprt_rdma_cleanup(void);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
 int xprt_rdma_bc_up(struct svc_serv *, struct net *);
+size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
 int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
 void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
 int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index b90c5397b5e1..2d3e0c42361e 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1364,6 +1364,11 @@ static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
 		return ret;
 	return 0;
 }
+
+static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
+{
+	return PAGE_SIZE;
+}
 #else
 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
 					struct xdr_skb_reader *desc)
@@ -2661,6 +2666,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
 #ifdef CONFIG_SUNRPC_BACKCHANNEL
 	.bc_setup		= xprt_setup_bc,
 	.bc_up			= xs_tcp_bc_up,
+	.bc_maxpayload		= xs_tcp_bc_maxpayload,
 	.bc_free_rqst		= xprt_free_bc_rqst,
 	.bc_destroy		= xprt_destroy_bc,
 #endif