summary refs log tree commit diff
path: root/fs/nfs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2020-04-07 13:51:39 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2020-04-07 13:51:39 -0700
commit04de788e61a576820baf03ff8accc246ca146cb3 (patch)
treece7398fd61364c78b2a353729f8a92d77d02c076 /fs/nfs
parentf40f31cadc0ea5dcdd224c8b324add26469c2379 (diff)
parent93ce4af774bc3d8a72ce2271d03241c96383629d (diff)
downloadlinux-04de788e61a576820baf03ff8accc246ca146cb3.tar.gz
Merge tag 'nfs-for-5.7-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Stable fixes:
   - Fix a page leak in nfs_destroy_unlinked_subrequests()

   - Fix use-after-free issues in nfs_pageio_add_request()

   - Fix new mount code constant_table array definitions

   - finish_automount() requires us to hold 2 refs to the mount record

  Features:
   - Improve the accuracy of telldir/seekdir by using 64-bit cookies
     when possible.

   - Allow one RDMA active connection and several zombie connections to
     prevent blocking if the remote server is unresponsive.

   - Limit the size of the NFS access cache by default

   - Reduce the number of references to credentials that are taken by
     NFS

   - pNFS files and flexfiles drivers now support per-layout segment
     COMMIT lists.

   - Enable partial-file layout segments in the pNFS/flexfiles driver.

   - Add support for CB_RECALL_ANY to the pNFS flexfiles layout type

   - pNFS/flexfiles Report NFS4ERR_DELAY and NFS4ERR_GRACE errors from
     the DS using the layouterror mechanism.

  Bugfixes and cleanups:
   - SUNRPC: Fix krb5p regressions

   - Don't specify NFS version in "UDP not supported" error

   - nfsroot: set tcp as the default transport protocol

   - pnfs: Return valid stateids in nfs_layout_find_inode_by_stateid()

   - alloc_nfs_open_context() must use the file cred when available

   - Fix locking when dereferencing the delegation cred

   - Fix memory leaks in O_DIRECT when nfs_get_lock_context() fails

   - Various clean ups of the NFS O_DIRECT commit code

   - Clean up RDMA connect/disconnect

   - Replace zero-length arrays with C99-style flexible arrays"

* tag 'nfs-for-5.7-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (86 commits)
  NFS: Clean up process of marking inode stale.
  SUNRPC: Don't start a timer on an already queued rpc task
  NFS/pnfs: Reference the layout cred in pnfs_prepare_layoutreturn()
  NFS/pnfs: Fix dereference of layout cred in pnfs_layoutcommit_inode()
  NFS: Beware when dereferencing the delegation cred
  NFS: Add a module parameter to set nfs_mountpoint_expiry_timeout
  NFS: finish_automount() requires us to hold 2 refs to the mount record
  NFS: Fix a few constant_table array definitions
  NFS: Try to join page groups before an O_DIRECT retransmission
  NFS: Refactor nfs_lock_and_join_requests()
  NFS: Reverse the submission order of requests in __nfs_pageio_add_request()
  NFS: Clean up nfs_lock_and_join_requests()
  NFS: Remove the redundant function nfs_pgio_has_mirroring()
  NFS: Fix memory leaks in nfs_pageio_stop_mirroring()
  NFS: Fix a request reference leak in nfs_direct_write_clear_reqs()
  NFS: Fix use-after-free issues in nfs_pageio_add_request()
  NFS: Fix races nfs_page_group_destroy() vs nfs_destroy_unlinked_subrequests()
  NFS: Fix a page leak in nfs_destroy_unlinked_subrequests()
  NFS: Remove unused FLUSH_SYNC support in nfs_initiate_pgio()
  pNFS/flexfiles: Specify the layout segment range in LAYOUTGET
  ...
Diffstat (limited to 'fs/nfs')
-rw-r--r--fs/nfs/blocklayout/blocklayout.c2
-rw-r--r--fs/nfs/callback.h4
-rw-r--r--fs/nfs/callback_proc.c69
-rw-r--r--fs/nfs/delegation.c319
-rw-r--r--fs/nfs/dir.c79
-rw-r--r--fs/nfs/direct.c197
-rw-r--r--fs/nfs/filelayout/filelayout.c165
-rw-r--r--fs/nfs/flexfilelayout/flexfilelayout.c229
-rw-r--r--fs/nfs/flexfilelayout/flexfilelayout.h2
-rw-r--r--fs/nfs/fs_context.c9
-rw-r--r--fs/nfs/inode.c28
-rw-r--r--fs/nfs/internal.h36
-rw-r--r--fs/nfs/namespace.c67
-rw-r--r--fs/nfs/nfs4_fs.h4
-rw-r--r--fs/nfs/nfs4file.c3
-rw-r--r--fs/nfs/nfs4namespace.c2
-rw-r--r--fs/nfs/nfs4proc.c19
-rw-r--r--fs/nfs/nfs4state.c24
-rw-r--r--fs/nfs/nfs4trace.h8
-rw-r--r--fs/nfs/nfsroot.c2
-rw-r--r--fs/nfs/nfstrace.h1
-rw-r--r--fs/nfs/pagelist.c367
-rw-r--r--fs/nfs/pnfs.c241
-rw-r--r--fs/nfs/pnfs.h143
-rw-r--r--fs/nfs/pnfs_nfs.c514
-rw-r--r--fs/nfs/read.c2
-rw-r--r--fs/nfs/super.c35
-rw-r--r--fs/nfs/unlink.c4
-rw-r--r--fs/nfs/write.c276
29 files changed, 1717 insertions, 1134 deletions
diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c
index 690221747b47..d1a0e2c8b1b4 100644
--- a/fs/nfs/blocklayout/blocklayout.c
+++ b/fs/nfs/blocklayout/blocklayout.c
@@ -476,7 +476,7 @@ static void bl_free_layout_hdr(struct pnfs_layout_hdr *lo)
 	err = ext_tree_remove(bl, true, 0, LLONG_MAX);
 	WARN_ON(err);
 
-	kfree(bl);
+	kfree_rcu(bl, bl_layout.plh_rcu);
 }
 
 static struct pnfs_layout_hdr *__bl_alloc_layout_hdr(struct inode *inode,
diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
index 549350259840..6a2033131c06 100644
--- a/fs/nfs/callback.h
+++ b/fs/nfs/callback.h
@@ -127,7 +127,9 @@ extern __be32 nfs4_callback_sequence(void *argp, void *resp,
 #define RCA4_TYPE_MASK_OBJ_LAYOUT_MAX  9
 #define RCA4_TYPE_MASK_OTHER_LAYOUT_MIN 12
 #define RCA4_TYPE_MASK_OTHER_LAYOUT_MAX 15
-#define RCA4_TYPE_MASK_ALL 0xf31f
+#define PNFS_FF_RCA4_TYPE_MASK_READ 16
+#define PNFS_FF_RCA4_TYPE_MASK_RW 17
+#define RCA4_TYPE_MASK_ALL 0x3f31f
 
 struct cb_recallanyargs {
 	uint32_t	craa_objs_to_keep;
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index cd4c6bc81cae..e61dbc9b86ae 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -121,31 +121,31 @@ out:
  */
 static struct inode *nfs_layout_find_inode_by_stateid(struct nfs_client *clp,
 		const nfs4_stateid *stateid)
+	__must_hold(RCU)
 {
 	struct nfs_server *server;
 	struct inode *inode;
 	struct pnfs_layout_hdr *lo;
 
+	rcu_read_lock();
 	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
-		list_for_each_entry(lo, &server->layouts, plh_layouts) {
+		list_for_each_entry_rcu(lo, &server->layouts, plh_layouts) {
+			if (!pnfs_layout_is_valid(lo))
+				continue;
 			if (stateid != NULL &&
 			    !nfs4_stateid_match_other(stateid, &lo->plh_stateid))
 				continue;
+			if (!nfs_sb_active(server->super))
+				continue;
 			inode = igrab(lo->plh_inode);
-			if (!inode)
-				return ERR_PTR(-EAGAIN);
-			if (!nfs_sb_active(inode->i_sb)) {
-				rcu_read_unlock();
-				spin_unlock(&clp->cl_lock);
-				iput(inode);
-				spin_lock(&clp->cl_lock);
-				rcu_read_lock();
-				return ERR_PTR(-EAGAIN);
-			}
-			return inode;
+			rcu_read_unlock();
+			if (inode)
+				return inode;
+			nfs_sb_deactive(server->super);
+			return ERR_PTR(-EAGAIN);
 		}
 	}
-
+	rcu_read_unlock();
 	return ERR_PTR(-ENOENT);
 }
 
@@ -163,28 +163,25 @@ static struct inode *nfs_layout_find_inode_by_fh(struct nfs_client *clp,
 	struct inode *inode;
 	struct pnfs_layout_hdr *lo;
 
+	rcu_read_lock();
 	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
-		list_for_each_entry(lo, &server->layouts, plh_layouts) {
+		list_for_each_entry_rcu(lo, &server->layouts, plh_layouts) {
 			nfsi = NFS_I(lo->plh_inode);
 			if (nfs_compare_fh(fh, &nfsi->fh))
 				continue;
 			if (nfsi->layout != lo)
 				continue;
+			if (!nfs_sb_active(server->super))
+				continue;
 			inode = igrab(lo->plh_inode);
-			if (!inode)
-				return ERR_PTR(-EAGAIN);
-			if (!nfs_sb_active(inode->i_sb)) {
-				rcu_read_unlock();
-				spin_unlock(&clp->cl_lock);
-				iput(inode);
-				spin_lock(&clp->cl_lock);
-				rcu_read_lock();
-				return ERR_PTR(-EAGAIN);
-			}
-			return inode;
+			rcu_read_unlock();
+			if (inode)
+				return inode;
+			nfs_sb_deactive(server->super);
+			return ERR_PTR(-EAGAIN);
 		}
 	}
-
+	rcu_read_unlock();
 	return ERR_PTR(-ENOENT);
 }
 
@@ -194,14 +191,9 @@ static struct inode *nfs_layout_find_inode(struct nfs_client *clp,
 {
 	struct inode *inode;
 
-	spin_lock(&clp->cl_lock);
-	rcu_read_lock();
 	inode = nfs_layout_find_inode_by_stateid(clp, stateid);
 	if (inode == ERR_PTR(-ENOENT))
 		inode = nfs_layout_find_inode_by_fh(clp, fh);
-	rcu_read_unlock();
-	spin_unlock(&clp->cl_lock);
-
 	return inode;
 }
 
@@ -280,7 +272,7 @@ static u32 initiate_file_draining(struct nfs_client *clp,
 		goto unlock;
 	}
 
-	pnfs_set_layout_stateid(lo, &args->cbl_stateid, true);
+	pnfs_set_layout_stateid(lo, &args->cbl_stateid, NULL, true);
 	switch (pnfs_mark_matching_lsegs_return(lo, &free_me_list,
 				&args->cbl_range,
 				be32_to_cpu(args->cbl_stateid.seqid))) {
@@ -605,6 +597,7 @@ __be32 nfs4_callback_recallany(void *argp, void *resp,
 	struct cb_recallanyargs *args = argp;
 	__be32 status;
 	fmode_t flags = 0;
+	bool schedule_manager = false;
 
 	status = cpu_to_be32(NFS4ERR_OP_NOT_IN_SESSION);
 	if (!cps->clp) /* set in cb_sequence */
@@ -627,6 +620,18 @@ __be32 nfs4_callback_recallany(void *argp, void *resp,
 
 	if (args->craa_type_mask & BIT(RCA4_TYPE_MASK_FILE_LAYOUT))
 		pnfs_recall_all_layouts(cps->clp);
+
+	if (args->craa_type_mask & BIT(PNFS_FF_RCA4_TYPE_MASK_READ)) {
+		set_bit(NFS4CLNT_RECALL_ANY_LAYOUT_READ, &cps->clp->cl_state);
+		schedule_manager = true;
+	}
+	if (args->craa_type_mask & BIT(PNFS_FF_RCA4_TYPE_MASK_RW)) {
+		set_bit(NFS4CLNT_RECALL_ANY_LAYOUT_RW, &cps->clp->cl_state);
+		schedule_manager = true;
+	}
+	if (schedule_manager)
+		nfs4_schedule_state_manager(cps->clp);
+
 out:
 	dprintk("%s: exit with status = %d\n", __func__, ntohl(status));
 	return status;
diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c
index 1865322de142..816e1427f17e 100644
--- a/fs/nfs/delegation.c
+++ b/fs/nfs/delegation.c
@@ -378,6 +378,18 @@ nfs_inode_detach_delegation(struct inode *inode)
 }
 
 static void
+nfs_update_delegation_cred(struct nfs_delegation *delegation,
+		const struct cred *cred)
+{
+	const struct cred *old;
+
+	if (cred_fscmp(delegation->cred, cred) != 0) {
+		old = xchg(&delegation->cred, get_cred(cred));
+		put_cred(old);
+	}
+}
+
+static void
 nfs_update_inplace_delegation(struct nfs_delegation *delegation,
 		const struct nfs_delegation *update)
 {
@@ -385,8 +397,14 @@ nfs_update_inplace_delegation(struct nfs_delegation *delegation,
 		delegation->stateid.seqid = update->stateid.seqid;
 		smp_wmb();
 		delegation->type = update->type;
-		if (test_and_clear_bit(NFS_DELEGATION_REVOKED, &delegation->flags))
+		delegation->pagemod_limit = update->pagemod_limit;
+		if (test_bit(NFS_DELEGATION_REVOKED, &delegation->flags)) {
+			delegation->change_attr = update->change_attr;
+			nfs_update_delegation_cred(delegation, update->cred);
+			/* smp_mb__before_atomic() is implicit due to xchg() */
+			clear_bit(NFS_DELEGATION_REVOKED, &delegation->flags);
 			atomic_long_inc(&nfs_active_delegations);
+		}
 	}
 }
 
@@ -545,21 +563,11 @@ static bool nfs_delegation_need_return(struct nfs_delegation *delegation)
 	return ret;
 }
 
-/**
- * nfs_client_return_marked_delegations - return previously marked delegations
- * @clp: nfs_client to process
- *
- * Note that this function is designed to be called by the state
- * manager thread. For this reason, it cannot flush the dirty data,
- * since that could deadlock in case of a state recovery error.
- *
- * Returns zero on success, or a negative errno value.
- */
-int nfs_client_return_marked_delegations(struct nfs_client *clp)
+static int nfs_server_return_marked_delegations(struct nfs_server *server,
+		void __always_unused *data)
 {
 	struct nfs_delegation *delegation;
 	struct nfs_delegation *prev;
-	struct nfs_server *server;
 	struct inode *inode;
 	struct inode *place_holder = NULL;
 	struct nfs_delegation *place_holder_deleg = NULL;
@@ -569,78 +577,79 @@ restart:
 	/*
 	 * To avoid quadratic looping we hold a reference
 	 * to an inode place_holder.  Each time we restart, we
-	 * list nfs_servers from the server of that inode, and
-	 * delegation in the server from the delegations of that
-	 * inode.
+	 * list delegation in the server from the delegations
+	 * of that inode.
 	 * prev is an RCU-protected pointer to a delegation which
 	 * wasn't marked for return and might be a good choice for
 	 * the next place_holder.
 	 */
-	rcu_read_lock();
 	prev = NULL;
+	delegation = NULL;
+	rcu_read_lock();
 	if (place_holder)
-		server = NFS_SERVER(place_holder);
-	else
-		server = list_entry_rcu(clp->cl_superblocks.next,
-					struct nfs_server, client_link);
-	list_for_each_entry_from_rcu(server, &clp->cl_superblocks, client_link) {
-		delegation = NULL;
-		if (place_holder && server == NFS_SERVER(place_holder))
-			delegation = rcu_dereference(NFS_I(place_holder)->delegation);
-		if (!delegation || delegation != place_holder_deleg)
-			delegation = list_entry_rcu(server->delegations.next,
-						    struct nfs_delegation, super_list);
-		list_for_each_entry_from_rcu(delegation, &server->delegations, super_list) {
-			struct inode *to_put = NULL;
-
-			if (!nfs_delegation_need_return(delegation)) {
+		delegation = rcu_dereference(NFS_I(place_holder)->delegation);
+	if (!delegation || delegation != place_holder_deleg)
+		delegation = list_entry_rcu(server->delegations.next,
+					    struct nfs_delegation, super_list);
+	list_for_each_entry_from_rcu(delegation, &server->delegations, super_list) {
+		struct inode *to_put = NULL;
+
+		if (test_bit(NFS_DELEGATION_INODE_FREEING, &delegation->flags))
+			continue;
+		if (!nfs_delegation_need_return(delegation)) {
+			if (nfs4_is_valid_delegation(delegation, 0))
 				prev = delegation;
-				continue;
-			}
-			if (!nfs_sb_active(server->super))
-				break; /* continue in outer loop */
-
-			if (prev) {
-				struct inode *tmp;
-
-				tmp = nfs_delegation_grab_inode(prev);
-				if (tmp) {
-					to_put = place_holder;
-					place_holder = tmp;
-					place_holder_deleg = prev;
-				}
-			}
+			continue;
+		}
 
-			inode = nfs_delegation_grab_inode(delegation);
-			if (inode == NULL) {
-				rcu_read_unlock();
-				if (to_put)
-					iput(to_put);
-				nfs_sb_deactive(server->super);
-				goto restart;
+		if (prev) {
+			struct inode *tmp = nfs_delegation_grab_inode(prev);
+			if (tmp) {
+				to_put = place_holder;
+				place_holder = tmp;
+				place_holder_deleg = prev;
 			}
-			delegation = nfs_start_delegation_return_locked(NFS_I(inode));
+		}
+
+		inode = nfs_delegation_grab_inode(delegation);
+		if (inode == NULL) {
 			rcu_read_unlock();
+			iput(to_put);
+			goto restart;
+		}
+		delegation = nfs_start_delegation_return_locked(NFS_I(inode));
+		rcu_read_unlock();
 
-			if (to_put)
-				iput(to_put);
+		iput(to_put);
 
-			err = nfs_end_delegation_return(inode, delegation, 0);
-			iput(inode);
-			nfs_sb_deactive(server->super);
-			cond_resched();
-			if (!err)
-				goto restart;
-			set_bit(NFS4CLNT_DELEGRETURN, &clp->cl_state);
-			if (place_holder)
-				iput(place_holder);
-			return err;
-		}
+		err = nfs_end_delegation_return(inode, delegation, 0);
+		iput(inode);
+		cond_resched();
+		if (!err)
+			goto restart;
+		set_bit(NFS4CLNT_DELEGRETURN, &server->nfs_client->cl_state);
+		goto out;
 	}
 	rcu_read_unlock();
-	if (place_holder)
-		iput(place_holder);
-	return 0;
+out:
+	iput(place_holder);
+	return err;
+}
+
+/**
+ * nfs_client_return_marked_delegations - return previously marked delegations
+ * @clp: nfs_client to process
+ *
+ * Note that this function is designed to be called by the state
+ * manager thread. For this reason, it cannot flush the dirty data,
+ * since that could deadlock in case of a state recovery error.
+ *
+ * Returns zero on success, or a negative errno value.
+ */
+int nfs_client_return_marked_delegations(struct nfs_client *clp)
+{
+	return nfs_client_for_each_server(clp,
+			nfs_server_return_marked_delegations, NULL);
 }
 
 /**
@@ -1083,53 +1092,51 @@ void nfs_delegation_mark_reclaim(struct nfs_client *clp)
 	rcu_read_unlock();
 }
 
-/**
- * nfs_delegation_reap_unclaimed - reap unclaimed delegations after reboot recovery is done
- * @clp: nfs_client to process
- *
- */
-void nfs_delegation_reap_unclaimed(struct nfs_client *clp)
+static int nfs_server_reap_unclaimed_delegations(struct nfs_server *server,
+		void __always_unused *data)
 {
 	struct nfs_delegation *delegation;
-	struct nfs_server *server;
 	struct inode *inode;
-
 restart:
 	rcu_read_lock();
-	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
-		list_for_each_entry_rcu(delegation, &server->delegations,
-								super_list) {
-			if (test_bit(NFS_DELEGATION_INODE_FREEING,
-						&delegation->flags) ||
-			    test_bit(NFS_DELEGATION_RETURNING,
-						&delegation->flags) ||
-			    test_bit(NFS_DELEGATION_NEED_RECLAIM,
-						&delegation->flags) == 0)
-				continue;
-			if (!nfs_sb_active(server->super))
-				break; /* continue in outer loop */
-			inode = nfs_delegation_grab_inode(delegation);
-			if (inode == NULL) {
-				rcu_read_unlock();
-				nfs_sb_deactive(server->super);
-				goto restart;
-			}
-			delegation = nfs_start_delegation_return_locked(NFS_I(inode));
-			rcu_read_unlock();
-			if (delegation != NULL) {
-				if (nfs_detach_delegation(NFS_I(inode), delegation,
-							server) != NULL)
-					nfs_free_delegation(delegation);
-				/* Match nfs_start_delegation_return_locked */
-				nfs_put_delegation(delegation);
-			}
-			iput(inode);
-			nfs_sb_deactive(server->super);
-			cond_resched();
-			goto restart;
+restart_locked:
+	list_for_each_entry_rcu(delegation, &server->delegations, super_list) {
+		if (test_bit(NFS_DELEGATION_INODE_FREEING,
+					&delegation->flags) ||
+		    test_bit(NFS_DELEGATION_RETURNING,
+					&delegation->flags) ||
+		    test_bit(NFS_DELEGATION_NEED_RECLAIM,
+					&delegation->flags) == 0)
+			continue;
+		inode = nfs_delegation_grab_inode(delegation);
+		if (inode == NULL)
+			goto restart_locked;
+		delegation = nfs_start_delegation_return_locked(NFS_I(inode));
+		rcu_read_unlock();
+		if (delegation != NULL) {
+			if (nfs_detach_delegation(NFS_I(inode), delegation,
+						server) != NULL)
+				nfs_free_delegation(delegation);
+			/* Match nfs_start_delegation_return_locked */
+			nfs_put_delegation(delegation);
 		}
+		iput(inode);
+		cond_resched();
+		goto restart;
 	}
 	rcu_read_unlock();
+	return 0;
+}
+
+/**
+ * nfs_delegation_reap_unclaimed - reap unclaimed delegations after reboot recovery is done
+ * @clp: nfs_client to process
+ *
+ */
+void nfs_delegation_reap_unclaimed(struct nfs_client *clp)
+{
+	nfs_client_for_each_server(clp, nfs_server_reap_unclaimed_delegations,
+			NULL);
 }
 
 static inline bool nfs4_server_rebooted(const struct nfs_client *clp)
@@ -1215,62 +1222,61 @@ nfs_delegation_test_free_expired(struct inode *inode,
 		nfs_remove_bad_delegation(inode, stateid);
 }
 
-/**
- * nfs_reap_expired_delegations - reap expired delegations
- * @clp: nfs_client to process
- *
- * Iterates through all the delegations associated with this server and
- * checks if they have may have been revoked. This function is usually
- * expected to be called in cases where the server may have lost its
- * lease.
- */
-void nfs_reap_expired_delegations(struct nfs_client *clp)
+static int nfs_server_reap_expired_delegations(struct nfs_server *server,
+		void __always_unused *data)
 {
 	struct nfs_delegation *delegation;
-	struct nfs_server *server;
 	struct inode *inode;
 	const struct cred *cred;
 	nfs4_stateid stateid;
-
 restart:
 	rcu_read_lock();
-	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
-		list_for_each_entry_rcu(delegation, &server->delegations,
-								super_list) {
-			if (test_bit(NFS_DELEGATION_INODE_FREEING,
-						&delegation->flags) ||
-			    test_bit(NFS_DELEGATION_RETURNING,
-						&delegation->flags) ||
-			    test_bit(NFS_DELEGATION_TEST_EXPIRED,
-						&delegation->flags) == 0)
-				continue;
-			if (!nfs_sb_active(server->super))
-				break; /* continue in outer loop */
-			inode = nfs_delegation_grab_inode(delegation);
-			if (inode == NULL) {
-				rcu_read_unlock();
-				nfs_sb_deactive(server->super);
-				goto restart;
-			}
-			cred = get_cred_rcu(delegation->cred);
-			nfs4_stateid_copy(&stateid, &delegation->stateid);
-			clear_bit(NFS_DELEGATION_TEST_EXPIRED, &delegation->flags);
-			rcu_read_unlock();
-			nfs_delegation_test_free_expired(inode, &stateid, cred);
-			put_cred(cred);
-			if (nfs4_server_rebooted(clp)) {
-				nfs_inode_mark_test_expired_delegation(server,inode);
-				iput(inode);
-				nfs_sb_deactive(server->super);
-				return;
-			}
+restart_locked:
+	list_for_each_entry_rcu(delegation, &server->delegations, super_list) {
+		if (test_bit(NFS_DELEGATION_INODE_FREEING,
+					&delegation->flags) ||
+		    test_bit(NFS_DELEGATION_RETURNING,
+					&delegation->flags) ||
+		    test_bit(NFS_DELEGATION_TEST_EXPIRED,
+					&delegation->flags) == 0)
+			continue;
+		inode = nfs_delegation_grab_inode(delegation);
+		if (inode == NULL)
+			goto restart_locked;
+		spin_lock(&delegation->lock);
+		cred = get_cred_rcu(delegation->cred);
+		nfs4_stateid_copy(&stateid, &delegation->stateid);
+		spin_unlock(&delegation->lock);
+		clear_bit(NFS_DELEGATION_TEST_EXPIRED, &delegation->flags);
+		rcu_read_unlock();
+		nfs_delegation_test_free_expired(inode, &stateid, cred);
+		put_cred(cred);
+		if (!nfs4_server_rebooted(server->nfs_client)) {
 			iput(inode);
-			nfs_sb_deactive(server->super);
 			cond_resched();
 			goto restart;
 		}
+		nfs_inode_mark_test_expired_delegation(server,inode);
+		iput(inode);
+		return -EAGAIN;
 	}
 	rcu_read_unlock();
+	return 0;
+}
+
+/**
+ * nfs_reap_expired_delegations - reap expired delegations
+ * @clp: nfs_client to process
+ *
+ * Iterates through all the delegations associated with this server and
+ * checks if they have may have been revoked. This function is usually
+ * expected to be called in cases where the server may have lost its
+ * lease.
+ */
+void nfs_reap_expired_delegations(struct nfs_client *clp)
+{
+	nfs_client_for_each_server(clp, nfs_server_reap_expired_delegations,
+			NULL);
 }
 
 void nfs_inode_find_delegation_state_and_recover(struct inode *inode,
@@ -1359,11 +1365,14 @@ bool nfs4_copy_delegation_stateid(struct inode *inode, fmode_t flags,
 {
 	struct nfs_inode *nfsi = NFS_I(inode);
 	struct nfs_delegation *delegation;
-	bool ret;
+	bool ret = false;
 
 	flags &= FMODE_READ|FMODE_WRITE;
 	rcu_read_lock();
 	delegation = rcu_dereference(nfsi->delegation);
+	if (!delegation)
+		goto out;
+	spin_lock(&delegation->lock);
 	ret = nfs4_is_valid_delegation(delegation, flags);
 	if (ret) {
 		nfs4_stateid_copy(dst, &delegation->stateid);
@@ -1371,6 +1380,8 @@ bool nfs4_copy_delegation_stateid(struct inode *inode, fmode_t flags,
 		if (cred)
 			*cred = get_cred(delegation->cred);
 	}
+	spin_unlock(&delegation->lock);
+out:
 	rcu_read_unlock();
 	return ret;
 }
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index d4b839b6cf89..5a331da5f55a 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -141,10 +141,9 @@ struct nfs_cache_array {
 	int size;
 	int eof_index;
 	u64 last_cookie;
-	struct nfs_cache_array_entry array[0];
+	struct nfs_cache_array_entry array[];
 };
 
-typedef int (*decode_dirent_t)(struct xdr_stream *, struct nfs_entry *, bool);
 typedef struct {
 	struct file	*file;
 	struct page	*page;
@@ -153,7 +152,7 @@ typedef struct {
 	u64		*dir_cookie;
 	u64		last_cookie;
 	loff_t		current_index;
-	decode_dirent_t	decode;
+	loff_t		prev_index;
 
 	unsigned long	dir_verifier;
 	unsigned long	timestamp;
@@ -240,6 +239,25 @@ out:
 	return ret;
 }
 
+static inline
+int is_32bit_api(void)
+{
+#ifdef CONFIG_COMPAT
+	return in_compat_syscall();
+#else
+	return (BITS_PER_LONG == 32);
+#endif
+}
+
+static
+bool nfs_readdir_use_cookie(const struct file *filp)
+{
+	if ((filp->f_mode & FMODE_32BITHASH) ||
+	    (!(filp->f_mode & FMODE_64BITHASH) && is_32bit_api()))
+		return false;
+	return true;
+}
+
 static
 int nfs_readdir_search_for_pos(struct nfs_cache_array *array, nfs_readdir_descriptor_t *desc)
 {
@@ -289,7 +307,7 @@ int nfs_readdir_search_for_cookie(struct nfs_cache_array *array, nfs_readdir_des
 			    !nfs_readdir_inode_mapping_valid(nfsi)) {
 				ctx->duped = 0;
 				ctx->attr_gencount = nfsi->attr_gencount;
-			} else if (new_pos < desc->ctx->pos) {
+			} else if (new_pos < desc->prev_index) {
 				if (ctx->duped > 0
 				    && ctx->dup_cookie == *desc->dir_cookie) {
 					if (printk_ratelimit()) {
@@ -305,7 +323,11 @@ int nfs_readdir_search_for_cookie(struct nfs_cache_array *array, nfs_readdir_des
 				ctx->dup_cookie = *desc->dir_cookie;
 				ctx->duped = -1;
 			}
-			desc->ctx->pos = new_pos;
+			if (nfs_readdir_use_cookie(desc->file))
+				desc->ctx->pos = *desc->dir_cookie;
+			else
+				desc->ctx->pos = new_pos;
+			desc->prev_index = new_pos;
 			desc->cache_entry_index = i;
 			return 0;
 		}
@@ -376,9 +398,10 @@ error:
 static int xdr_decode(nfs_readdir_descriptor_t *desc,
 		      struct nfs_entry *entry, struct xdr_stream *xdr)
 {
+	struct inode *inode = file_inode(desc->file);
 	int error;
 
-	error = desc->decode(xdr, entry, desc->plus);
+	error = NFS_PROTO(inode)->decode_dirent(xdr, entry, desc->plus);
 	if (error)
 		return error;
 	entry->fattr->time_start = desc->timestamp;
@@ -756,6 +779,7 @@ int readdir_search_pagecache(nfs_readdir_descriptor_t *desc)
 
 	if (desc->page_index == 0) {
 		desc->current_index = 0;
+		desc->prev_index = 0;
 		desc->last_cookie = 0;
 	}
 	do {
@@ -786,11 +810,14 @@ int nfs_do_filldir(nfs_readdir_descriptor_t *desc)
 			desc->eof = true;
 			break;
 		}
-		desc->ctx->pos++;
 		if (i < (array->size-1))
 			*desc->dir_cookie = array->array[i+1].cookie;
 		else
 			*desc->dir_cookie = array->last_cookie;
+		if (nfs_readdir_use_cookie(file))
+			desc->ctx->pos = *desc->dir_cookie;
+		else
+			desc->ctx->pos++;
 		if (ctx->duped != 0)
 			ctx->duped = 1;
 	}
@@ -860,9 +887,14 @@ static int nfs_readdir(struct file *file, struct dir_context *ctx)
 {
 	struct dentry	*dentry = file_dentry(file);
 	struct inode	*inode = d_inode(dentry);
-	nfs_readdir_descriptor_t my_desc,
-			*desc = &my_desc;
 	struct nfs_open_dir_context *dir_ctx = file->private_data;
+	nfs_readdir_descriptor_t my_desc = {
+		.file = file,
+		.ctx = ctx,
+		.dir_cookie = &dir_ctx->dir_cookie,
+		.plus = nfs_use_readdirplus(inode, ctx),
+	},
+			*desc = &my_desc;
 	int res = 0;
 
 	dfprintk(FILE, "NFS: readdir(%pD2) starting at cookie %llu\n",
@@ -875,14 +907,6 @@ static int nfs_readdir(struct file *file, struct dir_context *ctx)
 	 * to either find the entry with the appropriate number or
 	 * revalidate the cookie.
 	 */
-	memset(desc, 0, sizeof(*desc));
-
-	desc->file = file;
-	desc->ctx = ctx;
-	desc->dir_cookie = &dir_ctx->dir_cookie;
-	desc->decode = NFS_PROTO(inode)->decode_dirent;
-	desc->plus = nfs_use_readdirplus(inode, ctx);
-
 	if (ctx->pos == 0 || nfs_attribute_cache_expired(inode))
 		res = nfs_revalidate_mapping(inode, file->f_mapping);
 	if (res < 0)
@@ -954,7 +978,10 @@ static loff_t nfs_llseek_dir(struct file *filp, loff_t offset, int whence)
 	}
 	if (offset != filp->f_pos) {
 		filp->f_pos = offset;
-		dir_ctx->dir_cookie = 0;
+		if (nfs_readdir_use_cookie(filp))
+			dir_ctx->dir_cookie = offset;
+		else
+			dir_ctx->dir_cookie = 0;
 		dir_ctx->duped = 0;
 	}
 	inode_unlock(inode);
@@ -2282,7 +2309,7 @@ static DEFINE_SPINLOCK(nfs_access_lru_lock);
 static LIST_HEAD(nfs_access_lru_list);
 static atomic_long_t nfs_access_nr_entries;
 
-static unsigned long nfs_access_max_cachesize = ULONG_MAX;
+static unsigned long nfs_access_max_cachesize = 4*1024*1024;
 module_param(nfs_access_max_cachesize, ulong, 0644);
 MODULE_PARM_DESC(nfs_access_max_cachesize, "NFS access maximum total cache length");
 
@@ -2642,9 +2669,10 @@ static int nfs_do_access(struct inode *inode, const struct cred *cred, int mask)
 	status = NFS_PROTO(inode)->access(inode, &cache);
 	if (status != 0) {
 		if (status == -ESTALE) {
-			nfs_zap_caches(inode);
 			if (!S_ISDIR(inode->i_mode))
-				set_bit(NFS_INO_STALE, &NFS_I(inode)->flags);
+				nfs_set_inode_stale(inode);
+			else
+				nfs_zap_caches(inode);
 		}
 		goto out;
 	}
@@ -2732,14 +2760,7 @@ force_lookup:
 	if (!NFS_PROTO(inode)->access)
 		goto out_notsup;
 
-	/* Always try fast lookups first */
-	rcu_read_lock();
-	res = nfs_do_access(inode, cred, mask|MAY_NOT_BLOCK);
-	rcu_read_unlock();
-	if (res == -ECHILD && !(mask & MAY_NOT_BLOCK)) {
-		/* Fast lookup failed, try the slow way */
-		res = nfs_do_access(inode, cred, mask);
-	}
+	res = nfs_do_access(inode, cred, mask);
 out:
 	if (!res && (mask & MAY_EXEC))
 		res = nfs_execute_ok(inode, mask);
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index b768a0b42e82..a57e7c72c7f4 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -94,7 +94,7 @@ struct nfs_direct_req {
 #define NFS_ODIRECT_RESCHED_WRITES	(2)	/* write verification failed */
 	/* for read */
 #define NFS_ODIRECT_SHOULD_DIRTY	(3)	/* dirty user-space page after read */
-	struct nfs_writeverf	verf;		/* unstable write verifier */
+#define NFS_ODIRECT_DONE		INT_MAX	/* write verification failed */
 };
 
 static const struct nfs_pgio_completion_ops nfs_direct_write_completion_ops;
@@ -151,106 +151,6 @@ nfs_direct_count_bytes(struct nfs_direct_req *dreq,
 		dreq->count = dreq_len;
 }
 
-/*
- * nfs_direct_select_verf - select the right verifier
- * @dreq - direct request possibly spanning multiple servers
- * @ds_clp - nfs_client of data server or NULL if MDS / non-pnfs
- * @commit_idx - commit bucket index for the DS
- *
- * returns the correct verifier to use given the role of the server
- */
-static struct nfs_writeverf *
-nfs_direct_select_verf(struct nfs_direct_req *dreq,
-		       struct nfs_client *ds_clp,
-		       int commit_idx)
-{
-	struct nfs_writeverf *verfp = &dreq->verf;
-
-#ifdef CONFIG_NFS_V4_1
-	/*
-	 * pNFS is in use, use the DS verf except commit_through_mds is set
-	 * for layout segment where nbuckets is zero.
-	 */
-	if (ds_clp && dreq->ds_cinfo.nbuckets > 0) {
-		if (commit_idx >= 0 && commit_idx < dreq->ds_cinfo.nbuckets)
-			verfp = &dreq->ds_cinfo.buckets[commit_idx].direct_verf;
-		else
-			WARN_ON_ONCE(1);
-	}
-#endif
-	return verfp;
-}
-
-
-/*
- * nfs_direct_set_hdr_verf - set the write/commit verifier
- * @dreq - direct request possibly spanning multiple servers
- * @hdr - pageio header to validate against previously seen verfs
- *
- * Set the server's (MDS or DS) "seen" verifier
- */
-static void nfs_direct_set_hdr_verf(struct nfs_direct_req *dreq,
-				    struct nfs_pgio_header *hdr)
-{
-	struct nfs_writeverf *verfp;
-
-	verfp = nfs_direct_select_verf(dreq, hdr->ds_clp, hdr->ds_commit_idx);
-	WARN_ON_ONCE(verfp->committed >= 0);
-	memcpy(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
-	WARN_ON_ONCE(verfp->committed < 0);
-}
-
-static int nfs_direct_cmp_verf(const struct nfs_writeverf *v1,
-		const struct nfs_writeverf *v2)
-{
-	return nfs_write_verifier_cmp(&v1->verifier, &v2->verifier);
-}
-
-/*
- * nfs_direct_cmp_hdr_verf - compare verifier for pgio header
- * @dreq - direct request possibly spanning multiple servers
- * @hdr - pageio header to validate against previously seen verf
- *
- * set the server's "seen" verf if not initialized.
- * returns result of comparison between @hdr->verf and the "seen"
- * verf of the server used by @hdr (DS or MDS)
- */
-static int nfs_direct_set_or_cmp_hdr_verf(struct nfs_direct_req *dreq,
-					  struct nfs_pgio_header *hdr)
-{
-	struct nfs_writeverf *verfp;
-
-	verfp = nfs_direct_select_verf(dreq, hdr->ds_clp, hdr->ds_commit_idx);
-	if (verfp->committed < 0) {
-		nfs_direct_set_hdr_verf(dreq, hdr);
-		return 0;
-	}
-	return nfs_direct_cmp_verf(verfp, &hdr->verf);
-}
-
-/*
- * nfs_direct_cmp_commit_data_verf - compare verifier for commit data
- * @dreq - direct request possibly spanning multiple servers
- * @data - commit data to validate against previously seen verf
- *
- * returns result of comparison between @data->verf and the verf of
- * the server used by @data (DS or MDS)
- */
-static int nfs_direct_cmp_commit_data_verf(struct nfs_direct_req *dreq,
-					   struct nfs_commit_data *data)
-{
-	struct nfs_writeverf *verfp;
-
-	verfp = nfs_direct_select_verf(dreq, data->ds_clp,
-					 data->ds_commit_index);
-
-	/* verifier not set so always fail */
-	if (verfp->committed < 0 || data->res.verf->committed <= NFS_UNSTABLE)
-		return 1;
-
-	return nfs_direct_cmp_verf(verfp, data->res.verf);
-}
-
 /**
  * nfs_direct_IO - NFS address space operation for direct I/O
  * @iocb: target I/O control block
@@ -305,7 +205,7 @@ static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
 	kref_get(&dreq->kref);
 	init_completion(&dreq->completion);
 	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
-	dreq->verf.committed = NFS_INVALID_STABLE_HOW;	/* not set yet */
+	pnfs_init_ds_commit_info(&dreq->ds_cinfo);
 	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
 	spin_lock_init(&dreq->lock);
 
@@ -316,7 +216,7 @@ static void nfs_direct_req_free(struct kref *kref)
 {
 	struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
 
-	nfs_free_pnfs_ds_cinfo(&dreq->ds_cinfo);
+	pnfs_release_ds_info(&dreq->ds_cinfo, dreq->inode);
 	if (dreq->l_ctx != NULL)
 		nfs_put_lock_context(dreq->l_ctx);
 	if (dreq->ctx != NULL)
@@ -571,6 +471,7 @@ ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter)
 	l_ctx = nfs_get_lock_context(dreq->ctx);
 	if (IS_ERR(l_ctx)) {
 		result = PTR_ERR(l_ctx);
+		nfs_direct_req_release(dreq);
 		goto out_release;
 	}
 	dreq->l_ctx = l_ctx;
@@ -605,15 +506,30 @@ out:
 }
 
 static void
+nfs_direct_join_group(struct list_head *list, struct inode *inode)
+{
+	struct nfs_page *req, *next;
+
+	list_for_each_entry(req, list, wb_list) {
+		if (req->wb_head != req || req->wb_this_page == req)
+			continue;
+		for (next = req->wb_this_page;
+				next != req->wb_head;
+				next = next->wb_this_page) {
+			nfs_list_remove_request(next);
+			nfs_release_request(next);
+		}
+		nfs_join_page_group(req, inode);
+	}
+}
+
+static void
 nfs_direct_write_scan_commit_list(struct inode *inode,
 				  struct list_head *list,
 				  struct nfs_commit_info *cinfo)
 {
 	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
-#ifdef CONFIG_NFS_V4_1
-	if (cinfo->ds != NULL && cinfo->ds->nwritten != 0)
-		NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo);
-#endif
+	pnfs_recover_commit_reqs(list, cinfo);
 	nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0);
 	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 }
@@ -629,11 +545,12 @@ static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
 	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
 
+	nfs_direct_join_group(&reqs, dreq->inode);
+
 	dreq->count = 0;
 	dreq->max_count = 0;
 	list_for_each_entry(req, &reqs, wb_list)
 		dreq->max_count += req->wb_bytes;
-	dreq->verf.committed = NFS_INVALID_STABLE_HOW;
 	nfs_clear_pnfs_ds_commit_verifiers(&dreq->ds_cinfo);
 	get_dreq(dreq);
 
@@ -670,27 +587,35 @@ static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
 
 static void nfs_direct_commit_complete(struct nfs_commit_data *data)
 {
+	const struct nfs_writeverf *verf = data->res.verf;
 	struct nfs_direct_req *dreq = data->dreq;
 	struct nfs_commit_info cinfo;
 	struct nfs_page *req;
 	int status = data->task.tk_status;
 
+	if (status < 0) {
+		/* Errors in commit are fatal */
+		dreq->error = status;
+		dreq->max_count = 0;
+		dreq->count = 0;
+		dreq->flags = NFS_ODIRECT_DONE;
+	} else if (dreq->flags == NFS_ODIRECT_DONE)
+		status = dreq->error;
+
 	nfs_init_cinfo_from_dreq(&cinfo, dreq);
-	if (status < 0 || nfs_direct_cmp_commit_data_verf(dreq, data))
-		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 
 	while (!list_empty(&data->pages)) {
 		req = nfs_list_entry(data->pages.next);
 		nfs_list_remove_request(req);
-		if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES) {
+		if (status >= 0 && !nfs_write_match_verf(verf, req)) {
+			dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 			/*
 			 * Despite the reboot, the write was successful,
 			 * so reset wb_nio.
 			 */
 			req->wb_nio = 0;
-			/* Note the rewrite will go through mds */
 			nfs_mark_request_commit(req, NULL, &cinfo, 0);
-		} else
+		} else /* Error or match */
 			nfs_release_request(req);
 		nfs_unlock_and_release_request(req);
 	}
@@ -705,7 +630,8 @@ static void nfs_direct_resched_write(struct nfs_commit_info *cinfo,
 	struct nfs_direct_req *dreq = cinfo->dreq;
 
 	spin_lock(&dreq->lock);
-	dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
+	if (dreq->flags != NFS_ODIRECT_DONE)
+		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 	spin_unlock(&dreq->lock);
 	nfs_mark_request_commit(req, NULL, cinfo, 0);
 }
@@ -728,6 +654,23 @@ static void nfs_direct_commit_schedule(struct nfs_direct_req *dreq)
 		nfs_direct_write_reschedule(dreq);
 }
 
+static void nfs_direct_write_clear_reqs(struct nfs_direct_req *dreq)
+{
+	struct nfs_commit_info cinfo;
+	struct nfs_page *req;
+	LIST_HEAD(reqs);
+
+	nfs_init_cinfo_from_dreq(&cinfo, dreq);
+	nfs_direct_write_scan_commit_list(dreq->inode, &reqs, &cinfo);
+
+	while (!list_empty(&reqs)) {
+		req = nfs_list_entry(reqs.next);
+		nfs_list_remove_request(req);
+		nfs_release_request(req);
+		nfs_unlock_and_release_request(req);
+	}
+}
+
 static void nfs_direct_write_schedule_work(struct work_struct *work)
 {
 	struct nfs_direct_req *dreq = container_of(work, struct nfs_direct_req, work);
@@ -742,6 +685,7 @@ static void nfs_direct_write_schedule_work(struct work_struct *work)
 			nfs_direct_write_reschedule(dreq);
 			break;
 		default:
+			nfs_direct_write_clear_reqs(dreq);
 			nfs_zap_mapping(dreq->inode, dreq->inode->i_mapping);
 			nfs_direct_complete(dreq);
 	}
@@ -768,20 +712,15 @@ static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
 	}
 
 	nfs_direct_count_bytes(dreq, hdr);
-	if (hdr->good_bytes != 0) {
-		if (nfs_write_need_commit(hdr)) {
-			if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
-				request_commit = true;
-			else if (dreq->flags == 0) {
-				nfs_direct_set_hdr_verf(dreq, hdr);
-				request_commit = true;
-				dreq->flags = NFS_ODIRECT_DO_COMMIT;
-			} else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
-				request_commit = true;
-				if (nfs_direct_set_or_cmp_hdr_verf(dreq, hdr))
-					dreq->flags =
-						NFS_ODIRECT_RESCHED_WRITES;
-			}
+	if (hdr->good_bytes != 0 && nfs_write_need_commit(hdr)) {
+		switch (dreq->flags) {
+		case 0:
+			dreq->flags = NFS_ODIRECT_DO_COMMIT;
+			request_commit = true;
+			break;
+		case NFS_ODIRECT_RESCHED_WRITES:
+		case NFS_ODIRECT_DO_COMMIT:
+			request_commit = true;
 		}
 	}
 	spin_unlock(&dreq->lock);
@@ -990,11 +929,13 @@ ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter)
 	l_ctx = nfs_get_lock_context(dreq->ctx);
 	if (IS_ERR(l_ctx)) {
 		result = PTR_ERR(l_ctx);
+		nfs_direct_req_release(dreq);
 		goto out_release;
 	}
 	dreq->l_ctx = l_ctx;
 	if (!is_sync_kiocb(iocb))
 		dreq->iocb = iocb;
+	pnfs_init_ds_commit_info_ops(&dreq->ds_cinfo, inode);
 
 	nfs_start_io_direct(inode);
 
diff --git a/fs/nfs/filelayout/filelayout.c b/fs/nfs/filelayout/filelayout.c
index c9b605f6c9cb..a13e69009f19 100644
--- a/fs/nfs/filelayout/filelayout.c
+++ b/fs/nfs/filelayout/filelayout.c
@@ -49,6 +49,7 @@ MODULE_AUTHOR("Dean Hildebrand <dhildebz@umich.edu>");
 MODULE_DESCRIPTION("The NFSv4 file layout driver");
 
 #define FILELAYOUT_POLL_RETRY_MAX     (15*HZ)
+static const struct pnfs_commit_ops filelayout_commit_ops;
 
 static loff_t
 filelayout_get_dense_offset(struct nfs4_filelayout_segment *flseg,
@@ -750,72 +751,17 @@ filelayout_free_lseg(struct pnfs_layout_segment *lseg)
 	/* This assumes a single RW lseg */
 	if (lseg->pls_range.iomode == IOMODE_RW) {
 		struct nfs4_filelayout *flo;
+		struct inode *inode;
 
 		flo = FILELAYOUT_FROM_HDR(lseg->pls_layout);
-		flo->commit_info.nbuckets = 0;
-		kfree(flo->commit_info.buckets);
-		flo->commit_info.buckets = NULL;
+		inode = flo->generic_hdr.plh_inode;
+		spin_lock(&inode->i_lock);
+		pnfs_generic_ds_cinfo_release_lseg(&flo->commit_info, lseg);
+		spin_unlock(&inode->i_lock);
 	}
 	_filelayout_free_lseg(fl);
 }
 
-static int
-filelayout_alloc_commit_info(struct pnfs_layout_segment *lseg,
-			     struct nfs_commit_info *cinfo,
-			     gfp_t gfp_flags)
-{
-	struct nfs4_filelayout_segment *fl = FILELAYOUT_LSEG(lseg);
-	struct pnfs_commit_bucket *buckets;
-	int size, i;
-
-	if (fl->commit_through_mds)
-		return 0;
-
-	size = (fl->stripe_type == STRIPE_SPARSE) ?
-		fl->dsaddr->ds_num : fl->dsaddr->stripe_count;
-
-	if (cinfo->ds->nbuckets >= size) {
-		/* This assumes there is only one IOMODE_RW lseg.  What
-		 * we really want to do is have a layout_hdr level
-		 * dictionary of <multipath_list4, fh> keys, each
-		 * associated with a struct list_head, populated by calls
-		 * to filelayout_write_pagelist().
-		 * */
-		return 0;
-	}
-
-	buckets = kcalloc(size, sizeof(struct pnfs_commit_bucket),
-			  gfp_flags);
-	if (!buckets)
-		return -ENOMEM;
-	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&buckets[i].written);
-		INIT_LIST_HEAD(&buckets[i].committing);
-		/* mark direct verifier as unset */
-		buckets[i].direct_verf.committed = NFS_INVALID_STABLE_HOW;
-	}
-
-	spin_lock(&cinfo->inode->i_lock);
-	if (cinfo->ds->nbuckets >= size)
-		goto out;
-	for (i = 0; i < cinfo->ds->nbuckets; i++) {
-		list_splice(&cinfo->ds->buckets[i].written,
-			    &buckets[i].written);
-		list_splice(&cinfo->ds->buckets[i].committing,
-			    &buckets[i].committing);
-		buckets[i].direct_verf.committed =
-			cinfo->ds->buckets[i].direct_verf.committed;
-		buckets[i].wlseg = cinfo->ds->buckets[i].wlseg;
-		buckets[i].clseg = cinfo->ds->buckets[i].clseg;
-	}
-	swap(cinfo->ds->buckets, buckets);
-	cinfo->ds->nbuckets = size;
-out:
-	spin_unlock(&cinfo->inode->i_lock);
-	kfree(buckets);
-	return 0;
-}
-
 static struct pnfs_layout_segment *
 filelayout_alloc_lseg(struct pnfs_layout_hdr *layoutid,
 		      struct nfs4_layoutget_res *lgr,
@@ -938,9 +884,6 @@ static void
 filelayout_pg_init_write(struct nfs_pageio_descriptor *pgio,
 			 struct nfs_page *req)
 {
-	struct nfs_commit_info cinfo;
-	int status;
-
 	pnfs_generic_pg_check_layout(pgio);
 	if (!pgio->pg_lseg) {
 		pgio->pg_lseg = fl_pnfs_update_layout(pgio->pg_inode,
@@ -959,17 +902,7 @@ filelayout_pg_init_write(struct nfs_pageio_descriptor *pgio,
 
 	/* If no lseg, fall back to write through mds */
 	if (pgio->pg_lseg == NULL)
-		goto out_mds;
-	nfs_init_cinfo(&cinfo, pgio->pg_inode, pgio->pg_dreq);
-	status = filelayout_alloc_commit_info(pgio->pg_lseg, &cinfo, GFP_NOFS);
-	if (status < 0) {
-		pnfs_put_lseg(pgio->pg_lseg);
-		pgio->pg_lseg = NULL;
-		goto out_mds;
-	}
-	return;
-out_mds:
-	nfs_pageio_reset_write_mds(pgio);
+		nfs_pageio_reset_write_mds(pgio);
 }
 
 static const struct nfs_pageio_ops filelayout_pg_read_ops = {
@@ -1078,36 +1011,6 @@ out_err:
 	return -EAGAIN;
 }
 
-/* filelayout_search_commit_reqs - Search lists in @cinfo for the head reqest
- *				   for @page
- * @cinfo - commit info for current inode
- * @page - page to search for matching head request
- *
- * Returns a the head request if one is found, otherwise returns NULL.
- */
-static struct nfs_page *
-filelayout_search_commit_reqs(struct nfs_commit_info *cinfo, struct page *page)
-{
-	struct nfs_page *freq, *t;
-	struct pnfs_commit_bucket *b;
-	int i;
-
-	/* Linearly search the commit lists for each bucket until a matching
-	 * request is found */
-	for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
-		list_for_each_entry_safe(freq, t, &b->written, wb_list) {
-			if (freq->wb_page == page)
-				return freq->wb_head;
-		}
-		list_for_each_entry_safe(freq, t, &b->committing, wb_list) {
-			if (freq->wb_page == page)
-				return freq->wb_head;
-		}
-	}
-
-	return NULL;
-}
-
 static int
 filelayout_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
 			   int how, struct nfs_commit_info *cinfo)
@@ -1140,13 +1043,17 @@ filelayout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags)
 	struct nfs4_filelayout *flo;
 
 	flo = kzalloc(sizeof(*flo), gfp_flags);
-	return flo != NULL ? &flo->generic_hdr : NULL;
+	if (flo == NULL)
+		return NULL;
+	pnfs_init_ds_commit_info(&flo->commit_info);
+	flo->commit_info.ops = &filelayout_commit_ops;
+	return &flo->generic_hdr;
 }
 
 static void
 filelayout_free_layout_hdr(struct pnfs_layout_hdr *lo)
 {
-	kfree(FILELAYOUT_FROM_HDR(lo));
+	kfree_rcu(FILELAYOUT_FROM_HDR(lo), generic_hdr.plh_rcu);
 }
 
 static struct pnfs_ds_commit_info *
@@ -1160,6 +1067,46 @@ filelayout_get_ds_info(struct inode *inode)
 		return &FILELAYOUT_FROM_HDR(layout)->commit_info;
 }
 
+static void
+filelayout_setup_ds_info(struct pnfs_ds_commit_info *fl_cinfo,
+		struct pnfs_layout_segment *lseg)
+{
+	struct nfs4_filelayout_segment *fl = FILELAYOUT_LSEG(lseg);
+	struct inode *inode = lseg->pls_layout->plh_inode;
+	struct pnfs_commit_array *array, *new;
+	unsigned int size = (fl->stripe_type == STRIPE_SPARSE) ?
+		fl->dsaddr->ds_num : fl->dsaddr->stripe_count;
+
+	new = pnfs_alloc_commit_array(size, GFP_NOIO);
+	if (new) {
+		spin_lock(&inode->i_lock);
+		array = pnfs_add_commit_array(fl_cinfo, new, lseg);
+		spin_unlock(&inode->i_lock);
+		if (array != new)
+			pnfs_free_commit_array(new);
+	}
+}
+
+static void
+filelayout_release_ds_info(struct pnfs_ds_commit_info *fl_cinfo,
+		struct inode *inode)
+{
+	spin_lock(&inode->i_lock);
+	pnfs_generic_ds_cinfo_destroy(fl_cinfo);
+	spin_unlock(&inode->i_lock);
+}
+
+static const struct pnfs_commit_ops filelayout_commit_ops = {
+	.setup_ds_info		= filelayout_setup_ds_info,
+	.release_ds_info	= filelayout_release_ds_info,
+	.mark_request_commit	= filelayout_mark_request_commit,
+	.clear_request_commit	= pnfs_generic_clear_request_commit,
+	.scan_commit_lists	= pnfs_generic_scan_commit_lists,
+	.recover_commit_reqs	= pnfs_generic_recover_commit_reqs,
+	.search_commit_reqs	= pnfs_generic_search_commit_reqs,
+	.commit_pagelist	= filelayout_commit_pagelist,
+};
+
 static struct pnfs_layoutdriver_type filelayout_type = {
 	.id			= LAYOUT_NFSV4_1_FILES,
 	.name			= "LAYOUT_NFSV4_1_FILES",
@@ -1173,12 +1120,6 @@ static struct pnfs_layoutdriver_type filelayout_type = {
 	.pg_read_ops		= &filelayout_pg_read_ops,
 	.pg_write_ops		= &filelayout_pg_write_ops,
 	.get_ds_info		= &filelayout_get_ds_info,
-	.mark_request_commit	= filelayout_mark_request_commit,
-	.clear_request_commit	= pnfs_generic_clear_request_commit,
-	.scan_commit_lists	= pnfs_generic_scan_commit_lists,
-	.recover_commit_reqs	= pnfs_generic_recover_commit_reqs,
-	.search_commit_reqs	= filelayout_search_commit_reqs,
-	.commit_pagelist	= filelayout_commit_pagelist,
 	.read_pagelist		= filelayout_read_pagelist,
 	.write_pagelist		= filelayout_write_pagelist,
 	.alloc_deviceid_node	= filelayout_alloc_deviceid_node,
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index bb9148b83166..7d399f72ebbb 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -32,6 +32,7 @@
 
 static unsigned short io_maxretrans;
 
+static const struct pnfs_commit_ops ff_layout_commit_ops;
 static void ff_layout_read_record_layoutstats_done(struct rpc_task *task,
 		struct nfs_pgio_header *hdr);
 static int ff_layout_mirror_prepare_stats(struct pnfs_layout_hdr *lo,
@@ -48,9 +49,11 @@ ff_layout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags)
 
 	ffl = kzalloc(sizeof(*ffl), gfp_flags);
 	if (ffl) {
+		pnfs_init_ds_commit_info(&ffl->commit_info);
 		INIT_LIST_HEAD(&ffl->error_list);
 		INIT_LIST_HEAD(&ffl->mirrors);
 		ffl->last_report_time = ktime_get();
+		ffl->commit_info.ops = &ff_layout_commit_ops;
 		return &ffl->generic_hdr;
 	} else
 		return NULL;
@@ -59,14 +62,14 @@ ff_layout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags)
 static void
 ff_layout_free_layout_hdr(struct pnfs_layout_hdr *lo)
 {
+	struct nfs4_flexfile_layout *ffl = FF_LAYOUT_FROM_HDR(lo);
 	struct nfs4_ff_layout_ds_err *err, *n;
 
-	list_for_each_entry_safe(err, n, &FF_LAYOUT_FROM_HDR(lo)->error_list,
-				 list) {
+	list_for_each_entry_safe(err, n, &ffl->error_list, list) {
 		list_del(&err->list);
 		kfree(err);
 	}
-	kfree(FF_LAYOUT_FROM_HDR(lo));
+	kfree_rcu(ffl, generic_hdr.plh_rcu);
 }
 
 static int decode_pnfs_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid)
@@ -248,36 +251,10 @@ static void ff_layout_put_mirror(struct nfs4_ff_layout_mirror *mirror)
 
 static void ff_layout_free_mirror_array(struct nfs4_ff_layout_segment *fls)
 {
-	int i;
-
-	if (fls->mirror_array) {
-		for (i = 0; i < fls->mirror_array_cnt; i++) {
-			/* normally mirror_ds is freed in
-			 * .free_deviceid_node but we still do it here
-			 * for .alloc_lseg error path */
-			ff_layout_put_mirror(fls->mirror_array[i]);
-		}
-		kfree(fls->mirror_array);
-		fls->mirror_array = NULL;
-	}
-}
-
-static int ff_layout_check_layout(struct nfs4_layoutget_res *lgr)
-{
-	int ret = 0;
+	u32 i;
 
-	dprintk("--> %s\n", __func__);
-
-	/* FIXME: remove this check when layout segment support is added */
-	if (lgr->range.offset != 0 ||
-	    lgr->range.length != NFS4_MAX_UINT64) {
-		dprintk("%s Only whole file layouts supported. Use MDS i/o\n",
-			__func__);
-		ret = -EINVAL;
-	}
-
-	dprintk("--> %s returns %d\n", __func__, ret);
-	return ret;
+	for (i = 0; i < fls->mirror_array_cnt; i++)
+		ff_layout_put_mirror(fls->mirror_array[i]);
 }
 
 static void _ff_layout_free_lseg(struct nfs4_ff_layout_segment *fls)
@@ -289,6 +266,23 @@ static void _ff_layout_free_lseg(struct nfs4_ff_layout_segment *fls)
 }
 
 static bool
+ff_lseg_match_mirrors(struct pnfs_layout_segment *l1,
+		struct pnfs_layout_segment *l2)
+{
+	const struct nfs4_ff_layout_segment *fl1 = FF_LAYOUT_LSEG(l1);
+	const struct nfs4_ff_layout_segment *fl2 = FF_LAYOUT_LSEG(l1);
+	u32 i;
+
+	if (fl1->mirror_array_cnt != fl2->mirror_array_cnt)
+		return false;
+	for (i = 0; i < fl1->mirror_array_cnt; i++) {
+		if (fl1->mirror_array[i] != fl2->mirror_array[i])
+			return false;
+	}
+	return true;
+}
+
+static bool
 ff_lseg_range_is_after(const struct pnfs_layout_range *l1,
 		const struct pnfs_layout_range *l2)
 {
@@ -323,6 +317,8 @@ ff_lseg_merge(struct pnfs_layout_segment *new,
 			new->pls_range.length);
 	if (new_end < old->pls_range.offset)
 		return false;
+	if (!ff_lseg_match_mirrors(new, old))
+		return false;
 
 	/* Mergeable: copy info from 'old' to 'new' */
 	if (new_end < old_end)
@@ -400,16 +396,13 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
 		goto out_err_free;
 
 	rc = -ENOMEM;
-	fls = kzalloc(sizeof(*fls), gfp_flags);
+	fls = kzalloc(struct_size(fls, mirror_array, mirror_array_cnt),
+			gfp_flags);
 	if (!fls)
 		goto out_err_free;
 
 	fls->mirror_array_cnt = mirror_array_cnt;
 	fls->stripe_unit = stripe_unit;
-	fls->mirror_array = kcalloc(fls->mirror_array_cnt,
-				    sizeof(fls->mirror_array[0]), gfp_flags);
-	if (fls->mirror_array == NULL)
-		goto out_err_free;
 
 	for (i = 0; i < fls->mirror_array_cnt; i++) {
 		struct nfs4_ff_layout_mirror *mirror;
@@ -545,9 +538,6 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh,
 
 out_sort_mirrors:
 	ff_layout_sort_mirrors(fls);
-	rc = ff_layout_check_layout(lgr);
-	if (rc)
-		goto out_err_free;
 	ret = &fls->generic_hdr;
 	dprintk("<-- %s (success)\n", __func__);
 out_free_page:
@@ -560,17 +550,6 @@ out_err_free:
 	goto out_free_page;
 }
 
-static bool ff_layout_has_rw_segments(struct pnfs_layout_hdr *layout)
-{
-	struct pnfs_layout_segment *lseg;
-
-	list_for_each_entry(lseg, &layout->plh_segs, pls_list)
-		if (lseg->pls_range.iomode == IOMODE_RW)
-			return true;
-
-	return false;
-}
-
 static void
 ff_layout_free_lseg(struct pnfs_layout_segment *lseg)
 {
@@ -585,23 +564,12 @@ ff_layout_free_lseg(struct pnfs_layout_segment *lseg)
 		ffl = FF_LAYOUT_FROM_HDR(lseg->pls_layout);
 		inode = ffl->generic_hdr.plh_inode;
 		spin_lock(&inode->i_lock);
-		if (!ff_layout_has_rw_segments(lseg->pls_layout)) {
-			ffl->commit_info.nbuckets = 0;
-			kfree(ffl->commit_info.buckets);
-			ffl->commit_info.buckets = NULL;
-		}
+		pnfs_generic_ds_cinfo_release_lseg(&ffl->commit_info, lseg);
 		spin_unlock(&inode->i_lock);
 	}
 	_ff_layout_free_lseg(fls);
 }
 
-/* Return 1 until we have multiple lsegs support */
-static int
-ff_layout_get_lseg_count(struct nfs4_ff_layout_segment *fls)
-{
-	return 1;
-}
-
 static void
 nfs4_ff_start_busy_timer(struct nfs4_ff_busy_timer *timer, ktime_t now)
 {
@@ -746,52 +714,6 @@ nfs4_ff_layout_stat_io_end_write(struct rpc_task *task,
 	spin_unlock(&mirror->lock);
 }
 
-static int
-ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg,
-			    struct nfs_commit_info *cinfo,
-			    gfp_t gfp_flags)
-{
-	struct nfs4_ff_layout_segment *fls = FF_LAYOUT_LSEG(lseg);
-	struct pnfs_commit_bucket *buckets;
-	int size;
-
-	if (cinfo->ds->nbuckets != 0) {
-		/* This assumes there is only one RW lseg per file.
-		 * To support multiple lseg per file, we need to
-		 * change struct pnfs_commit_bucket to allow dynamic
-		 * increasing nbuckets.
-		 */
-		return 0;
-	}
-
-	size = ff_layout_get_lseg_count(fls) * FF_LAYOUT_MIRROR_COUNT(lseg);
-
-	buckets = kcalloc(size, sizeof(struct pnfs_commit_bucket),
-			  gfp_flags);
-	if (!buckets)
-		return -ENOMEM;
-	else {
-		int i;
-
-		spin_lock(&cinfo->inode->i_lock);
-		if (cinfo->ds->nbuckets != 0)
-			kfree(buckets);
-		else {
-			cinfo->ds->buckets = buckets;
-			cinfo->ds->nbuckets = size;
-			for (i = 0; i < size; i++) {
-				INIT_LIST_HEAD(&buckets[i].written);
-				INIT_LIST_HEAD(&buckets[i].committing);
-				/* mark direct verifier as unset */
-				buckets[i].direct_verf.committed =
-					NFS_INVALID_STABLE_HOW;
-			}
-		}
-		spin_unlock(&cinfo->inode->i_lock);
-		return 0;
-	}
-}
-
 static void
 ff_layout_mark_ds_unreachable(struct pnfs_layout_segment *lseg, int idx)
 {
@@ -876,8 +798,8 @@ ff_layout_pg_get_read(struct nfs_pageio_descriptor *pgio,
 	pnfs_put_lseg(pgio->pg_lseg);
 	pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
 					   nfs_req_openctx(req),
-					   0,
-					   NFS4_MAX_UINT64,
+					   req_offset(req),
+					   req->wb_bytes,
 					   IOMODE_READ,
 					   strict_iomode,
 					   GFP_KERNEL);
@@ -888,6 +810,14 @@ ff_layout_pg_get_read(struct nfs_pageio_descriptor *pgio,
 }
 
 static void
+ff_layout_pg_check_layout(struct nfs_pageio_descriptor *pgio,
+			  struct nfs_page *req)
+{
+	pnfs_generic_pg_check_layout(pgio);
+	pnfs_generic_pg_check_range(pgio, req);
+}
+
+static void
 ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 			struct nfs_page *req)
 {
@@ -897,7 +827,7 @@ ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 	int ds_idx;
 
 retry:
-	pnfs_generic_pg_check_layout(pgio);
+	ff_layout_pg_check_layout(pgio, req);
 	/* Use full layout for now */
 	if (!pgio->pg_lseg) {
 		ff_layout_pg_get_read(pgio, req, false);
@@ -953,18 +883,16 @@ ff_layout_pg_init_write(struct nfs_pageio_descriptor *pgio,
 {
 	struct nfs4_ff_layout_mirror *mirror;
 	struct nfs_pgio_mirror *pgm;
-	struct nfs_commit_info cinfo;
 	struct nfs4_pnfs_ds *ds;
 	int i;
-	int status;
 
 retry:
-	pnfs_generic_pg_check_layout(pgio);
+	ff_layout_pg_check_layout(pgio, req);
 	if (!pgio->pg_lseg) {
 		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
 						   nfs_req_openctx(req),
-						   0,
-						   NFS4_MAX_UINT64,
+						   req_offset(req),
+						   req->wb_bytes,
 						   IOMODE_RW,
 						   false,
 						   GFP_NOFS);
@@ -978,11 +906,6 @@ retry:
 	if (pgio->pg_lseg == NULL)
 		goto out_mds;
 
-	nfs_init_cinfo(&cinfo, pgio->pg_inode, pgio->pg_dreq);
-	status = ff_layout_alloc_commit_info(pgio->pg_lseg, &cinfo, GFP_NOFS);
-	if (status < 0)
-		goto out_mds;
-
 	/* Use a direct mapping of ds_idx to pgio mirror_idx */
 	if (WARN_ON_ONCE(pgio->pg_mirror_count !=
 	    FF_LAYOUT_MIRROR_COUNT(pgio->pg_lseg)))
@@ -1297,21 +1220,23 @@ static void ff_layout_io_track_ds_error(struct pnfs_layout_segment *lseg,
 		}
 	}
 
+	mirror = FF_LAYOUT_COMP(lseg, idx);
+	err = ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
+				       mirror, offset, length, status, opnum,
+				       GFP_NOIO);
+
 	switch (status) {
 	case NFS4ERR_DELAY:
 	case NFS4ERR_GRACE:
-		return;
-	default:
 		break;
+	case NFS4ERR_NXIO:
+		ff_layout_mark_ds_unreachable(lseg, idx);
+		/* Fallthrough */
+	default:
+		pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode,
+						  lseg);
 	}
 
-	mirror = FF_LAYOUT_COMP(lseg, idx);
-	err = ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
-				       mirror, offset, length, status, opnum,
-				       GFP_NOIO);
-	if (status == NFS4ERR_NXIO)
-		ff_layout_mark_ds_unreachable(lseg, idx);
-	pnfs_error_mark_layout_for_return(lseg->pls_layout->plh_inode, lseg);
 	dprintk("%s: err %d op %d status %u\n", __func__, err, opnum, status);
 }
 
@@ -2012,6 +1937,33 @@ ff_layout_get_ds_info(struct inode *inode)
 }
 
 static void
+ff_layout_setup_ds_info(struct pnfs_ds_commit_info *fl_cinfo,
+		struct pnfs_layout_segment *lseg)
+{
+	struct nfs4_ff_layout_segment *flseg = FF_LAYOUT_LSEG(lseg);
+	struct inode *inode = lseg->pls_layout->plh_inode;
+	struct pnfs_commit_array *array, *new;
+
+	new = pnfs_alloc_commit_array(flseg->mirror_array_cnt, GFP_NOIO);
+	if (new) {
+		spin_lock(&inode->i_lock);
+		array = pnfs_add_commit_array(fl_cinfo, new, lseg);
+		spin_unlock(&inode->i_lock);
+		if (array != new)
+			pnfs_free_commit_array(new);
+	}
+}
+
+static void
+ff_layout_release_ds_info(struct pnfs_ds_commit_info *fl_cinfo,
+		struct inode *inode)
+{
+	spin_lock(&inode->i_lock);
+	pnfs_generic_ds_cinfo_destroy(fl_cinfo);
+	spin_unlock(&inode->i_lock);
+}
+
+static void
 ff_layout_free_deviceid_node(struct nfs4_deviceid_node *d)
 {
 	nfs4_ff_layout_free_deviceid(container_of(d, struct nfs4_ff_layout_ds,
@@ -2496,6 +2448,16 @@ ff_layout_set_layoutdriver(struct nfs_server *server,
 	return 0;
 }
 
+static const struct pnfs_commit_ops ff_layout_commit_ops = {
+	.setup_ds_info		= ff_layout_setup_ds_info,
+	.release_ds_info	= ff_layout_release_ds_info,
+	.mark_request_commit	= pnfs_layout_mark_request_commit,
+	.clear_request_commit	= pnfs_generic_clear_request_commit,
+	.scan_commit_lists	= pnfs_generic_scan_commit_lists,
+	.recover_commit_reqs	= pnfs_generic_recover_commit_reqs,
+	.commit_pagelist	= ff_layout_commit_pagelist,
+};
+
 static struct pnfs_layoutdriver_type flexfilelayout_type = {
 	.id			= LAYOUT_FLEX_FILES,
 	.name			= "LAYOUT_FLEX_FILES",
@@ -2512,11 +2474,6 @@ static struct pnfs_layoutdriver_type flexfilelayout_type = {
 	.pg_write_ops		= &ff_layout_pg_write_ops,
 	.get_ds_info		= ff_layout_get_ds_info,
 	.free_deviceid_node	= ff_layout_free_deviceid_node,
-	.mark_request_commit	= pnfs_layout_mark_request_commit,
-	.clear_request_commit	= pnfs_generic_clear_request_commit,
-	.scan_commit_lists	= pnfs_generic_scan_commit_lists,
-	.recover_commit_reqs	= pnfs_generic_recover_commit_reqs,
-	.commit_pagelist	= ff_layout_commit_pagelist,
 	.read_pagelist		= ff_layout_read_pagelist,
 	.write_pagelist		= ff_layout_write_pagelist,
 	.alloc_deviceid_node    = ff_layout_alloc_deviceid_node,
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.h b/fs/nfs/flexfilelayout/flexfilelayout.h
index 2f369966abf7..354a031c69b1 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.h
+++ b/fs/nfs/flexfilelayout/flexfilelayout.h
@@ -99,7 +99,7 @@ struct nfs4_ff_layout_segment {
 	u64				stripe_unit;
 	u32				flags;
 	u32				mirror_array_cnt;
-	struct nfs4_ff_layout_mirror	**mirror_array;
+	struct nfs4_ff_layout_mirror	*mirror_array[];
 };
 
 struct nfs4_flexfile_layout {
diff --git a/fs/nfs/fs_context.c b/fs/nfs/fs_context.c
index e113fcb4bb4c..ccc88be88d6a 100644
--- a/fs/nfs/fs_context.c
+++ b/fs/nfs/fs_context.c
@@ -190,6 +190,7 @@ static const struct constant_table nfs_vers_tokens[] = {
 	{ "4.0",	Opt_vers_4_0 },
 	{ "4.1",	Opt_vers_4_1 },
 	{ "4.2",	Opt_vers_4_2 },
+	{}
 };
 
 enum {
@@ -202,13 +203,14 @@ enum {
 	nr__Opt_xprt
 };
 
-static const struct constant_table nfs_xprt_protocol_tokens[nr__Opt_xprt] = {
+static const struct constant_table nfs_xprt_protocol_tokens[] = {
 	{ "rdma",	Opt_xprt_rdma },
 	{ "rdma6",	Opt_xprt_rdma6 },
 	{ "tcp",	Opt_xprt_tcp },
 	{ "tcp6",	Opt_xprt_tcp6 },
 	{ "udp",	Opt_xprt_udp },
 	{ "udp6",	Opt_xprt_udp6 },
+	{}
 };
 
 enum {
@@ -239,6 +241,7 @@ static const struct constant_table nfs_secflavor_tokens[] = {
 	{ "spkm3i",	Opt_sec_spkmi },
 	{ "spkm3p",	Opt_sec_spkmp },
 	{ "sys",	Opt_sec_sys },
+	{}
 };
 
 /*
@@ -1135,7 +1138,7 @@ out_no_address:
 	return nfs_invalf(fc, "NFS4: mount program didn't pass remote address");
 
 out_invalid_transport_udp:
-	return nfs_invalf(fc, "NFSv4: Unsupported transport protocol udp");
+	return nfs_invalf(fc, "NFS: Unsupported transport protocol udp");
 }
 #endif
 
@@ -1257,7 +1260,7 @@ out_v4_not_compiled:
 	nfs_errorf(fc, "NFS: NFSv4 is not compiled into kernel");
 	return -EPROTONOSUPPORT;
 out_invalid_transport_udp:
-	return nfs_invalf(fc, "NFSv4: Unsupported transport protocol udp");
+	return nfs_invalf(fc, "NFS: Unsupported transport protocol udp");
 out_no_address:
 	return nfs_invalf(fc, "NFS: mount program didn't pass remote address");
 out_mountproto_mismatch:
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 11bf15800ac9..b9d0921cb4fe 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -62,7 +62,6 @@
 /* Default is to see 64-bit inode numbers */
 static bool enable_ino64 = NFS_64_BIT_INODE_NUMBERS_ENABLED;
 
-static void nfs_invalidate_inode(struct inode *);
 static int nfs_update_inode(struct inode *, struct nfs_fattr *);
 
 static struct kmem_cache * nfs_inode_cachep;
@@ -284,10 +283,18 @@ EXPORT_SYMBOL_GPL(nfs_invalidate_atime);
  * Invalidate, but do not unhash, the inode.
  * NB: must be called with inode->i_lock held!
  */
-static void nfs_invalidate_inode(struct inode *inode)
+static void nfs_set_inode_stale_locked(struct inode *inode)
 {
 	set_bit(NFS_INO_STALE, &NFS_I(inode)->flags);
 	nfs_zap_caches_locked(inode);
+	trace_nfs_set_inode_stale(inode);
+}
+
+void nfs_set_inode_stale(struct inode *inode)
+{
+	spin_lock(&inode->i_lock);
+	nfs_set_inode_stale_locked(inode);
+	spin_unlock(&inode->i_lock);
 }
 
 struct nfs_find_desc {
@@ -959,16 +966,16 @@ struct nfs_open_context *alloc_nfs_open_context(struct dentry *dentry,
 						struct file *filp)
 {
 	struct nfs_open_context *ctx;
-	const struct cred *cred = get_current_cred();
 
 	ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
-	if (!ctx) {
-		put_cred(cred);
+	if (!ctx)
 		return ERR_PTR(-ENOMEM);
-	}
 	nfs_sb_active(dentry->d_sb);
 	ctx->dentry = dget(dentry);
-	ctx->cred = cred;
+	if (filp)
+		ctx->cred = get_cred(filp->f_cred);
+	else
+		ctx->cred = get_current_cred();
 	ctx->ll_cred = NULL;
 	ctx->state = NULL;
 	ctx->mode = f_mode;
@@ -1163,9 +1170,10 @@ __nfs_revalidate_inode(struct nfs_server *server, struct inode *inode)
 				status = 0;
 			break;
 		case -ESTALE:
-			nfs_zap_caches(inode);
 			if (!S_ISDIR(inode->i_mode))
-				set_bit(NFS_INO_STALE, &NFS_I(inode)->flags);
+				nfs_set_inode_stale(inode);
+			else
+				nfs_zap_caches(inode);
 		}
 		goto err_out;
 	}
@@ -2064,7 +2072,7 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 	 * lookup validation will know that the inode is bad.
 	 * (But we fall through to invalidate the caches.)
 	 */
-	nfs_invalidate_inode(inode);
+	nfs_set_inode_stale_locked(inode);
 	return -ESTALE;
 }
 
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index f80c47d5ff27..1f32a9fbfdaf 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -274,12 +274,6 @@ void nfs_free_request(struct nfs_page *req);
 struct nfs_pgio_mirror *
 nfs_pgio_current_mirror(struct nfs_pageio_descriptor *desc);
 
-static inline bool nfs_pgio_has_mirroring(struct nfs_pageio_descriptor *desc)
-{
-	WARN_ON_ONCE(desc->pg_mirror_count < 1);
-	return desc->pg_mirror_count > 1;
-}
-
 static inline bool nfs_match_open_context(const struct nfs_open_context *ctx1,
 		const struct nfs_open_context *ctx2)
 {
@@ -417,7 +411,9 @@ extern int __init register_nfs_fs(void);
 extern void __exit unregister_nfs_fs(void);
 extern bool nfs_sb_active(struct super_block *sb);
 extern void nfs_sb_deactive(struct super_block *sb);
-
+extern int nfs_client_for_each_server(struct nfs_client *clp,
+				      int (*fn)(struct nfs_server *, void *),
+				      void *data);
 /* io.c */
 extern void nfs_start_io_read(struct inode *inode);
 extern void nfs_end_io_read(struct inode *inode);
@@ -515,13 +511,25 @@ int nfs_filemap_write_and_wait_range(struct address_space *mapping,
 		loff_t lstart, loff_t lend);
 
 #ifdef CONFIG_NFS_V4_1
+static inline void
+pnfs_bucket_clear_pnfs_ds_commit_verifiers(struct pnfs_commit_bucket *buckets,
+		unsigned int nbuckets)
+{
+	unsigned int i;
+
+	for (i = 0; i < nbuckets; i++)
+		buckets[i].direct_verf.committed = NFS_INVALID_STABLE_HOW;
+}
 static inline
 void nfs_clear_pnfs_ds_commit_verifiers(struct pnfs_ds_commit_info *cinfo)
 {
-	int i;
+	struct pnfs_commit_array *array;
 
-	for (i = 0; i < cinfo->nbuckets; i++)
-		cinfo->buckets[i].direct_verf.committed = NFS_INVALID_STABLE_HOW;
+	rcu_read_lock();
+	list_for_each_entry_rcu(array, &cinfo->commits, cinfo_list)
+		pnfs_bucket_clear_pnfs_ds_commit_verifiers(array->buckets,
+				array->nbuckets);
+	rcu_read_unlock();
 }
 #else
 static inline
@@ -542,6 +550,14 @@ nfs_write_verifier_cmp(const struct nfs_write_verifier *v1,
 	return memcmp(v1->data, v2->data, sizeof(v1->data));
 }
 
+static inline bool
+nfs_write_match_verf(const struct nfs_writeverf *verf,
+		struct nfs_page *req)
+{
+	return verf->committed > NFS_UNSTABLE &&
+		!nfs_write_verifier_cmp(&req->wb_verf, &verf->verifier);
+}
+
 /* unlink.c */
 extern struct rpc_task *
 nfs_async_rename(struct inode *old_dir, struct inode *new_dir,
diff --git a/fs/nfs/namespace.c b/fs/nfs/namespace.c
index f3ece8ed3203..6b063227e34e 100644
--- a/fs/nfs/namespace.c
+++ b/fs/nfs/namespace.c
@@ -145,6 +145,7 @@ struct vfsmount *nfs_d_automount(struct path *path)
 	struct vfsmount *mnt = ERR_PTR(-ENOMEM);
 	struct nfs_server *server = NFS_SERVER(d_inode(path->dentry));
 	struct nfs_client *client = server->nfs_client;
+	int timeout = READ_ONCE(nfs_mountpoint_expiry_timeout);
 	int ret;
 
 	if (IS_ROOT(path->dentry))
@@ -190,12 +191,12 @@ struct vfsmount *nfs_d_automount(struct path *path)
 	if (IS_ERR(mnt))
 		goto out_fc;
 
-	if (nfs_mountpoint_expiry_timeout < 0)
+	mntget(mnt); /* prevent immediate expiration */
+	if (timeout <= 0)
 		goto out_fc;
 
-	mntget(mnt); /* prevent immediate expiration */
 	mnt_set_expiry(mnt, &nfs_automount_list);
-	schedule_delayed_work(&nfs_automount_task, nfs_mountpoint_expiry_timeout);
+	schedule_delayed_work(&nfs_automount_task, timeout);
 
 out_fc:
 	put_fs_context(fc);
@@ -233,10 +234,11 @@ const struct inode_operations nfs_referral_inode_operations = {
 static void nfs_expire_automounts(struct work_struct *work)
 {
 	struct list_head *list = &nfs_automount_list;
+	int timeout = READ_ONCE(nfs_mountpoint_expiry_timeout);
 
 	mark_mounts_for_expiry(list);
-	if (!list_empty(list))
-		schedule_delayed_work(&nfs_automount_task, nfs_mountpoint_expiry_timeout);
+	if (!list_empty(list) && timeout > 0)
+		schedule_delayed_work(&nfs_automount_task, timeout);
 }
 
 void nfs_release_automount_timer(void)
@@ -247,10 +249,7 @@ void nfs_release_automount_timer(void)
 
 /**
  * nfs_do_submount - set up mountpoint when crossing a filesystem boundary
- * @dentry: parent directory
- * @fh: filehandle for new root dentry
- * @fattr: attributes for new root inode
- * @authflavor: security flavor to use when performing the mount
+ * @fc: pointer to struct nfs_fs_context
  *
  */
 int nfs_do_submount(struct fs_context *fc)
@@ -312,3 +311,53 @@ int nfs_submount(struct fs_context *fc, struct nfs_server *server)
 	return nfs_do_submount(fc);
 }
 EXPORT_SYMBOL_GPL(nfs_submount);
+
+static int param_set_nfs_timeout(const char *val, const struct kernel_param *kp)
+{
+	long num;
+	int ret;
+
+	if (!val)
+		return -EINVAL;
+	ret = kstrtol(val, 0, &num);
+	if (ret)
+		return -EINVAL;
+	if (num > 0) {
+		if (num >= INT_MAX / HZ)
+			num = INT_MAX;
+		else
+			num *= HZ;
+		*((int *)kp->arg) = num;
+		if (!list_empty(&nfs_automount_list))
+			mod_delayed_work(system_wq, &nfs_automount_task, num);
+	} else {
+		*((int *)kp->arg) = -1*HZ;
+		cancel_delayed_work(&nfs_automount_task);
+	}
+	return 0;
+}
+
+static int param_get_nfs_timeout(char *buffer, const struct kernel_param *kp)
+{
+	long num = *((int *)kp->arg);
+
+	if (num > 0) {
+		if (num >= INT_MAX - (HZ - 1))
+			num = INT_MAX / HZ;
+		else
+			num = (num + (HZ - 1)) / HZ;
+	} else
+		num = -1;
+	return scnprintf(buffer, PAGE_SIZE, "%li\n", num);
+}
+
+static const struct kernel_param_ops param_ops_nfs_timeout = {
+	.set = param_set_nfs_timeout,
+	.get = param_get_nfs_timeout,
+};
+#define param_check_nfs_timeout(name, p) __param_check(name, p, int);
+
+module_param(nfs_mountpoint_expiry_timeout, nfs_timeout, 0644);
+MODULE_PARM_DESC(nfs_mountpoint_expiry_timeout,
+		"Set the NFS automounted mountpoint timeout value (seconds)."
+		"Values <= 0 turn expiration off.");
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 8be1ba7c62bb..2b7f6dcd2eb8 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -42,7 +42,9 @@ enum nfs4_client_state {
 	NFS4CLNT_LEASE_MOVED,
 	NFS4CLNT_DELEGATION_EXPIRED,
 	NFS4CLNT_RUN_MANAGER,
-	NFS4CLNT_DELEGRETURN_RUNNING,
+	NFS4CLNT_RECALL_RUNNING,
+	NFS4CLNT_RECALL_ANY_LAYOUT_READ,
+	NFS4CLNT_RECALL_ANY_LAYOUT_RW,
 };
 
 #define NFS4_RENEW_TIMEOUT		0x01
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index 1297919e0fce..8e5d6223ddd3 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -252,6 +252,9 @@ static loff_t nfs42_remap_file_range(struct file *src_file, loff_t src_off,
 	if (remap_flags & ~REMAP_FILE_ADVISORY)
 		return -EINVAL;
 
+	if (IS_SWAPFILE(dst_inode) || IS_SWAPFILE(src_inode))
+		return -ETXTBSY;
+
 	/* check alignment w.r.t. clone_blksize */
 	ret = -EINVAL;
 	if (bs) {
diff --git a/fs/nfs/nfs4namespace.c b/fs/nfs/nfs4namespace.c
index 84026e7b8a5f..a3ab6e219061 100644
--- a/fs/nfs/nfs4namespace.c
+++ b/fs/nfs/nfs4namespace.c
@@ -354,7 +354,7 @@ static int try_location(struct fs_context *fc,
 
 /**
  * nfs_follow_referral - set up mountpoint when hitting a referral on moved error
- * @dentry: parent directory
+ * @fc: pointer to struct nfs_fs_context
  * @locations: array of NFSv4 server location information
  *
  */
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index cb34e840e4fb..512afb1c7867 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -2346,7 +2346,7 @@ static int _nfs4_proc_open_confirm(struct nfs4_opendata *data)
 		.callback_ops = &nfs4_open_confirm_ops,
 		.callback_data = data,
 		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF,
 	};
 	int status;
 
@@ -2511,7 +2511,7 @@ static int nfs4_run_open_task(struct nfs4_opendata *data,
 		.callback_ops = &nfs4_open_ops,
 		.callback_data = data,
 		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF,
 	};
 	int status;
 
@@ -2790,16 +2790,19 @@ static int nfs41_check_delegation_stateid(struct nfs4_state *state)
 		return NFS_OK;
 	}
 
+	spin_lock(&delegation->lock);
 	nfs4_stateid_copy(&stateid, &delegation->stateid);
 
 	if (!test_and_clear_bit(NFS_DELEGATION_TEST_EXPIRED,
 				&delegation->flags)) {
+		spin_unlock(&delegation->lock);
 		rcu_read_unlock();
 		return NFS_OK;
 	}
 
 	if (delegation->cred)
 		cred = get_cred(delegation->cred);
+	spin_unlock(&delegation->lock);
 	rcu_read_unlock();
 	status = nfs41_test_and_free_expired_stateid(server, &stateid, cred);
 	trace_nfs4_test_delegation_stateid(state, NULL, status);
@@ -3651,7 +3654,7 @@ int nfs4_do_close(struct nfs4_state *state, gfp_t gfp_mask, int wait)
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_close_ops,
 		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF,
 	};
 	int status = -ENOMEM;
 
@@ -5544,7 +5547,7 @@ unwind:
 struct nfs4_cached_acl {
 	int cached;
 	size_t len;
-	char data[0];
+	char data[];
 };
 
 static void nfs4_set_cached_acl(struct inode *inode, struct nfs4_cached_acl *acl)
@@ -6253,6 +6256,7 @@ static void nfs4_delegreturn_done(struct rpc_task *task, void *calldata)
 		/* Fallthrough */
 	case -NFS4ERR_BAD_STATEID:
 	case -NFS4ERR_STALE_STATEID:
+	case -ETIMEDOUT:
 		task->tk_status = 0;
 		break;
 	case -NFS4ERR_OLD_STATEID:
@@ -6343,7 +6347,7 @@ static int _nfs4_proc_delegreturn(struct inode *inode, const struct cred *cred,
 		.rpc_client = server->client,
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_delegreturn_ops,
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF | RPC_TASK_TIMEOUT,
 	};
 	int status = 0;
 
@@ -6926,7 +6930,7 @@ static int _nfs4_do_setlk(struct nfs4_state *state, int cmd, struct file_lock *f
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_lock_ops,
 		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF,
 	};
 	int ret;
 
@@ -9170,7 +9174,7 @@ nfs4_proc_layoutget(struct nfs4_layoutget *lgp, long *timeout)
 		.rpc_message = &msg,
 		.callback_ops = &nfs4_layoutget_call_ops,
 		.callback_data = lgp,
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF,
 	};
 	struct pnfs_layout_segment *lseg = NULL;
 	struct nfs4_exception exception = {
@@ -9287,6 +9291,7 @@ static void nfs4_layoutreturn_release(void *calldata)
 		lrp->ld_private.ops->free(&lrp->ld_private);
 	pnfs_put_layout_hdr(lrp->args.layout);
 	nfs_iput_and_deactive(lrp->inode);
+	put_cred(lrp->cred);
 	kfree(calldata);
 	dprintk("<-- %s\n", __func__);
 }
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index f7723d221945..ac93715c05a4 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -2524,6 +2524,21 @@ static int nfs4_bind_conn_to_session(struct nfs_client *clp)
 	}
 	return 0;
 }
+
+static void nfs4_layoutreturn_any_run(struct nfs_client *clp)
+{
+	int iomode = 0;
+
+	if (test_and_clear_bit(NFS4CLNT_RECALL_ANY_LAYOUT_READ, &clp->cl_state))
+		iomode += IOMODE_READ;
+	if (test_and_clear_bit(NFS4CLNT_RECALL_ANY_LAYOUT_RW, &clp->cl_state))
+		iomode += IOMODE_RW;
+	/* Note: IOMODE_READ + IOMODE_RW == IOMODE_ANY */
+	if (iomode) {
+		pnfs_layout_return_unused_byclid(clp, iomode);
+		set_bit(NFS4CLNT_RUN_MANAGER, &clp->cl_state);
+	}
+}
 #else /* CONFIG_NFS_V4_1 */
 static int nfs4_reset_session(struct nfs_client *clp) { return 0; }
 
@@ -2531,6 +2546,10 @@ static int nfs4_bind_conn_to_session(struct nfs_client *clp)
 {
 	return 0;
 }
+
+static void nfs4_layoutreturn_any_run(struct nfs_client *clp)
+{
+}
 #endif /* CONFIG_NFS_V4_1 */
 
 static void nfs4_state_manager(struct nfs_client *clp)
@@ -2635,12 +2654,13 @@ static void nfs4_state_manager(struct nfs_client *clp)
 		nfs4_end_drain_session(clp);
 		nfs4_clear_state_manager_bit(clp);
 
-		if (!test_and_set_bit(NFS4CLNT_DELEGRETURN_RUNNING, &clp->cl_state)) {
+		if (!test_and_set_bit(NFS4CLNT_RECALL_RUNNING, &clp->cl_state)) {
 			if (test_and_clear_bit(NFS4CLNT_DELEGRETURN, &clp->cl_state)) {
 				nfs_client_return_marked_delegations(clp);
 				set_bit(NFS4CLNT_RUN_MANAGER, &clp->cl_state);
 			}
-			clear_bit(NFS4CLNT_DELEGRETURN_RUNNING, &clp->cl_state);
+			nfs4_layoutreturn_any_run(clp);
+			clear_bit(NFS4CLNT_RECALL_RUNNING, &clp->cl_state);
 		}
 
 		/* Did we race with an attempt to give us more work? */
diff --git a/fs/nfs/nfs4trace.h b/fs/nfs/nfs4trace.h
index 1e97e5e04cb4..543541173a3d 100644
--- a/fs/nfs/nfs4trace.h
+++ b/fs/nfs/nfs4trace.h
@@ -584,7 +584,9 @@ TRACE_DEFINE_ENUM(NFS4CLNT_MOVED);
 TRACE_DEFINE_ENUM(NFS4CLNT_LEASE_MOVED);
 TRACE_DEFINE_ENUM(NFS4CLNT_DELEGATION_EXPIRED);
 TRACE_DEFINE_ENUM(NFS4CLNT_RUN_MANAGER);
-TRACE_DEFINE_ENUM(NFS4CLNT_DELEGRETURN_RUNNING);
+TRACE_DEFINE_ENUM(NFS4CLNT_RECALL_RUNNING);
+TRACE_DEFINE_ENUM(NFS4CLNT_RECALL_ANY_LAYOUT_READ);
+TRACE_DEFINE_ENUM(NFS4CLNT_RECALL_ANY_LAYOUT_RW);
 
 #define show_nfs4_clp_state(state) \
 	__print_flags(state, "|", \
@@ -605,7 +607,9 @@ TRACE_DEFINE_ENUM(NFS4CLNT_DELEGRETURN_RUNNING);
 		{ NFS4CLNT_LEASE_MOVED,		"LEASE_MOVED" }, \
 		{ NFS4CLNT_DELEGATION_EXPIRED,	"DELEGATION_EXPIRED" }, \
 		{ NFS4CLNT_RUN_MANAGER,		"RUN_MANAGER" }, \
-		{ NFS4CLNT_DELEGRETURN_RUNNING,	"DELEGRETURN_RUNNING" })
+		{ NFS4CLNT_RECALL_RUNNING,	"RECALL_RUNNING" }, \
+		{ NFS4CLNT_RECALL_ANY_LAYOUT_READ, "RECALL_ANY_LAYOUT_READ" }, \
+		{ NFS4CLNT_RECALL_ANY_LAYOUT_RW, "RECALL_ANY_LAYOUT_RW" })
 
 TRACE_EVENT(nfs4_state_mgr,
 		TP_PROTO(
diff --git a/fs/nfs/nfsroot.c b/fs/nfs/nfsroot.c
index effaa4247b91..8d3278805602 100644
--- a/fs/nfs/nfsroot.c
+++ b/fs/nfs/nfsroot.c
@@ -88,7 +88,7 @@
 #define NFS_ROOT		"/tftpboot/%s"
 
 /* Default NFSROOT mount options. */
-#define NFS_DEF_OPTIONS		"vers=2,udp,rsize=4096,wsize=4096"
+#define NFS_DEF_OPTIONS		"vers=2,tcp,rsize=4096,wsize=4096"
 
 /* Parameters passed from the kernel command line */
 static char nfs_root_parms[NFS_MAXPATHLEN + 1] __initdata = "";
diff --git a/fs/nfs/nfstrace.h b/fs/nfs/nfstrace.h
index a9588d19a5ae..7e7a97ae21ed 100644
--- a/fs/nfs/nfstrace.h
+++ b/fs/nfs/nfstrace.h
@@ -181,6 +181,7 @@ DECLARE_EVENT_CLASS(nfs_inode_event_done,
 				int error \
 			), \
 			TP_ARGS(inode, error))
+DEFINE_NFS_INODE_EVENT(nfs_set_inode_stale);
 DEFINE_NFS_INODE_EVENT(nfs_refresh_inode_enter);
 DEFINE_NFS_INODE_EVENT_DONE(nfs_refresh_inode_exit);
 DEFINE_NFS_INODE_EVENT(nfs_revalidate_inode_enter);
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 20b3717cd7ca..f61f96603df7 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -33,9 +33,7 @@ static const struct rpc_call_ops nfs_pgio_common_ops;
 struct nfs_pgio_mirror *
 nfs_pgio_current_mirror(struct nfs_pageio_descriptor *desc)
 {
-	return nfs_pgio_has_mirroring(desc) ?
-		&desc->pg_mirrors[desc->pg_mirror_idx] :
-		&desc->pg_mirrors[0];
+	return &desc->pg_mirrors[desc->pg_mirror_idx];
 }
 EXPORT_SYMBOL_GPL(nfs_pgio_current_mirror);
 
@@ -133,47 +131,166 @@ nfs_async_iocounter_wait(struct rpc_task *task, struct nfs_lock_context *l_ctx)
 EXPORT_SYMBOL_GPL(nfs_async_iocounter_wait);
 
 /*
- * nfs_page_group_lock - lock the head of the page group
- * @req - request in group that is to be locked
+ * nfs_page_lock_head_request - page lock the head of the page group
+ * @req: any member of the page group
+ */
+struct nfs_page *
+nfs_page_group_lock_head(struct nfs_page *req)
+{
+	struct nfs_page *head = req->wb_head;
+
+	while (!nfs_lock_request(head)) {
+		int ret = nfs_wait_on_request(head);
+		if (ret < 0)
+			return ERR_PTR(ret);
+	}
+	if (head != req)
+		kref_get(&head->wb_kref);
+	return head;
+}
+
+/*
+ * nfs_unroll_locks -  unlock all newly locked reqs and wait on @req
+ * @head: head request of page group, must be holding head lock
+ * @req: request that couldn't lock and needs to wait on the req bit lock
  *
- * this lock must be held when traversing or modifying the page
- * group list
+ * This is a helper function for nfs_lock_and_join_requests
+ * returns 0 on success, < 0 on error.
+ */
+static void
+nfs_unroll_locks(struct nfs_page *head, struct nfs_page *req)
+{
+	struct nfs_page *tmp;
+
+	/* relinquish all the locks successfully grabbed this run */
+	for (tmp = head->wb_this_page ; tmp != req; tmp = tmp->wb_this_page) {
+		if (!kref_read(&tmp->wb_kref))
+			continue;
+		nfs_unlock_and_release_request(tmp);
+	}
+}
+
+/*
+ * nfs_page_group_lock_subreq -  try to lock a subrequest
+ * @head: head request of page group
+ * @subreq: request to lock
  *
- * return 0 on success, < 0 on error
+ * This is a helper function for nfs_lock_and_join_requests which
+ * must be called with the head request and page group both locked.
+ * On error, it returns with the page group unlocked.
  */
-int
-nfs_page_group_lock(struct nfs_page *req)
+static int
+nfs_page_group_lock_subreq(struct nfs_page *head, struct nfs_page *subreq)
 {
-	struct nfs_page *head = req->wb_head;
+	int ret;
+
+	if (!kref_get_unless_zero(&subreq->wb_kref))
+		return 0;
+	while (!nfs_lock_request(subreq)) {
+		nfs_page_group_unlock(head);
+		ret = nfs_wait_on_request(subreq);
+		if (!ret)
+			ret = nfs_page_group_lock(head);
+		if (ret < 0) {
+			nfs_unroll_locks(head, subreq);
+			nfs_release_request(subreq);
+			return ret;
+		}
+	}
+	return 0;
+}
+
+/*
+ * nfs_page_group_lock_subrequests -  try to lock the subrequests
+ * @head: head request of page group
+ *
+ * This is a helper function for nfs_lock_and_join_requests which
+ * must be called with the head request locked.
+ */
+int nfs_page_group_lock_subrequests(struct nfs_page *head)
+{
+	struct nfs_page *subreq;
+	int ret;
 
-	WARN_ON_ONCE(head != head->wb_head);
+	ret = nfs_page_group_lock(head);
+	if (ret < 0)
+		return ret;
+	/* lock each request in the page group */
+	for (subreq = head->wb_this_page; subreq != head;
+			subreq = subreq->wb_this_page) {
+		ret = nfs_page_group_lock_subreq(head, subreq);
+		if (ret < 0)
+			return ret;
+	}
+	nfs_page_group_unlock(head);
+	return 0;
+}
 
-	if (!test_and_set_bit(PG_HEADLOCK, &head->wb_flags))
+/*
+ * nfs_page_set_headlock - set the request PG_HEADLOCK
+ * @req: request that is to be locked
+ *
+ * this lock must be held when modifying req->wb_head
+ *
+ * return 0 on success, < 0 on error
+ */
+int
+nfs_page_set_headlock(struct nfs_page *req)
+{
+	if (!test_and_set_bit(PG_HEADLOCK, &req->wb_flags))
 		return 0;
 
-	set_bit(PG_CONTENDED1, &head->wb_flags);
+	set_bit(PG_CONTENDED1, &req->wb_flags);
 	smp_mb__after_atomic();
-	return wait_on_bit_lock(&head->wb_flags, PG_HEADLOCK,
+	return wait_on_bit_lock(&req->wb_flags, PG_HEADLOCK,
 				TASK_UNINTERRUPTIBLE);
 }
 
 /*
- * nfs_page_group_unlock - unlock the head of the page group
- * @req - request in group that is to be unlocked
+ * nfs_page_clear_headlock - clear the request PG_HEADLOCK
+ * @req: request that is to be locked
  */
 void
-nfs_page_group_unlock(struct nfs_page *req)
+nfs_page_clear_headlock(struct nfs_page *req)
 {
-	struct nfs_page *head = req->wb_head;
-
-	WARN_ON_ONCE(head != head->wb_head);
-
 	smp_mb__before_atomic();
-	clear_bit(PG_HEADLOCK, &head->wb_flags);
+	clear_bit(PG_HEADLOCK, &req->wb_flags);
 	smp_mb__after_atomic();
-	if (!test_bit(PG_CONTENDED1, &head->wb_flags))
+	if (!test_bit(PG_CONTENDED1, &req->wb_flags))
 		return;
-	wake_up_bit(&head->wb_flags, PG_HEADLOCK);
+	wake_up_bit(&req->wb_flags, PG_HEADLOCK);
+}
+
+/*
+ * nfs_page_group_lock - lock the head of the page group
+ * @req: request in group that is to be locked
+ *
+ * this lock must be held when traversing or modifying the page
+ * group list
+ *
+ * return 0 on success, < 0 on error
+ */
+int
+nfs_page_group_lock(struct nfs_page *req)
+{
+	int ret;
+
+	ret = nfs_page_set_headlock(req);
+	if (ret || req->wb_head == req)
+		return ret;
+	return nfs_page_set_headlock(req->wb_head);
+}
+
+/*
+ * nfs_page_group_unlock - unlock the head of the page group
+ * @req: request in group that is to be unlocked
+ */
+void
+nfs_page_group_unlock(struct nfs_page *req)
+{
+	if (req != req->wb_head)
+		nfs_page_clear_headlock(req->wb_head);
+	nfs_page_clear_headlock(req);
 }
 
 /*
@@ -359,15 +476,23 @@ nfs_create_request(struct nfs_open_context *ctx, struct page *page,
 }
 
 static struct nfs_page *
-nfs_create_subreq(struct nfs_page *req, struct nfs_page *last,
-		  unsigned int pgbase, unsigned int offset,
+nfs_create_subreq(struct nfs_page *req,
+		  unsigned int pgbase,
+		  unsigned int offset,
 		  unsigned int count)
 {
+	struct nfs_page *last;
 	struct nfs_page *ret;
 
 	ret = __nfs_create_request(req->wb_lock_context, req->wb_page,
 			pgbase, offset, count);
 	if (!IS_ERR(ret)) {
+		/* find the last request */
+		for (last = req->wb_head;
+		     last->wb_this_page != req->wb_head;
+		     last = last->wb_this_page)
+			;
+
 		nfs_lock_request(ret);
 		ret->wb_index = req->wb_index;
 		nfs_page_group_init(ret, last);
@@ -627,9 +752,8 @@ int nfs_initiate_pgio(struct rpc_clnt *clnt, struct nfs_pgio_header *hdr,
 		.callback_ops = call_ops,
 		.callback_data = hdr,
 		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC | flags,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF | flags,
 	};
-	int ret = 0;
 
 	hdr->rw_ops->rw_initiate(hdr, &msg, rpc_ops, &task_setup_data, how);
 
@@ -641,18 +765,10 @@ int nfs_initiate_pgio(struct rpc_clnt *clnt, struct nfs_pgio_header *hdr,
 		(unsigned long long)hdr->args.offset);
 
 	task = rpc_run_task(&task_setup_data);
-	if (IS_ERR(task)) {
-		ret = PTR_ERR(task);
-		goto out;
-	}
-	if (how & FLUSH_SYNC) {
-		ret = rpc_wait_for_completion_task(task);
-		if (ret == 0)
-			ret = task->tk_status;
-	}
+	if (IS_ERR(task))
+		return PTR_ERR(task);
 	rpc_put_task(task);
-out:
-	return ret;
+	return 0;
 }
 EXPORT_SYMBOL_GPL(nfs_initiate_pgio);
 
@@ -886,15 +1002,6 @@ static void nfs_pageio_setup_mirroring(struct nfs_pageio_descriptor *pgio,
 	pgio->pg_mirror_count = mirror_count;
 }
 
-/*
- * nfs_pageio_stop_mirroring - stop using mirroring (set mirror count to 1)
- */
-void nfs_pageio_stop_mirroring(struct nfs_pageio_descriptor *pgio)
-{
-	pgio->pg_mirror_count = 1;
-	pgio->pg_mirror_idx = 0;
-}
-
 static void nfs_pageio_cleanup_mirroring(struct nfs_pageio_descriptor *pgio)
 {
 	pgio->pg_mirror_count = 1;
@@ -911,7 +1018,7 @@ static bool nfs_match_lock_context(const struct nfs_lock_context *l1,
 }
 
 /**
- * nfs_can_coalesce_requests - test two requests for compatibility
+ * nfs_coalesce_size - test two requests for compatibility
  * @prev: pointer to nfs_page
  * @req: pointer to nfs_page
  * @pgio: pointer to nfs_pagio_descriptor
@@ -920,41 +1027,36 @@ static bool nfs_match_lock_context(const struct nfs_lock_context *l1,
  * page data area they describe is contiguous, and that their RPC
  * credentials, NFSv4 open state, and lockowners are the same.
  *
- * Return 'true' if this is the case, else return 'false'.
+ * Returns size of the request that can be coalesced
  */
-static bool nfs_can_coalesce_requests(struct nfs_page *prev,
+static unsigned int nfs_coalesce_size(struct nfs_page *prev,
 				      struct nfs_page *req,
 				      struct nfs_pageio_descriptor *pgio)
 {
-	size_t size;
 	struct file_lock_context *flctx;
 
 	if (prev) {
 		if (!nfs_match_open_context(nfs_req_openctx(req), nfs_req_openctx(prev)))
-			return false;
+			return 0;
 		flctx = d_inode(nfs_req_openctx(req)->dentry)->i_flctx;
 		if (flctx != NULL &&
 		    !(list_empty_careful(&flctx->flc_posix) &&
 		      list_empty_careful(&flctx->flc_flock)) &&
 		    !nfs_match_lock_context(req->wb_lock_context,
 					    prev->wb_lock_context))
-			return false;
+			return 0;
 		if (req_offset(req) != req_offset(prev) + prev->wb_bytes)
-			return false;
+			return 0;
 		if (req->wb_page == prev->wb_page) {
 			if (req->wb_pgbase != prev->wb_pgbase + prev->wb_bytes)
-				return false;
+				return 0;
 		} else {
 			if (req->wb_pgbase != 0 ||
 			    prev->wb_pgbase + prev->wb_bytes != PAGE_SIZE)
-				return false;
+				return 0;
 		}
 	}
-	size = pgio->pg_ops->pg_test(pgio, prev, req);
-	WARN_ON_ONCE(size > req->wb_bytes);
-	if (size && size < req->wb_bytes)
-		req->wb_bytes = size;
-	return size > 0;
+	return pgio->pg_ops->pg_test(pgio, prev, req);
 }
 
 /**
@@ -962,15 +1064,16 @@ static bool nfs_can_coalesce_requests(struct nfs_page *prev,
  * @desc: destination io descriptor
  * @req: request
  *
- * Returns true if the request 'req' was successfully coalesced into the
- * existing list of pages 'desc'.
+ * If the request 'req' was successfully coalesced into the existing list
+ * of pages 'desc', it returns the size of req.
  */
-static int nfs_pageio_do_add_request(struct nfs_pageio_descriptor *desc,
-				     struct nfs_page *req)
+static unsigned int
+nfs_pageio_do_add_request(struct nfs_pageio_descriptor *desc,
+		struct nfs_page *req)
 {
 	struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
-
 	struct nfs_page *prev = NULL;
+	unsigned int size;
 
 	if (mirror->pg_count != 0) {
 		prev = nfs_list_entry(mirror->pg_list.prev);
@@ -990,11 +1093,12 @@ static int nfs_pageio_do_add_request(struct nfs_pageio_descriptor *desc,
 		return 0;
 	}
 
-	if (!nfs_can_coalesce_requests(prev, req, desc))
-		return 0;
+	size = nfs_coalesce_size(prev, req, desc);
+	if (size < req->wb_bytes)
+		return size;
 	nfs_list_move_request(req, &mirror->pg_list);
 	mirror->pg_count += req->wb_bytes;
-	return 1;
+	return req->wb_bytes;
 }
 
 /*
@@ -1034,7 +1138,8 @@ nfs_pageio_cleanup_request(struct nfs_pageio_descriptor *desc,
  * @req: request
  *
  * This may split a request into subrequests which are all part of the
- * same page group.
+ * same page group. If so, it will submit @req as the last one, to ensure
+ * the pointer to @req is still valid in case of failure.
  *
  * Returns true if the request 'req' was successfully coalesced into the
  * existing list of pages 'desc'.
@@ -1043,51 +1148,50 @@ static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 			   struct nfs_page *req)
 {
 	struct nfs_pgio_mirror *mirror = nfs_pgio_current_mirror(desc);
-
 	struct nfs_page *subreq;
-	unsigned int bytes_left = 0;
-	unsigned int offset, pgbase;
+	unsigned int size, subreq_size;
 
 	nfs_page_group_lock(req);
 
 	subreq = req;
-	bytes_left = subreq->wb_bytes;
-	offset = subreq->wb_offset;
-	pgbase = subreq->wb_pgbase;
-
-	do {
-		if (!nfs_pageio_do_add_request(desc, subreq)) {
-			/* make sure pg_test call(s) did nothing */
-			WARN_ON_ONCE(subreq->wb_bytes != bytes_left);
-			WARN_ON_ONCE(subreq->wb_offset != offset);
-			WARN_ON_ONCE(subreq->wb_pgbase != pgbase);
-
+	subreq_size = subreq->wb_bytes;
+	for(;;) {
+		size = nfs_pageio_do_add_request(desc, subreq);
+		if (size == subreq_size) {
+			/* We successfully submitted a request */
+			if (subreq == req)
+				break;
+			req->wb_pgbase += size;
+			req->wb_bytes -= size;
+			req->wb_offset += size;
+			subreq_size = req->wb_bytes;
+			subreq = req;
+			continue;
+		}
+		if (WARN_ON_ONCE(subreq != req)) {
+			nfs_page_group_unlock(req);
+			nfs_pageio_cleanup_request(desc, subreq);
+			subreq = req;
+			subreq_size = req->wb_bytes;
+			nfs_page_group_lock(req);
+		}
+		if (!size) {
+			/* Can't coalesce any more, so do I/O */
 			nfs_page_group_unlock(req);
 			desc->pg_moreio = 1;
 			nfs_pageio_doio(desc);
 			if (desc->pg_error < 0 || mirror->pg_recoalesce)
-				goto out_cleanup_subreq;
+				return 0;
 			/* retry add_request for this subreq */
 			nfs_page_group_lock(req);
 			continue;
 		}
-
-		/* check for buggy pg_test call(s) */
-		WARN_ON_ONCE(subreq->wb_bytes + subreq->wb_pgbase > PAGE_SIZE);
-		WARN_ON_ONCE(subreq->wb_bytes > bytes_left);
-		WARN_ON_ONCE(subreq->wb_bytes == 0);
-
-		bytes_left -= subreq->wb_bytes;
-		offset += subreq->wb_bytes;
-		pgbase += subreq->wb_bytes;
-
-		if (bytes_left) {
-			subreq = nfs_create_subreq(req, subreq, pgbase,
-					offset, bytes_left);
-			if (IS_ERR(subreq))
-				goto err_ptr;
-		}
-	} while (bytes_left > 0);
+		subreq = nfs_create_subreq(req, req->wb_pgbase,
+				req->wb_offset, size);
+		if (IS_ERR(subreq))
+			goto err_ptr;
+		subreq_size = size;
+	}
 
 	nfs_page_group_unlock(req);
 	return 1;
@@ -1095,10 +1199,6 @@ err_ptr:
 	desc->pg_error = PTR_ERR(subreq);
 	nfs_page_group_unlock(req);
 	return 0;
-out_cleanup_subreq:
-	if (req != subreq)
-		nfs_pageio_cleanup_request(desc, subreq);
-	return 0;
 }
 
 static int nfs_do_recoalesce(struct nfs_pageio_descriptor *desc)
@@ -1167,7 +1267,7 @@ int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 {
 	u32 midx;
 	unsigned int pgbase, offset, bytes;
-	struct nfs_page *dupreq, *lastreq;
+	struct nfs_page *dupreq;
 
 	pgbase = req->wb_pgbase;
 	offset = req->wb_offset;
@@ -1177,38 +1277,32 @@ int nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 	if (desc->pg_error < 0)
 		goto out_failed;
 
-	for (midx = 0; midx < desc->pg_mirror_count; midx++) {
-		if (midx) {
-			nfs_page_group_lock(req);
+	/* Create the mirror instances first, and fire them off */
+	for (midx = 1; midx < desc->pg_mirror_count; midx++) {
+		nfs_page_group_lock(req);
 
-			/* find the last request */
-			for (lastreq = req->wb_head;
-			     lastreq->wb_this_page != req->wb_head;
-			     lastreq = lastreq->wb_this_page)
-				;
+		dupreq = nfs_create_subreq(req,
+				pgbase, offset, bytes);
 
-			dupreq = nfs_create_subreq(req, lastreq,
-					pgbase, offset, bytes);
-
-			nfs_page_group_unlock(req);
-			if (IS_ERR(dupreq)) {
-				desc->pg_error = PTR_ERR(dupreq);
-				goto out_failed;
-			}
-		} else
-			dupreq = req;
+		nfs_page_group_unlock(req);
+		if (IS_ERR(dupreq)) {
+			desc->pg_error = PTR_ERR(dupreq);
+			goto out_failed;
+		}
 
-		if (nfs_pgio_has_mirroring(desc))
-			desc->pg_mirror_idx = midx;
+		desc->pg_mirror_idx = midx;
 		if (!nfs_pageio_add_request_mirror(desc, dupreq))
 			goto out_cleanup_subreq;
 	}
 
+	desc->pg_mirror_idx = 0;
+	if (!nfs_pageio_add_request_mirror(desc, req))
+		goto out_failed;
+
 	return 1;
 
 out_cleanup_subreq:
-	if (req != dupreq)
-		nfs_pageio_cleanup_request(desc, dupreq);
+	nfs_pageio_cleanup_request(desc, dupreq);
 out_failed:
 	nfs_pageio_error_cleanup(desc);
 	return 0;
@@ -1226,8 +1320,7 @@ static void nfs_pageio_complete_mirror(struct nfs_pageio_descriptor *desc,
 	struct nfs_pgio_mirror *mirror = &desc->pg_mirrors[mirror_idx];
 	u32 restore_idx = desc->pg_mirror_idx;
 
-	if (nfs_pgio_has_mirroring(desc))
-		desc->pg_mirror_idx = mirror_idx;
+	desc->pg_mirror_idx = mirror_idx;
 	for (;;) {
 		nfs_pageio_doio(desc);
 		if (desc->pg_error < 0 || !mirror->pg_recoalesce)
@@ -1320,6 +1413,14 @@ void nfs_pageio_cond_complete(struct nfs_pageio_descriptor *desc, pgoff_t index)
 	}
 }
 
+/*
+ * nfs_pageio_stop_mirroring - stop using mirroring (set mirror count to 1)
+ */
+void nfs_pageio_stop_mirroring(struct nfs_pageio_descriptor *pgio)
+{
+	nfs_pageio_complete(pgio);
+}
+
 int __init nfs_init_nfspagecache(void)
 {
 	nfs_page_cachep = kmem_cache_create("nfs_page",
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 542ea8dfd1bc..f2dc35c22964 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -268,11 +268,11 @@ pnfs_free_layout_hdr(struct pnfs_layout_hdr *lo)
 	struct nfs_server *server = NFS_SERVER(lo->plh_inode);
 	struct pnfs_layoutdriver_type *ld = server->pnfs_curr_ld;
 
-	if (!list_empty(&lo->plh_layouts)) {
+	if (test_and_clear_bit(NFS_LAYOUT_HASHED, &lo->plh_flags)) {
 		struct nfs_client *clp = server->nfs_client;
 
 		spin_lock(&clp->cl_lock);
-		list_del_init(&lo->plh_layouts);
+		list_del_rcu(&lo->plh_layouts);
 		spin_unlock(&clp->cl_lock);
 	}
 	put_cred(lo->plh_lc_cred);
@@ -309,6 +309,16 @@ pnfs_put_layout_hdr(struct pnfs_layout_hdr *lo)
 	}
 }
 
+static struct inode *
+pnfs_grab_inode_layout_hdr(struct pnfs_layout_hdr *lo)
+{
+	struct inode *inode = igrab(lo->plh_inode);
+	if (inode)
+		return inode;
+	set_bit(NFS_LAYOUT_INODE_FREEING, &lo->plh_flags);
+	return NULL;
+}
+
 static void
 pnfs_set_plh_return_info(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode,
 			 u32 seq)
@@ -496,6 +506,7 @@ pnfs_init_lseg(struct pnfs_layout_hdr *lo, struct pnfs_layout_segment *lseg,
 {
 	INIT_LIST_HEAD(&lseg->pls_list);
 	INIT_LIST_HEAD(&lseg->pls_lc_list);
+	INIT_LIST_HEAD(&lseg->pls_commits);
 	refcount_set(&lseg->pls_refcount, 1);
 	set_bit(NFS_LSEG_VALID, &lseg->pls_flags);
 	lseg->pls_layout = lo;
@@ -782,9 +793,10 @@ pnfs_layout_bulk_destroy_byserver_locked(struct nfs_client *clp,
 		/* If the sb is being destroyed, just bail */
 		if (!nfs_sb_active(server->super))
 			break;
-		inode = igrab(lo->plh_inode);
+		inode = pnfs_grab_inode_layout_hdr(lo);
 		if (inode != NULL) {
-			list_del_init(&lo->plh_layouts);
+			if (test_and_clear_bit(NFS_LAYOUT_HASHED, &lo->plh_flags))
+				list_del_rcu(&lo->plh_layouts);
 			if (pnfs_layout_add_bulk_destroy_list(inode,
 						layout_list))
 				continue;
@@ -794,7 +806,6 @@ pnfs_layout_bulk_destroy_byserver_locked(struct nfs_client *clp,
 		} else {
 			rcu_read_unlock();
 			spin_unlock(&clp->cl_lock);
-			set_bit(NFS_LAYOUT_INODE_FREEING, &lo->plh_flags);
 		}
 		nfs_sb_deactive(server->super);
 		spin_lock(&clp->cl_lock);
@@ -903,10 +914,21 @@ pnfs_destroy_all_layouts(struct nfs_client *clp)
 	pnfs_destroy_layouts_byclid(clp, false);
 }
 
+static void
+pnfs_set_layout_cred(struct pnfs_layout_hdr *lo, const struct cred *cred)
+{
+	const struct cred *old;
+
+	if (cred && cred_fscmp(lo->plh_lc_cred, cred) != 0) {
+		old = xchg(&lo->plh_lc_cred, get_cred(cred));
+		put_cred(old);
+	}
+}
+
 /* update lo->plh_stateid with new if is more recent */
 void
 pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new,
-			bool update_barrier)
+			const struct cred *cred, bool update_barrier)
 {
 	u32 oldseq, newseq, new_barrier = 0;
 
@@ -914,6 +936,7 @@ pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new,
 	newseq = be32_to_cpu(new->seqid);
 
 	if (!pnfs_layout_is_valid(lo)) {
+		pnfs_set_layout_cred(lo, cred);
 		nfs4_stateid_copy(&lo->plh_stateid, new);
 		lo->plh_barrier = newseq;
 		pnfs_clear_layoutreturn_info(lo);
@@ -1061,7 +1084,7 @@ pnfs_alloc_init_layoutget_args(struct inode *ino,
 	lgp->args.ctx = get_nfs_open_context(ctx);
 	nfs4_stateid_copy(&lgp->args.stateid, stateid);
 	lgp->gfp_flags = gfp_flags;
-	lgp->cred = get_cred(ctx->cred);
+	lgp->cred = ctx->cred;
 	return lgp;
 }
 
@@ -1072,7 +1095,6 @@ void pnfs_layoutget_free(struct nfs4_layoutget *lgp)
 	nfs4_free_pages(lgp->args.layout.pages, max_pages);
 	if (lgp->args.inode)
 		pnfs_put_layout_hdr(NFS_I(lgp->args.inode)->layout);
-	put_cred(lgp->cred);
 	put_nfs_open_context(lgp->args.ctx);
 	kfree(lgp);
 }
@@ -1109,7 +1131,7 @@ void pnfs_layoutreturn_free_lsegs(struct pnfs_layout_hdr *lo,
 
 		pnfs_mark_matching_lsegs_invalid(lo, &freeme, range, seq);
 		pnfs_free_returned_lsegs(lo, &freeme, range, seq);
-		pnfs_set_layout_stateid(lo, stateid, true);
+		pnfs_set_layout_stateid(lo, stateid, NULL, true);
 	} else
 		pnfs_mark_layout_stateid_invalid(lo, &freeme);
 out_unlock:
@@ -1122,6 +1144,7 @@ out_unlock:
 static bool
 pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo,
 		nfs4_stateid *stateid,
+		const struct cred **cred,
 		enum pnfs_iomode *iomode)
 {
 	/* Serialise LAYOUTGET/LAYOUTRETURN */
@@ -1132,18 +1155,17 @@ pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo,
 	set_bit(NFS_LAYOUT_RETURN, &lo->plh_flags);
 	pnfs_get_layout_hdr(lo);
 	if (test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags)) {
-		if (stateid != NULL) {
-			nfs4_stateid_copy(stateid, &lo->plh_stateid);
-			if (lo->plh_return_seq != 0)
-				stateid->seqid = cpu_to_be32(lo->plh_return_seq);
-		}
+		nfs4_stateid_copy(stateid, &lo->plh_stateid);
+		*cred = get_cred(lo->plh_lc_cred);
+		if (lo->plh_return_seq != 0)
+			stateid->seqid = cpu_to_be32(lo->plh_return_seq);
 		if (iomode != NULL)
 			*iomode = lo->plh_return_iomode;
 		pnfs_clear_layoutreturn_info(lo);
 		return true;
 	}
-	if (stateid != NULL)
-		nfs4_stateid_copy(stateid, &lo->plh_stateid);
+	nfs4_stateid_copy(stateid, &lo->plh_stateid);
+	*cred = get_cred(lo->plh_lc_cred);
 	if (iomode != NULL)
 		*iomode = IOMODE_ANY;
 	return true;
@@ -1167,20 +1189,26 @@ pnfs_init_layoutreturn_args(struct nfs4_layoutreturn_args *args,
 }
 
 static int
-pnfs_send_layoutreturn(struct pnfs_layout_hdr *lo, const nfs4_stateid *stateid,
-		       enum pnfs_iomode iomode, bool sync)
+pnfs_send_layoutreturn(struct pnfs_layout_hdr *lo,
+		       const nfs4_stateid *stateid,
+		       const struct cred **pcred,
+		       enum pnfs_iomode iomode,
+		       bool sync)
 {
 	struct inode *ino = lo->plh_inode;
 	struct pnfs_layoutdriver_type *ld = NFS_SERVER(ino)->pnfs_curr_ld;
 	struct nfs4_layoutreturn *lrp;
+	const struct cred *cred = *pcred;
 	int status = 0;
 
+	*pcred = NULL;
 	lrp = kzalloc(sizeof(*lrp), GFP_NOFS);
 	if (unlikely(lrp == NULL)) {
 		status = -ENOMEM;
 		spin_lock(&ino->i_lock);
 		pnfs_clear_layoutreturn_waitbit(lo);
 		spin_unlock(&ino->i_lock);
+		put_cred(cred);
 		pnfs_put_layout_hdr(lo);
 		goto out;
 	}
@@ -1188,7 +1216,7 @@ pnfs_send_layoutreturn(struct pnfs_layout_hdr *lo, const nfs4_stateid *stateid,
 	pnfs_init_layoutreturn_args(&lrp->args, lo, stateid, iomode);
 	lrp->args.ld_private = &lrp->ld_private;
 	lrp->clp = NFS_SERVER(ino)->nfs_client;
-	lrp->cred = lo->plh_lc_cred;
+	lrp->cred = cred;
 	if (ld->prepare_layoutreturn)
 		ld->prepare_layoutreturn(&lrp->args);
 
@@ -1233,15 +1261,16 @@ static void pnfs_layoutreturn_before_put_layout_hdr(struct pnfs_layout_hdr *lo)
 		return;
 	spin_lock(&inode->i_lock);
 	if (pnfs_layout_need_return(lo)) {
+		const struct cred *cred;
 		nfs4_stateid stateid;
 		enum pnfs_iomode iomode;
 		bool send;
 
-		send = pnfs_prepare_layoutreturn(lo, &stateid, &iomode);
+		send = pnfs_prepare_layoutreturn(lo, &stateid, &cred, &iomode);
 		spin_unlock(&inode->i_lock);
 		if (send) {
 			/* Send an async layoutreturn so we dont deadlock */
-			pnfs_send_layoutreturn(lo, &stateid, iomode, false);
+			pnfs_send_layoutreturn(lo, &stateid, &cred, iomode, false);
 		}
 	} else
 		spin_unlock(&inode->i_lock);
@@ -1261,6 +1290,7 @@ _pnfs_return_layout(struct inode *ino)
 	struct pnfs_layout_hdr *lo = NULL;
 	struct nfs_inode *nfsi = NFS_I(ino);
 	LIST_HEAD(tmp_list);
+	const struct cred *cred;
 	nfs4_stateid stateid;
 	int status = 0;
 	bool send, valid_layout;
@@ -1305,10 +1335,10 @@ _pnfs_return_layout(struct inode *ino)
 		goto out_put_layout_hdr;
 	}
 
-	send = pnfs_prepare_layoutreturn(lo, &stateid, NULL);
+	send = pnfs_prepare_layoutreturn(lo, &stateid, &cred, NULL);
 	spin_unlock(&ino->i_lock);
 	if (send)
-		status = pnfs_send_layoutreturn(lo, &stateid, IOMODE_ANY, true);
+		status = pnfs_send_layoutreturn(lo, &stateid, &cred, IOMODE_ANY, true);
 out_put_layout_hdr:
 	pnfs_free_lseg_list(&tmp_list);
 	pnfs_put_layout_hdr(lo);
@@ -1354,6 +1384,7 @@ bool pnfs_roc(struct inode *ino,
 	struct nfs4_state *state;
 	struct pnfs_layout_hdr *lo;
 	struct pnfs_layout_segment *lseg, *next;
+	const struct cred *lc_cred;
 	nfs4_stateid stateid;
 	enum pnfs_iomode iomode = 0;
 	bool layoutreturn = false, roc = false;
@@ -1423,16 +1454,20 @@ retry:
 	 * 2. we don't send layoutreturn
 	 */
 	/* lo ref dropped in pnfs_roc_release() */
-	layoutreturn = pnfs_prepare_layoutreturn(lo, &stateid, &iomode);
+	layoutreturn = pnfs_prepare_layoutreturn(lo, &stateid, &lc_cred, &iomode);
 	/* If the creds don't match, we can't compound the layoutreturn */
-	if (!layoutreturn || cred_fscmp(cred, lo->plh_lc_cred) != 0)
+	if (!layoutreturn)
 		goto out_noroc;
+	if (cred_fscmp(cred, lc_cred) != 0)
+		goto out_noroc_put_cred;
 
 	roc = layoutreturn;
 	pnfs_init_layoutreturn_args(args, lo, &stateid, iomode);
 	res->lrs_present = 0;
 	layoutreturn = false;
 
+out_noroc_put_cred:
+	put_cred(lc_cred);
 out_noroc:
 	spin_unlock(&ino->i_lock);
 	rcu_read_unlock();
@@ -1445,7 +1480,7 @@ out_noroc:
 		return true;
 	}
 	if (layoutreturn)
-		pnfs_send_layoutreturn(lo, &stateid, iomode, true);
+		pnfs_send_layoutreturn(lo, &stateid, &lc_cred, iomode, true);
 	pnfs_put_layout_hdr(lo);
 	return false;
 }
@@ -1859,15 +1894,14 @@ static void pnfs_clear_first_layoutget(struct pnfs_layout_hdr *lo)
 static void _add_to_server_list(struct pnfs_layout_hdr *lo,
 				struct nfs_server *server)
 {
-	if (list_empty(&lo->plh_layouts)) {
+	if (!test_and_set_bit(NFS_LAYOUT_HASHED, &lo->plh_flags)) {
 		struct nfs_client *clp = server->nfs_client;
 
 		/* The lo must be on the clp list if there is any
 		 * chance of a CB_LAYOUTRECALL(FILE) coming in.
 		 */
 		spin_lock(&clp->cl_lock);
-		if (list_empty(&lo->plh_layouts))
-			list_add_tail(&lo->plh_layouts, &server->layouts);
+		list_add_tail_rcu(&lo->plh_layouts, &server->layouts);
 		spin_unlock(&clp->cl_lock);
 	}
 }
@@ -2323,14 +2357,14 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
 
 	if (!pnfs_layout_is_valid(lo)) {
 		/* We have a completely new layout */
-		pnfs_set_layout_stateid(lo, &res->stateid, true);
+		pnfs_set_layout_stateid(lo, &res->stateid, lgp->cred, true);
 	} else if (nfs4_stateid_match_other(&lo->plh_stateid, &res->stateid)) {
 		/* existing state ID, make sure the sequence number matches. */
 		if (pnfs_layout_stateid_blocked(lo, &res->stateid)) {
 			dprintk("%s forget reply due to sequence\n", __func__);
 			goto out_forget;
 		}
-		pnfs_set_layout_stateid(lo, &res->stateid, false);
+		pnfs_set_layout_stateid(lo, &res->stateid, lgp->cred, false);
 	} else {
 		/*
 		 * We got an entirely new state ID.  Mark all segments for the
@@ -2423,43 +2457,159 @@ pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
 	return -ENOENT;
 }
 
-void pnfs_error_mark_layout_for_return(struct inode *inode,
-				       struct pnfs_layout_segment *lseg)
+static void
+pnfs_mark_layout_for_return(struct inode *inode,
+			    const struct pnfs_layout_range *range)
 {
-	struct pnfs_layout_hdr *lo = NFS_I(inode)->layout;
-	struct pnfs_layout_range range = {
-		.iomode = lseg->pls_range.iomode,
-		.offset = 0,
-		.length = NFS4_MAX_UINT64,
-	};
+	struct pnfs_layout_hdr *lo;
 	bool return_now = false;
 
 	spin_lock(&inode->i_lock);
+	lo = NFS_I(inode)->layout;
 	if (!pnfs_layout_is_valid(lo)) {
 		spin_unlock(&inode->i_lock);
 		return;
 	}
-	pnfs_set_plh_return_info(lo, range.iomode, 0);
+	pnfs_set_plh_return_info(lo, range->iomode, 0);
 	/*
 	 * mark all matching lsegs so that we are sure to have no live
 	 * segments at hand when sending layoutreturn. See pnfs_put_lseg()
 	 * for how it works.
 	 */
-	if (pnfs_mark_matching_lsegs_return(lo, &lo->plh_return_segs, &range, 0) != -EBUSY) {
+	if (pnfs_mark_matching_lsegs_return(lo, &lo->plh_return_segs, range, 0) != -EBUSY) {
+		const struct cred *cred;
 		nfs4_stateid stateid;
 		enum pnfs_iomode iomode;
 
-		return_now = pnfs_prepare_layoutreturn(lo, &stateid, &iomode);
+		return_now = pnfs_prepare_layoutreturn(lo, &stateid, &cred, &iomode);
 		spin_unlock(&inode->i_lock);
 		if (return_now)
-			pnfs_send_layoutreturn(lo, &stateid, iomode, false);
+			pnfs_send_layoutreturn(lo, &stateid, &cred, iomode, false);
 	} else {
 		spin_unlock(&inode->i_lock);
 		nfs_commit_inode(inode, 0);
 	}
 }
+
+void pnfs_error_mark_layout_for_return(struct inode *inode,
+				       struct pnfs_layout_segment *lseg)
+{
+	struct pnfs_layout_range range = {
+		.iomode = lseg->pls_range.iomode,
+		.offset = 0,
+		.length = NFS4_MAX_UINT64,
+	};
+
+	pnfs_mark_layout_for_return(inode, &range);
+}
 EXPORT_SYMBOL_GPL(pnfs_error_mark_layout_for_return);
 
+static bool
+pnfs_layout_can_be_returned(struct pnfs_layout_hdr *lo)
+{
+	return pnfs_layout_is_valid(lo) &&
+		!test_bit(NFS_LAYOUT_INODE_FREEING, &lo->plh_flags) &&
+		!test_bit(NFS_LAYOUT_RETURN, &lo->plh_flags);
+}
+
+static struct pnfs_layout_segment *
+pnfs_find_first_lseg(struct pnfs_layout_hdr *lo,
+		     const struct pnfs_layout_range *range,
+		     enum pnfs_iomode iomode)
+{
+	struct pnfs_layout_segment *lseg;
+
+	list_for_each_entry(lseg, &lo->plh_segs, pls_list) {
+		if (!test_bit(NFS_LSEG_VALID, &lseg->pls_flags))
+			continue;
+		if (test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags))
+			continue;
+		if (lseg->pls_range.iomode != iomode && iomode != IOMODE_ANY)
+			continue;
+		if (pnfs_lseg_range_intersecting(&lseg->pls_range, range))
+			return lseg;
+	}
+	return NULL;
+}
+
+/* Find open file states whose mode matches that of the range */
+static bool
+pnfs_should_return_unused_layout(struct pnfs_layout_hdr *lo,
+				 const struct pnfs_layout_range *range)
+{
+	struct list_head *head;
+	struct nfs_open_context *ctx;
+	fmode_t mode = 0;
+
+	if (!pnfs_layout_can_be_returned(lo) ||
+	    !pnfs_find_first_lseg(lo, range, range->iomode))
+		return false;
+
+	head = &NFS_I(lo->plh_inode)->open_files;
+	list_for_each_entry_rcu(ctx, head, list) {
+		if (ctx->state)
+			mode |= ctx->state->state & (FMODE_READ|FMODE_WRITE);
+	}
+
+	switch (range->iomode) {
+	default:
+		break;
+	case IOMODE_READ:
+		mode &= ~FMODE_WRITE;
+		break;
+	case IOMODE_RW:
+		if (pnfs_find_first_lseg(lo, range, IOMODE_READ))
+			mode &= ~FMODE_READ;
+	}
+	return mode == 0;
+}
+
+static int
+pnfs_layout_return_unused_byserver(struct nfs_server *server, void *data)
+{
+	const struct pnfs_layout_range *range = data;
+	struct pnfs_layout_hdr *lo;
+	struct inode *inode;
+restart:
+	rcu_read_lock();
+	list_for_each_entry_rcu(lo, &server->layouts, plh_layouts) {
+		if (!pnfs_layout_can_be_returned(lo) ||
+		    test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags))
+			continue;
+		inode = lo->plh_inode;
+		spin_lock(&inode->i_lock);
+		if (!pnfs_should_return_unused_layout(lo, range)) {
+			spin_unlock(&inode->i_lock);
+			continue;
+		}
+		spin_unlock(&inode->i_lock);
+		inode = pnfs_grab_inode_layout_hdr(lo);
+		if (!inode)
+			continue;
+		rcu_read_unlock();
+		pnfs_mark_layout_for_return(inode, range);
+		iput(inode);
+		cond_resched();
+		goto restart;
+	}
+	rcu_read_unlock();
+	return 0;
+}
+
+void
+pnfs_layout_return_unused_byclid(struct nfs_client *clp,
+				 enum pnfs_iomode iomode)
+{
+	struct pnfs_layout_range range = {
+		.iomode = iomode,
+		.offset = 0,
+		.length = NFS4_MAX_UINT64,
+	};
+
+	nfs_client_for_each_server(clp, pnfs_layout_return_unused_byserver,
+			&range);
+}
+
 void
 pnfs_generic_pg_check_layout(struct nfs_pageio_descriptor *pgio)
 {
@@ -2475,7 +2625,7 @@ EXPORT_SYMBOL_GPL(pnfs_generic_pg_check_layout);
  * Check for any intersection between the request and the pgio->pg_lseg,
  * and if none, put this pgio->pg_lseg away.
  */
-static void
+void
 pnfs_generic_pg_check_range(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
 {
 	if (pgio->pg_lseg && !pnfs_lseg_request_intersecting(pgio->pg_lseg, req)) {
@@ -2483,6 +2633,7 @@ pnfs_generic_pg_check_range(struct nfs_pageio_descriptor *pgio, struct nfs_page
 		pgio->pg_lseg = NULL;
 	}
 }
+EXPORT_SYMBOL_GPL(pnfs_generic_pg_check_range);
 
 void
 pnfs_generic_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
@@ -3000,10 +3151,10 @@ pnfs_layoutcommit_inode(struct inode *inode, bool sync)
 	end_pos = nfsi->layout->plh_lwb;
 
 	nfs4_stateid_copy(&data->args.stateid, &nfsi->layout->plh_stateid);
+	data->cred = get_cred(nfsi->layout->plh_lc_cred);
 	spin_unlock(&inode->i_lock);
 
 	data->args.inode = inode;
-	data->cred = get_cred(nfsi->layout->plh_lc_cred);
 	nfs_fattr_init(&data->fattr);
 	data->args.bitmask = NFS_SERVER(inode)->cache_consistency_bitmask;
 	data->res.fattr = &data->fattr;
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 0fafdadc9c8d..8e0ada581b92 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -66,6 +66,7 @@ struct nfs4_pnfs_ds {
 struct pnfs_layout_segment {
 	struct list_head pls_list;
 	struct list_head pls_lc_list;
+	struct list_head pls_commits;
 	struct pnfs_layout_range pls_range;
 	refcount_t pls_refcount;
 	u32 pls_seq;
@@ -105,6 +106,7 @@ enum {
 	NFS_LAYOUT_INVALID_STID,	/* layout stateid id is invalid */
 	NFS_LAYOUT_FIRST_LAYOUTGET,	/* Serialize first layoutget */
 	NFS_LAYOUT_INODE_FREEING,	/* The inode is being freed */
+	NFS_LAYOUT_HASHED,		/* The layout visible */
 };
 
 enum layoutdriver_policy_flags {
@@ -148,22 +150,6 @@ struct pnfs_layoutdriver_type {
 	const struct nfs_pageio_ops *pg_write_ops;
 
 	struct pnfs_ds_commit_info *(*get_ds_info) (struct inode *inode);
-	void (*mark_request_commit) (struct nfs_page *req,
-				     struct pnfs_layout_segment *lseg,
-				     struct nfs_commit_info *cinfo,
-				     u32 ds_commit_idx);
-	void (*clear_request_commit) (struct nfs_page *req,
-				      struct nfs_commit_info *cinfo);
-	int (*scan_commit_lists) (struct nfs_commit_info *cinfo,
-				  int max);
-	void (*recover_commit_reqs) (struct list_head *list,
-				     struct nfs_commit_info *cinfo);
-	struct nfs_page * (*search_commit_reqs)(struct nfs_commit_info *cinfo,
-						struct page *page);
-	int (*commit_pagelist)(struct inode *inode,
-			       struct list_head *mds_pages,
-			       int how,
-			       struct nfs_commit_info *cinfo);
 
 	int (*sync)(struct inode *inode, bool datasync);
 
@@ -186,6 +172,29 @@ struct pnfs_layoutdriver_type {
 	int (*prepare_layoutstats) (struct nfs42_layoutstat_args *args);
 };
 
+struct pnfs_commit_ops {
+	void (*setup_ds_info)(struct pnfs_ds_commit_info *,
+			      struct pnfs_layout_segment *);
+	void (*release_ds_info)(struct pnfs_ds_commit_info *,
+				struct inode *inode);
+	int (*commit_pagelist)(struct inode *inode,
+			       struct list_head *mds_pages,
+			       int how,
+			       struct nfs_commit_info *cinfo);
+	void (*mark_request_commit) (struct nfs_page *req,
+				     struct pnfs_layout_segment *lseg,
+				     struct nfs_commit_info *cinfo,
+				     u32 ds_commit_idx);
+	void (*clear_request_commit) (struct nfs_page *req,
+				      struct nfs_commit_info *cinfo);
+	int (*scan_commit_lists) (struct nfs_commit_info *cinfo,
+				  int max);
+	void (*recover_commit_reqs) (struct list_head *list,
+				     struct nfs_commit_info *cinfo);
+	struct nfs_page * (*search_commit_reqs)(struct nfs_commit_info *cinfo,
+						struct page *page);
+};
+
 struct pnfs_layout_hdr {
 	refcount_t		plh_refcount;
 	atomic_t		plh_outstanding; /* number of RPCs out */
@@ -203,6 +212,7 @@ struct pnfs_layout_hdr {
 	loff_t			plh_lwb; /* last write byte for layoutcommit */
 	const struct cred	*plh_lc_cred; /* layoutcommit cred */
 	struct inode		*plh_inode;
+	struct rcu_head		plh_rcu;
 };
 
 struct pnfs_device {
@@ -242,6 +252,7 @@ void pnfs_put_lseg(struct pnfs_layout_segment *lseg);
 void set_pnfs_layoutdriver(struct nfs_server *, const struct nfs_fh *, struct nfs_fsinfo *);
 void unset_pnfs_layoutdriver(struct nfs_server *);
 void pnfs_generic_pg_check_layout(struct nfs_pageio_descriptor *pgio);
+void pnfs_generic_pg_check_range(struct nfs_pageio_descriptor *pgio, struct nfs_page *req);
 void pnfs_generic_pg_init_read(struct nfs_pageio_descriptor *, struct nfs_page *);
 int pnfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc);
 void pnfs_generic_pg_init_write(struct nfs_pageio_descriptor *pgio,
@@ -267,6 +278,7 @@ bool nfs4_layout_refresh_old_stateid(nfs4_stateid *dst,
 void pnfs_put_layout_hdr(struct pnfs_layout_hdr *lo);
 void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
 			     const nfs4_stateid *new,
+			     const struct cred *cred,
 			     bool update_barrier);
 int pnfs_mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo,
 				struct list_head *tmp_list,
@@ -326,6 +338,9 @@ int pnfs_write_done_resend_to_mds(struct nfs_pgio_header *);
 struct nfs4_threshold *pnfs_mdsthreshold_alloc(void);
 void pnfs_error_mark_layout_for_return(struct inode *inode,
 				       struct pnfs_layout_segment *lseg);
+void pnfs_layout_return_unused_byclid(struct nfs_client *clp,
+				      enum pnfs_iomode iomode);
+
 /* nfs4_deviceid_flags */
 enum {
 	NFS_DEVICEID_INVALID = 0,       /* set when MDS clientid recalled */
@@ -360,6 +375,16 @@ bool nfs4_test_deviceid_unavailable(struct nfs4_deviceid_node *node);
 void nfs4_deviceid_purge_client(const struct nfs_client *);
 
 /* pnfs_nfs.c */
+struct pnfs_commit_array *pnfs_alloc_commit_array(size_t n, gfp_t gfp_flags);
+void pnfs_free_commit_array(struct pnfs_commit_array *p);
+struct pnfs_commit_array *pnfs_add_commit_array(struct pnfs_ds_commit_info *,
+						struct pnfs_commit_array *,
+						struct pnfs_layout_segment *);
+
+void pnfs_generic_ds_cinfo_release_lseg(struct pnfs_ds_commit_info *fl_cinfo,
+		struct pnfs_layout_segment *lseg);
+void pnfs_generic_ds_cinfo_destroy(struct pnfs_ds_commit_info *fl_cinfo);
+
 void pnfs_generic_clear_request_commit(struct nfs_page *req,
 				       struct nfs_commit_info *cinfo);
 void pnfs_generic_commit_release(void *calldata);
@@ -367,6 +392,8 @@ void pnfs_generic_prepare_to_resend_writes(struct nfs_commit_data *data);
 void pnfs_generic_rw_release(void *data);
 void pnfs_generic_recover_commit_reqs(struct list_head *dst,
 				      struct nfs_commit_info *cinfo);
+struct nfs_page *pnfs_generic_search_commit_reqs(struct nfs_commit_info *cinfo,
+						 struct page *page);
 int pnfs_generic_commit_pagelist(struct inode *inode,
 				 struct list_head *mds_pages,
 				 int how,
@@ -438,9 +465,11 @@ static inline int
 pnfs_commit_list(struct inode *inode, struct list_head *mds_pages, int how,
 		 struct nfs_commit_info *cinfo)
 {
-	if (cinfo->ds == NULL || cinfo->ds->ncommitting == 0)
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
+
+	if (fl_cinfo == NULL || fl_cinfo->ncommitting == 0)
 		return PNFS_NOT_ATTEMPTED;
-	return NFS_SERVER(inode)->pnfs_curr_ld->commit_pagelist(inode, mds_pages, how, cinfo);
+	return fl_cinfo->ops->commit_pagelist(inode, mds_pages, how, cinfo);
 }
 
 static inline struct pnfs_ds_commit_info *
@@ -454,6 +483,28 @@ pnfs_get_ds_info(struct inode *inode)
 }
 
 static inline void
+pnfs_init_ds_commit_info_ops(struct pnfs_ds_commit_info *fl_cinfo, struct inode *inode)
+{
+	struct pnfs_ds_commit_info *inode_cinfo = pnfs_get_ds_info(inode);
+	if (inode_cinfo != NULL)
+		fl_cinfo->ops = inode_cinfo->ops;
+}
+
+static inline void
+pnfs_init_ds_commit_info(struct pnfs_ds_commit_info *fl_cinfo)
+{
+	INIT_LIST_HEAD(&fl_cinfo->commits);
+	fl_cinfo->ops = NULL;
+}
+
+static inline void
+pnfs_release_ds_info(struct pnfs_ds_commit_info *fl_cinfo, struct inode *inode)
+{
+	if (fl_cinfo->ops != NULL && fl_cinfo->ops->release_ds_info != NULL)
+		fl_cinfo->ops->release_ds_info(fl_cinfo, inode);
+}
+
+static inline void
 pnfs_generic_mark_devid_invalid(struct nfs4_deviceid_node *node)
 {
 	set_bit(NFS_DEVICEID_INVALID, &node->flags);
@@ -463,24 +514,22 @@ static inline bool
 pnfs_mark_request_commit(struct nfs_page *req, struct pnfs_layout_segment *lseg,
 			 struct nfs_commit_info *cinfo, u32 ds_commit_idx)
 {
-	struct inode *inode = d_inode(nfs_req_openctx(req)->dentry);
-	struct pnfs_layoutdriver_type *ld = NFS_SERVER(inode)->pnfs_curr_ld;
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
 
-	if (lseg == NULL || ld->mark_request_commit == NULL)
+	if (!lseg || !fl_cinfo->ops->mark_request_commit)
 		return false;
-	ld->mark_request_commit(req, lseg, cinfo, ds_commit_idx);
+	fl_cinfo->ops->mark_request_commit(req, lseg, cinfo, ds_commit_idx);
 	return true;
 }
 
 static inline bool
 pnfs_clear_request_commit(struct nfs_page *req, struct nfs_commit_info *cinfo)
 {
-	struct inode *inode = d_inode(nfs_req_openctx(req)->dentry);
-	struct pnfs_layoutdriver_type *ld = NFS_SERVER(inode)->pnfs_curr_ld;
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
 
-	if (ld == NULL || ld->clear_request_commit == NULL)
+	if (!fl_cinfo || !fl_cinfo->ops || !fl_cinfo->ops->clear_request_commit)
 		return false;
-	ld->clear_request_commit(req, cinfo);
+	fl_cinfo->ops->clear_request_commit(req, cinfo);
 	return true;
 }
 
@@ -488,21 +537,31 @@ static inline int
 pnfs_scan_commit_lists(struct inode *inode, struct nfs_commit_info *cinfo,
 		       int max)
 {
-	if (cinfo->ds == NULL || cinfo->ds->nwritten == 0)
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
+
+	if (!fl_cinfo || fl_cinfo->nwritten == 0)
 		return 0;
-	else
-		return NFS_SERVER(inode)->pnfs_curr_ld->scan_commit_lists(cinfo, max);
+	return fl_cinfo->ops->scan_commit_lists(cinfo, max);
+}
+
+static inline void
+pnfs_recover_commit_reqs(struct list_head *head, struct nfs_commit_info *cinfo)
+{
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
+
+	if (fl_cinfo && fl_cinfo->nwritten != 0)
+		fl_cinfo->ops->recover_commit_reqs(head, cinfo);
 }
 
 static inline struct nfs_page *
 pnfs_search_commit_reqs(struct inode *inode, struct nfs_commit_info *cinfo,
 			struct page *page)
 {
-	struct pnfs_layoutdriver_type *ld = NFS_SERVER(inode)->pnfs_curr_ld;
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
 
-	if (ld == NULL || ld->search_commit_reqs == NULL)
+	if (!fl_cinfo->ops || !fl_cinfo->ops->search_commit_reqs)
 		return NULL;
-	return ld->search_commit_reqs(cinfo, page);
+	return fl_cinfo->ops->search_commit_reqs(cinfo, page);
 }
 
 /* Should the pNFS client commit and return the layout upon a setattr */
@@ -750,6 +809,21 @@ pnfs_get_ds_info(struct inode *inode)
 	return NULL;
 }
 
+static inline void
+pnfs_init_ds_commit_info_ops(struct pnfs_ds_commit_info *fl_cinfo, struct inode *inode)
+{
+}
+
+static inline void
+pnfs_init_ds_commit_info(struct pnfs_ds_commit_info *fl_cinfo)
+{
+}
+
+static inline void
+pnfs_release_ds_info(struct pnfs_ds_commit_info *fl_cinfo, struct inode *inode)
+{
+}
+
 static inline bool
 pnfs_mark_request_commit(struct nfs_page *req, struct pnfs_layout_segment *lseg,
 			 struct nfs_commit_info *cinfo, u32 ds_commit_idx)
@@ -770,6 +844,11 @@ pnfs_scan_commit_lists(struct inode *inode, struct nfs_commit_info *cinfo,
 	return 0;
 }
 
+static inline void
+pnfs_recover_commit_reqs(struct list_head *head, struct nfs_commit_info *cinfo)
+{
+}
+
 static inline struct nfs_page *
 pnfs_search_commit_reqs(struct inode *inode, struct nfs_commit_info *cinfo,
 			struct page *page)
diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c
index 8b37e7f8e789..25f135572fc8 100644
--- a/fs/nfs/pnfs_nfs.c
+++ b/fs/nfs/pnfs_nfs.c
@@ -59,6 +59,17 @@ void pnfs_generic_commit_release(void *calldata)
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_commit_release);
 
+static struct pnfs_layout_segment *
+pnfs_free_bucket_lseg(struct pnfs_commit_bucket *bucket)
+{
+	if (list_empty(&bucket->committing) && list_empty(&bucket->written)) {
+		struct pnfs_layout_segment *freeme = bucket->lseg;
+		bucket->lseg = NULL;
+		return freeme;
+	}
+	return NULL;
+}
+
 /* The generic layer is about to remove the req from the commit list.
  * If this will make the bucket empty, it will need to put the lseg reference.
  * Note this must be called holding nfsi->commit_mutex
@@ -78,8 +89,7 @@ pnfs_generic_clear_request_commit(struct nfs_page *req,
 		bucket = list_first_entry(&req->wb_list,
 					  struct pnfs_commit_bucket,
 					  written);
-		freeme = bucket->wlseg;
-		bucket->wlseg = NULL;
+		freeme = pnfs_free_bucket_lseg(bucket);
 	}
 out:
 	nfs_request_remove_commit_list(req, cinfo);
@@ -87,10 +97,154 @@ out:
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_clear_request_commit);
 
+struct pnfs_commit_array *
+pnfs_alloc_commit_array(size_t n, gfp_t gfp_flags)
+{
+	struct pnfs_commit_array *p;
+	struct pnfs_commit_bucket *b;
+
+	p = kmalloc(struct_size(p, buckets, n), gfp_flags);
+	if (!p)
+		return NULL;
+	p->nbuckets = n;
+	INIT_LIST_HEAD(&p->cinfo_list);
+	INIT_LIST_HEAD(&p->lseg_list);
+	p->lseg = NULL;
+	for (b = &p->buckets[0]; n != 0; b++, n--) {
+		INIT_LIST_HEAD(&b->written);
+		INIT_LIST_HEAD(&b->committing);
+		b->lseg = NULL;
+		b->direct_verf.committed = NFS_INVALID_STABLE_HOW;
+	}
+	return p;
+}
+EXPORT_SYMBOL_GPL(pnfs_alloc_commit_array);
+
+void
+pnfs_free_commit_array(struct pnfs_commit_array *p)
+{
+	kfree_rcu(p, rcu);
+}
+EXPORT_SYMBOL_GPL(pnfs_free_commit_array);
+
+static struct pnfs_commit_array *
+pnfs_find_commit_array_by_lseg(struct pnfs_ds_commit_info *fl_cinfo,
+		struct pnfs_layout_segment *lseg)
+{
+	struct pnfs_commit_array *array;
+
+	list_for_each_entry_rcu(array, &fl_cinfo->commits, cinfo_list) {
+		if (array->lseg == lseg)
+			return array;
+	}
+	return NULL;
+}
+
+struct pnfs_commit_array *
+pnfs_add_commit_array(struct pnfs_ds_commit_info *fl_cinfo,
+		struct pnfs_commit_array *new,
+		struct pnfs_layout_segment *lseg)
+{
+	struct pnfs_commit_array *array;
+
+	array = pnfs_find_commit_array_by_lseg(fl_cinfo, lseg);
+	if (array)
+		return array;
+	new->lseg = lseg;
+	refcount_set(&new->refcount, 1);
+	list_add_rcu(&new->cinfo_list, &fl_cinfo->commits);
+	list_add(&new->lseg_list, &lseg->pls_commits);
+	return new;
+}
+EXPORT_SYMBOL_GPL(pnfs_add_commit_array);
+
+static struct pnfs_commit_array *
+pnfs_lookup_commit_array(struct pnfs_ds_commit_info *fl_cinfo,
+		struct pnfs_layout_segment *lseg)
+{
+	struct pnfs_commit_array *array;
+
+	rcu_read_lock();
+	array = pnfs_find_commit_array_by_lseg(fl_cinfo, lseg);
+	if (!array) {
+		rcu_read_unlock();
+		fl_cinfo->ops->setup_ds_info(fl_cinfo, lseg);
+		rcu_read_lock();
+		array = pnfs_find_commit_array_by_lseg(fl_cinfo, lseg);
+	}
+	rcu_read_unlock();
+	return array;
+}
+
+static void
+pnfs_release_commit_array_locked(struct pnfs_commit_array *array)
+{
+	list_del_rcu(&array->cinfo_list);
+	list_del(&array->lseg_list);
+	pnfs_free_commit_array(array);
+}
+
+static void
+pnfs_put_commit_array_locked(struct pnfs_commit_array *array)
+{
+	if (refcount_dec_and_test(&array->refcount))
+		pnfs_release_commit_array_locked(array);
+}
+
+static void
+pnfs_put_commit_array(struct pnfs_commit_array *array, struct inode *inode)
+{
+	if (refcount_dec_and_lock(&array->refcount, &inode->i_lock)) {
+		pnfs_release_commit_array_locked(array);
+		spin_unlock(&inode->i_lock);
+	}
+}
+
+static struct pnfs_commit_array *
+pnfs_get_commit_array(struct pnfs_commit_array *array)
+{
+	if (refcount_inc_not_zero(&array->refcount))
+		return array;
+	return NULL;
+}
+
+static void
+pnfs_remove_and_free_commit_array(struct pnfs_commit_array *array)
+{
+	array->lseg = NULL;
+	list_del_init(&array->lseg_list);
+	pnfs_put_commit_array_locked(array);
+}
+
+void
+pnfs_generic_ds_cinfo_release_lseg(struct pnfs_ds_commit_info *fl_cinfo,
+		struct pnfs_layout_segment *lseg)
+{
+	struct pnfs_commit_array *array, *tmp;
+
+	list_for_each_entry_safe(array, tmp, &lseg->pls_commits, lseg_list)
+		pnfs_remove_and_free_commit_array(array);
+}
+EXPORT_SYMBOL_GPL(pnfs_generic_ds_cinfo_release_lseg);
+
+void
+pnfs_generic_ds_cinfo_destroy(struct pnfs_ds_commit_info *fl_cinfo)
+{
+	struct pnfs_commit_array *array, *tmp;
+
+	list_for_each_entry_safe(array, tmp, &fl_cinfo->commits, cinfo_list)
+		pnfs_remove_and_free_commit_array(array);
+}
+EXPORT_SYMBOL_GPL(pnfs_generic_ds_cinfo_destroy);
+
+/*
+ * Locks the nfs_page requests for commit and moves them to
+ * @bucket->committing.
+ */
 static int
-pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
-				 struct nfs_commit_info *cinfo,
-				 int max)
+pnfs_bucket_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
+				struct nfs_commit_info *cinfo,
+				int max)
 {
 	struct list_head *src = &bucket->written;
 	struct list_head *dst = &bucket->committing;
@@ -101,158 +255,253 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 	if (ret) {
 		cinfo->ds->nwritten -= ret;
 		cinfo->ds->ncommitting += ret;
-		if (bucket->clseg == NULL)
-			bucket->clseg = pnfs_get_lseg(bucket->wlseg);
-		if (list_empty(src)) {
-			pnfs_put_lseg(bucket->wlseg);
-			bucket->wlseg = NULL;
-		}
 	}
 	return ret;
 }
 
+static int pnfs_bucket_scan_array(struct nfs_commit_info *cinfo,
+				  struct pnfs_commit_bucket *buckets,
+				  unsigned int nbuckets,
+				  int max)
+{
+	unsigned int i;
+	int rv = 0, cnt;
+
+	for (i = 0; i < nbuckets && max != 0; i++) {
+		cnt = pnfs_bucket_scan_ds_commit_list(&buckets[i], cinfo, max);
+		rv += cnt;
+		max -= cnt;
+	}
+	return rv;
+}
+
 /* Move reqs from written to committing lists, returning count
  * of number moved.
  */
-int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo,
-				   int max)
+int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo, int max)
 {
-	int i, rv = 0, cnt;
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
+	struct pnfs_commit_array *array;
+	int rv = 0, cnt;
 
-	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
-	for (i = 0; i < cinfo->ds->nbuckets && max != 0; i++) {
-		cnt = pnfs_generic_scan_ds_commit_list(&cinfo->ds->buckets[i],
-						       cinfo, max);
-		max -= cnt;
+	rcu_read_lock();
+	list_for_each_entry_rcu(array, &fl_cinfo->commits, cinfo_list) {
+		if (!array->lseg || !pnfs_get_commit_array(array))
+			continue;
+		rcu_read_unlock();
+		cnt = pnfs_bucket_scan_array(cinfo, array->buckets,
+				array->nbuckets, max);
+		rcu_read_lock();
+		pnfs_put_commit_array(array, cinfo->inode);
 		rv += cnt;
+		max -= cnt;
+		if (!max)
+			break;
 	}
+	rcu_read_unlock();
 	return rv;
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_scan_commit_lists);
 
-/* Pull everything off the committing lists and dump into @dst.  */
-void pnfs_generic_recover_commit_reqs(struct list_head *dst,
-				      struct nfs_commit_info *cinfo)
+static unsigned int
+pnfs_bucket_recover_commit_reqs(struct list_head *dst,
+			        struct pnfs_commit_bucket *buckets,
+				unsigned int nbuckets,
+				struct nfs_commit_info *cinfo)
 {
 	struct pnfs_commit_bucket *b;
 	struct pnfs_layout_segment *freeme;
-	int nwritten;
-	int i;
+	unsigned int nwritten, ret = 0;
+	unsigned int i;
 
-	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
 restart:
-	for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
+	for (i = 0, b = buckets; i < nbuckets; i++, b++) {
 		nwritten = nfs_scan_commit_list(&b->written, dst, cinfo, 0);
 		if (!nwritten)
 			continue;
-		cinfo->ds->nwritten -= nwritten;
-		if (list_empty(&b->written)) {
-			freeme = b->wlseg;
-			b->wlseg = NULL;
+		ret += nwritten;
+		freeme = pnfs_free_bucket_lseg(b);
+		if (freeme) {
 			pnfs_put_lseg(freeme);
 			goto restart;
 		}
 	}
+	return ret;
+}
+
+/* Pull everything off the committing lists and dump into @dst.  */
+void pnfs_generic_recover_commit_reqs(struct list_head *dst,
+				      struct nfs_commit_info *cinfo)
+{
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
+	struct pnfs_commit_array *array;
+	unsigned int nwritten;
+
+	lockdep_assert_held(&NFS_I(cinfo->inode)->commit_mutex);
+	rcu_read_lock();
+	list_for_each_entry_rcu(array, &fl_cinfo->commits, cinfo_list) {
+		if (!array->lseg || !pnfs_get_commit_array(array))
+			continue;
+		rcu_read_unlock();
+		nwritten = pnfs_bucket_recover_commit_reqs(dst,
+							   array->buckets,
+							   array->nbuckets,
+							   cinfo);
+		rcu_read_lock();
+		pnfs_put_commit_array(array, cinfo->inode);
+		fl_cinfo->nwritten -= nwritten;
+	}
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_recover_commit_reqs);
 
-static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx)
+static struct nfs_page *
+pnfs_bucket_search_commit_reqs(struct pnfs_commit_bucket *buckets,
+		unsigned int nbuckets, struct page *page)
+{
+	struct nfs_page *req;
+	struct pnfs_commit_bucket *b;
+	unsigned int i;
+
+	/* Linearly search the commit lists for each bucket until a matching
+	 * request is found */
+	for (i = 0, b = buckets; i < nbuckets; i++, b++) {
+		list_for_each_entry(req, &b->written, wb_list) {
+			if (req->wb_page == page)
+				return req->wb_head;
+		}
+		list_for_each_entry(req, &b->committing, wb_list) {
+			if (req->wb_page == page)
+				return req->wb_head;
+		}
+	}
+	return NULL;
+}
+
+/* pnfs_generic_search_commit_reqs - Search lists in @cinfo for the head reqest
+ *				   for @page
+ * @cinfo - commit info for current inode
+ * @page - page to search for matching head request
+ *
+ * Returns a the head request if one is found, otherwise returns NULL.
+ */
+struct nfs_page *
+pnfs_generic_search_commit_reqs(struct nfs_commit_info *cinfo, struct page *page)
 {
 	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
+	struct pnfs_commit_array *array;
+	struct nfs_page *req;
+
+	list_for_each_entry(array, &fl_cinfo->commits, cinfo_list) {
+		req = pnfs_bucket_search_commit_reqs(array->buckets,
+				array->nbuckets, page);
+		if (req)
+			return req;
+	}
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(pnfs_generic_search_commit_reqs);
+
+static struct pnfs_layout_segment *
+pnfs_bucket_get_committing(struct list_head *head,
+			   struct pnfs_commit_bucket *bucket,
+			   struct nfs_commit_info *cinfo)
+{
+	struct list_head *pos;
+
+	list_for_each(pos, &bucket->committing)
+		cinfo->ds->ncommitting--;
+	list_splice_init(&bucket->committing, head);
+	return pnfs_free_bucket_lseg(bucket);
+}
+
+static struct nfs_commit_data *
+pnfs_bucket_fetch_commitdata(struct pnfs_commit_bucket *bucket,
+			     struct nfs_commit_info *cinfo)
+{
+	struct nfs_commit_data *data = nfs_commitdata_alloc(false);
+
+	if (!data)
+		return NULL;
+	data->lseg = pnfs_bucket_get_committing(&data->pages, bucket, cinfo);
+	if (!data->lseg)
+		data->lseg = pnfs_get_lseg(bucket->lseg);
+	return data;
+}
+
+static void pnfs_generic_retry_commit(struct pnfs_commit_bucket *buckets,
+				      unsigned int nbuckets,
+				      struct nfs_commit_info *cinfo,
+				      unsigned int idx)
+{
 	struct pnfs_commit_bucket *bucket;
 	struct pnfs_layout_segment *freeme;
-	struct list_head *pos;
 	LIST_HEAD(pages);
-	int i;
 
-	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
-	for (i = idx; i < fl_cinfo->nbuckets; i++) {
-		bucket = &fl_cinfo->buckets[i];
+	for (bucket = buckets; idx < nbuckets; bucket++, idx++) {
 		if (list_empty(&bucket->committing))
 			continue;
-		freeme = bucket->clseg;
-		bucket->clseg = NULL;
-		list_for_each(pos, &bucket->committing)
-			cinfo->ds->ncommitting--;
-		list_splice_init(&bucket->committing, &pages);
+		mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+		freeme = pnfs_bucket_get_committing(&pages, bucket, cinfo);
 		mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
-		nfs_retry_commit(&pages, freeme, cinfo, i);
+		nfs_retry_commit(&pages, freeme, cinfo, idx);
 		pnfs_put_lseg(freeme);
-		mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
 	}
-	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 }
 
 static unsigned int
-pnfs_generic_alloc_ds_commits(struct nfs_commit_info *cinfo,
-			      struct list_head *list)
+pnfs_bucket_alloc_ds_commits(struct list_head *list,
+			     struct pnfs_commit_bucket *buckets,
+			     unsigned int nbuckets,
+			     struct nfs_commit_info *cinfo)
 {
-	struct pnfs_ds_commit_info *fl_cinfo;
 	struct pnfs_commit_bucket *bucket;
 	struct nfs_commit_data *data;
-	int i;
+	unsigned int i;
 	unsigned int nreq = 0;
 
-	fl_cinfo = cinfo->ds;
-	bucket = fl_cinfo->buckets;
-	for (i = 0; i < fl_cinfo->nbuckets; i++, bucket++) {
+	for (i = 0, bucket = buckets; i < nbuckets; i++, bucket++) {
 		if (list_empty(&bucket->committing))
 			continue;
-		data = nfs_commitdata_alloc(false);
-		if (!data)
-			break;
-		data->ds_commit_index = i;
-		list_add(&data->pages, list);
-		nreq++;
+		mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
+		if (!list_empty(&bucket->committing)) {
+			data = pnfs_bucket_fetch_commitdata(bucket, cinfo);
+			if (!data)
+				goto out_error;
+			data->ds_commit_index = i;
+			list_add_tail(&data->list, list);
+			atomic_inc(&cinfo->mds->rpcs_out);
+			nreq++;
+		}
+		mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	}
-
+	return nreq;
+out_error:
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	/* Clean up on error */
-	pnfs_generic_retry_commit(cinfo, i);
+	pnfs_generic_retry_commit(buckets, nbuckets, cinfo, i);
 	return nreq;
 }
 
-static inline
-void pnfs_fetch_commit_bucket_list(struct list_head *pages,
-		struct nfs_commit_data *data,
-		struct nfs_commit_info *cinfo)
+static unsigned int
+pnfs_alloc_ds_commits_list(struct list_head *list,
+			   struct pnfs_ds_commit_info *fl_cinfo,
+			   struct nfs_commit_info *cinfo)
 {
-	struct pnfs_commit_bucket *bucket;
-	struct list_head *pos;
-
-	bucket = &cinfo->ds->buckets[data->ds_commit_index];
-	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
-	list_for_each(pos, &bucket->committing)
-		cinfo->ds->ncommitting--;
-	list_splice_init(&bucket->committing, pages);
-	data->lseg = bucket->clseg;
-	bucket->clseg = NULL;
-	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
-
-}
+	struct pnfs_commit_array *array;
+	unsigned int ret = 0;
 
-/* Helper function for pnfs_generic_commit_pagelist to catch an empty
- * page list. This can happen when two commits race.
- *
- * This must be called instead of nfs_init_commit - call one or the other, but
- * not both!
- */
-static bool
-pnfs_generic_commit_cancel_empty_pagelist(struct list_head *pages,
-					  struct nfs_commit_data *data,
-					  struct nfs_commit_info *cinfo)
-{
-	if (list_empty(pages)) {
-		if (atomic_dec_and_test(&cinfo->mds->rpcs_out))
-			wake_up_var(&cinfo->mds->rpcs_out);
-		/* don't call nfs_commitdata_release - it tries to put
-		 * the open_context which is not acquired until nfs_init_commit
-		 * which has not been called on @data */
-		WARN_ON_ONCE(data->context);
-		nfs_commit_free(data);
-		return true;
+	rcu_read_lock();
+	list_for_each_entry_rcu(array, &fl_cinfo->commits, cinfo_list) {
+		if (!array->lseg || !pnfs_get_commit_array(array))
+			continue;
+		rcu_read_unlock();
+		ret += pnfs_bucket_alloc_ds_commits(list, array->buckets,
+				array->nbuckets, cinfo);
+		rcu_read_lock();
+		pnfs_put_commit_array(array, cinfo->inode);
 	}
-
-	return false;
+	return ret;
 }
 
 /* This follows nfs_commit_list pretty closely */
@@ -262,6 +511,7 @@ pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
 			     int (*initiate_commit)(struct nfs_commit_data *data,
 						    int how))
 {
+	struct pnfs_ds_commit_info *fl_cinfo = cinfo->ds;
 	struct nfs_commit_data *data, *tmp;
 	LIST_HEAD(list);
 	unsigned int nreq = 0;
@@ -269,40 +519,25 @@ pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
 	if (!list_empty(mds_pages)) {
 		data = nfs_commitdata_alloc(true);
 		data->ds_commit_index = -1;
-		list_add(&data->pages, &list);
+		list_splice_init(mds_pages, &data->pages);
+		list_add_tail(&data->list, &list);
+		atomic_inc(&cinfo->mds->rpcs_out);
 		nreq++;
 	}
 
-	nreq += pnfs_generic_alloc_ds_commits(cinfo, &list);
-
+	nreq += pnfs_alloc_ds_commits_list(&list, fl_cinfo, cinfo);
 	if (nreq == 0)
 		goto out;
 
-	atomic_add(nreq, &cinfo->mds->rpcs_out);
-
-	list_for_each_entry_safe(data, tmp, &list, pages) {
-		list_del_init(&data->pages);
+	list_for_each_entry_safe(data, tmp, &list, list) {
+		list_del(&data->list);
 		if (data->ds_commit_index < 0) {
-			/* another commit raced with us */
-			if (pnfs_generic_commit_cancel_empty_pagelist(mds_pages,
-				data, cinfo))
-				continue;
-
-			nfs_init_commit(data, mds_pages, NULL, cinfo);
+			nfs_init_commit(data, NULL, NULL, cinfo);
 			nfs_initiate_commit(NFS_CLIENT(inode), data,
 					    NFS_PROTO(data->inode),
 					    data->mds_ops, how, 0);
 		} else {
-			LIST_HEAD(pages);
-
-			pnfs_fetch_commit_bucket_list(&pages, data, cinfo);
-
-			/* another commit raced with us */
-			if (pnfs_generic_commit_cancel_empty_pagelist(&pages,
-				data, cinfo))
-				continue;
-
-			nfs_init_commit(data, &pages, data->lseg, cinfo);
+			nfs_init_commit(data, NULL, data->lseg, cinfo);
 			initiate_commit(data, how);
 		}
 	}
@@ -930,32 +1165,33 @@ pnfs_layout_mark_request_commit(struct nfs_page *req,
 				u32 ds_commit_idx)
 {
 	struct list_head *list;
-	struct pnfs_commit_bucket *buckets;
+	struct pnfs_commit_array *array;
+	struct pnfs_commit_bucket *bucket;
 
 	mutex_lock(&NFS_I(cinfo->inode)->commit_mutex);
-	buckets = cinfo->ds->buckets;
-	list = &buckets[ds_commit_idx].written;
-	if (list_empty(list)) {
-		if (!pnfs_is_valid_lseg(lseg)) {
-			mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
-			cinfo->completion_ops->resched_write(cinfo, req);
-			return;
-		}
-		/* Non-empty buckets hold a reference on the lseg.  That ref
-		 * is normally transferred to the COMMIT call and released
-		 * there.  It could also be released if the last req is pulled
-		 * off due to a rewrite, in which case it will be done in
-		 * pnfs_common_clear_request_commit
-		 */
-		WARN_ON_ONCE(buckets[ds_commit_idx].wlseg != NULL);
-		buckets[ds_commit_idx].wlseg = pnfs_get_lseg(lseg);
-	}
+	array = pnfs_lookup_commit_array(cinfo->ds, lseg);
+	if (!array || !pnfs_is_valid_lseg(lseg))
+		goto out_resched;
+	bucket = &array->buckets[ds_commit_idx];
+	list = &bucket->written;
+	/* Non-empty buckets hold a reference on the lseg.  That ref
+	 * is normally transferred to the COMMIT call and released
+	 * there.  It could also be released if the last req is pulled
+	 * off due to a rewrite, in which case it will be done in
+	 * pnfs_common_clear_request_commit
+	 */
+	if (!bucket->lseg)
+		bucket->lseg = pnfs_get_lseg(lseg);
 	set_bit(PG_COMMIT_TO_DS, &req->wb_flags);
 	cinfo->ds->nwritten++;
 
 	nfs_request_add_commit_list_locked(req, list, cinfo);
 	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
 	nfs_mark_page_unstable(req->wb_page, cinfo);
+	return;
+out_resched:
+	mutex_unlock(&NFS_I(cinfo->inode)->commit_mutex);
+	cinfo->completion_ops->resched_write(cinfo, req);
 }
 EXPORT_SYMBOL_GPL(pnfs_layout_mark_request_commit);
 
diff --git a/fs/nfs/read.c b/fs/nfs/read.c
index 34bb9add2302..13b22e898116 100644
--- a/fs/nfs/read.c
+++ b/fs/nfs/read.c
@@ -250,7 +250,7 @@ static int nfs_readpage_done(struct rpc_task *task,
 	trace_nfs_readpage_done(task, hdr);
 
 	if (task->tk_status == -ESTALE) {
-		set_bit(NFS_INO_STALE, &NFS_I(inode)->flags);
+		nfs_set_inode_stale(inode);
 		nfs_mark_for_revalidate(inode);
 	}
 	return 0;
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index bb14bede6da5..59ef3b13ccca 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -176,6 +176,41 @@ void nfs_sb_deactive(struct super_block *sb)
 }
 EXPORT_SYMBOL_GPL(nfs_sb_deactive);
 
+static int __nfs_list_for_each_server(struct list_head *head,
+		int (*fn)(struct nfs_server *, void *),
+		void *data)
+{
+	struct nfs_server *server, *last = NULL;
+	int ret = 0;
+
+	rcu_read_lock();
+	list_for_each_entry_rcu(server, head, client_link) {
+		if (!nfs_sb_active(server->super))
+			continue;
+		rcu_read_unlock();
+		if (last)
+			nfs_sb_deactive(last->super);
+		last = server;
+		ret = fn(server, data);
+		if (ret)
+			goto out;
+		rcu_read_lock();
+	}
+	rcu_read_unlock();
+out:
+	if (last)
+		nfs_sb_deactive(last->super);
+	return ret;
+}
+
+int nfs_client_for_each_server(struct nfs_client *clp,
+		int (*fn)(struct nfs_server *, void *),
+		void *data)
+{
+	return __nfs_list_for_each_server(&clp->cl_superblocks, fn, data);
+}
+EXPORT_SYMBOL_GPL(nfs_client_for_each_server);
+
 /*
  * Deliver file system statistics to userspace
  */
diff --git a/fs/nfs/unlink.c b/fs/nfs/unlink.c
index 0effeee28352..b27ebdccef70 100644
--- a/fs/nfs/unlink.c
+++ b/fs/nfs/unlink.c
@@ -98,7 +98,7 @@ static void nfs_do_call_unlink(struct inode *inode, struct nfs_unlinkdata *data)
 		.callback_ops = &nfs_unlink_ops,
 		.callback_data = data,
 		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF,
 	};
 	struct rpc_task *task;
 	struct inode *dir = d_inode(data->dentry->d_parent);
@@ -341,7 +341,7 @@ nfs_async_rename(struct inode *old_dir, struct inode *new_dir,
 		.callback_ops = &nfs_rename_ops,
 		.workqueue = nfsiod_workqueue,
 		.rpc_client = NFS_CLIENT(old_dir),
-		.flags = RPC_TASK_ASYNC,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF,
 	};
 
 	data = kzalloc(sizeof(*data), GFP_KERNEL);
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index c478b772cc49..df4b87c30ac9 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -149,6 +149,31 @@ static void nfs_io_completion_put(struct nfs_io_completion *ioc)
 		kref_put(&ioc->refcount, nfs_io_completion_release);
 }
 
+static void
+nfs_page_set_inode_ref(struct nfs_page *req, struct inode *inode)
+{
+	if (!test_and_set_bit(PG_INODE_REF, &req->wb_flags)) {
+		kref_get(&req->wb_kref);
+		atomic_long_inc(&NFS_I(inode)->nrequests);
+	}
+}
+
+static int
+nfs_cancel_remove_inode(struct nfs_page *req, struct inode *inode)
+{
+	int ret;
+
+	if (!test_bit(PG_REMOVE, &req->wb_flags))
+		return 0;
+	ret = nfs_page_group_lock(req);
+	if (ret)
+		return ret;
+	if (test_and_clear_bit(PG_REMOVE, &req->wb_flags))
+		nfs_page_set_inode_ref(req, inode);
+	nfs_page_group_unlock(req);
+	return 0;
+}
+
 static struct nfs_page *
 nfs_page_private_request(struct page *page)
 {
@@ -218,6 +243,36 @@ static struct nfs_page *nfs_page_find_head_request(struct page *page)
 	return req;
 }
 
+static struct nfs_page *nfs_find_and_lock_page_request(struct page *page)
+{
+	struct inode *inode = page_file_mapping(page)->host;
+	struct nfs_page *req, *head;
+	int ret;
+
+	for (;;) {
+		req = nfs_page_find_head_request(page);
+		if (!req)
+			return req;
+		head = nfs_page_group_lock_head(req);
+		if (head != req)
+			nfs_release_request(req);
+		if (IS_ERR(head))
+			return head;
+		ret = nfs_cancel_remove_inode(head, inode);
+		if (ret < 0) {
+			nfs_unlock_and_release_request(head);
+			return ERR_PTR(ret);
+		}
+		/* Ensure that nobody removed the request before we locked it */
+		if (head == nfs_page_private_request(page))
+			break;
+		if (PageSwapCache(page))
+			break;
+		nfs_unlock_and_release_request(head);
+	}
+	return head;
+}
+
 /* Adjust the file length if we're writing beyond the end */
 static void nfs_grow_file(struct page *page, unsigned int offset, unsigned int count)
 {
@@ -380,34 +435,6 @@ static void nfs_end_page_writeback(struct nfs_page *req)
 }
 
 /*
- * nfs_unroll_locks_and_wait -  unlock all newly locked reqs and wait on @req
- *
- * this is a helper function for nfs_lock_and_join_requests
- *
- * @inode - inode associated with request page group, must be holding inode lock
- * @head  - head request of page group, must be holding head lock
- * @req   - request that couldn't lock and needs to wait on the req bit lock
- *
- * NOTE: this must be called holding page_group bit lock
- *       which will be released before returning.
- *
- * returns 0 on success, < 0 on error.
- */
-static void
-nfs_unroll_locks(struct inode *inode, struct nfs_page *head,
-			  struct nfs_page *req)
-{
-	struct nfs_page *tmp;
-
-	/* relinquish all the locks successfully grabbed this run */
-	for (tmp = head->wb_this_page ; tmp != req; tmp = tmp->wb_this_page) {
-		if (!kref_read(&tmp->wb_kref))
-			continue;
-		nfs_unlock_and_release_request(tmp);
-	}
-}
-
-/*
  * nfs_destroy_unlinked_subrequests - destroy recently unlinked subrequests
  *
  * @destroy_list - request list (using wb_this_page) terminated by @old_head
@@ -428,22 +455,29 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
 		destroy_list = (subreq->wb_this_page == old_head) ?
 				   NULL : subreq->wb_this_page;
 
+		/* Note: lock subreq in order to change subreq->wb_head */
+		nfs_page_set_headlock(subreq);
 		WARN_ON_ONCE(old_head != subreq->wb_head);
 
 		/* make sure old group is not used */
 		subreq->wb_this_page = subreq;
+		subreq->wb_head = subreq;
 
 		clear_bit(PG_REMOVE, &subreq->wb_flags);
 
 		/* Note: races with nfs_page_group_destroy() */
 		if (!kref_read(&subreq->wb_kref)) {
 			/* Check if we raced with nfs_page_group_destroy() */
-			if (test_and_clear_bit(PG_TEARDOWN, &subreq->wb_flags))
+			if (test_and_clear_bit(PG_TEARDOWN, &subreq->wb_flags)) {
+				nfs_page_clear_headlock(subreq);
 				nfs_free_request(subreq);
+			} else
+				nfs_page_clear_headlock(subreq);
 			continue;
 		}
+		nfs_page_clear_headlock(subreq);
 
-		subreq->wb_head = subreq;
+		nfs_release_request(old_head);
 
 		if (test_and_clear_bit(PG_INODE_REF, &subreq->wb_flags)) {
 			nfs_release_request(subreq);
@@ -457,105 +491,43 @@ nfs_destroy_unlinked_subrequests(struct nfs_page *destroy_list,
 }
 
 /*
- * nfs_lock_and_join_requests - join all subreqs to the head req and return
- *                              a locked reference, cancelling any pending
- *                              operations for this page.
- *
- * @page - the page used to lookup the "page group" of nfs_page structures
+ * nfs_join_page_group - destroy subrequests of the head req
+ * @head: the page used to lookup the "page group" of nfs_page structures
+ * @inode: Inode to which the request belongs.
  *
  * This function joins all sub requests to the head request by first
  * locking all requests in the group, cancelling any pending operations
  * and finally updating the head request to cover the whole range covered by
  * the (former) group.  All subrequests are removed from any write or commit
  * lists, unlinked from the group and destroyed.
- *
- * Returns a locked, referenced pointer to the head request - which after
- * this call is guaranteed to be the only request associated with the page.
- * Returns NULL if no requests are found for @page, or a ERR_PTR if an
- * error was encountered.
  */
-static struct nfs_page *
-nfs_lock_and_join_requests(struct page *page)
+void
+nfs_join_page_group(struct nfs_page *head, struct inode *inode)
 {
-	struct inode *inode = page_file_mapping(page)->host;
-	struct nfs_page *head, *subreq;
+	struct nfs_page *subreq;
 	struct nfs_page *destroy_list = NULL;
-	unsigned int total_bytes;
-	int ret;
+	unsigned int pgbase, off, bytes;
 
-try_again:
-	/*
-	 * A reference is taken only on the head request which acts as a
-	 * reference to the whole page group - the group will not be destroyed
-	 * until the head reference is released.
-	 */
-	head = nfs_page_find_head_request(page);
-	if (!head)
-		return NULL;
-
-	/* lock the page head first in order to avoid an ABBA inefficiency */
-	if (!nfs_lock_request(head)) {
-		ret = nfs_wait_on_request(head);
-		nfs_release_request(head);
-		if (ret < 0)
-			return ERR_PTR(ret);
-		goto try_again;
-	}
-
-	/* Ensure that nobody removed the request before we locked it */
-	if (head != nfs_page_private_request(page) && !PageSwapCache(page)) {
-		nfs_unlock_and_release_request(head);
-		goto try_again;
-	}
-
-	ret = nfs_page_group_lock(head);
-	if (ret < 0)
-		goto release_request;
-
-	/* lock each request in the page group */
-	total_bytes = head->wb_bytes;
+	pgbase = head->wb_pgbase;
+	bytes = head->wb_bytes;
+	off = head->wb_offset;
 	for (subreq = head->wb_this_page; subreq != head;
 			subreq = subreq->wb_this_page) {
-
-		if (!kref_get_unless_zero(&subreq->wb_kref)) {
-			if (subreq->wb_offset == head->wb_offset + total_bytes)
-				total_bytes += subreq->wb_bytes;
-			continue;
-		}
-
-		while (!nfs_lock_request(subreq)) {
-			/*
-			 * Unlock page to allow nfs_page_group_sync_on_bit()
-			 * to succeed
-			 */
-			nfs_page_group_unlock(head);
-			ret = nfs_wait_on_request(subreq);
-			if (!ret)
-				ret = nfs_page_group_lock(head);
-			if (ret < 0) {
-				nfs_unroll_locks(inode, head, subreq);
-				nfs_release_request(subreq);
-				goto release_request;
-			}
-		}
-		/*
-		 * Subrequests are always contiguous, non overlapping
-		 * and in order - but may be repeated (mirrored writes).
-		 */
-		if (subreq->wb_offset == (head->wb_offset + total_bytes)) {
-			/* keep track of how many bytes this group covers */
-			total_bytes += subreq->wb_bytes;
-		} else if (WARN_ON_ONCE(subreq->wb_offset < head->wb_offset ||
-			    ((subreq->wb_offset + subreq->wb_bytes) >
-			     (head->wb_offset + total_bytes)))) {
-			nfs_page_group_unlock(head);
-			nfs_unroll_locks(inode, head, subreq);
-			nfs_unlock_and_release_request(subreq);
-			ret = -EIO;
-			goto release_request;
+		/* Subrequests should always form a contiguous range */
+		if (pgbase > subreq->wb_pgbase) {
+			off -= pgbase - subreq->wb_pgbase;
+			bytes += pgbase - subreq->wb_pgbase;
+			pgbase = subreq->wb_pgbase;
 		}
+		bytes = max(subreq->wb_pgbase + subreq->wb_bytes
+				- pgbase, bytes);
 	}
 
+	/* Set the head request's range to cover the former page group */
+	head->wb_pgbase = pgbase;
+	head->wb_bytes = bytes;
+	head->wb_offset = off;
+
 	/* Now that all requests are locked, make sure they aren't on any list.
 	 * Commit list removal accounting is done after locks are dropped */
 	subreq = head;
@@ -569,36 +541,52 @@ try_again:
 		/* destroy list will be terminated by head */
 		destroy_list = head->wb_this_page;
 		head->wb_this_page = head;
-
-		/* change head request to cover whole range that
-		 * the former page group covered */
-		head->wb_bytes = total_bytes;
 	}
 
-	/* Postpone destruction of this request */
-	if (test_and_clear_bit(PG_REMOVE, &head->wb_flags)) {
-		set_bit(PG_INODE_REF, &head->wb_flags);
-		kref_get(&head->wb_kref);
-		atomic_long_inc(&NFS_I(inode)->nrequests);
-	}
+	nfs_destroy_unlinked_subrequests(destroy_list, head, inode);
+}
 
-	nfs_page_group_unlock(head);
+/*
+ * nfs_lock_and_join_requests - join all subreqs to the head req
+ * @page: the page used to lookup the "page group" of nfs_page structures
+ *
+ * This function joins all sub requests to the head request by first
+ * locking all requests in the group, cancelling any pending operations
+ * and finally updating the head request to cover the whole range covered by
+ * the (former) group.  All subrequests are removed from any write or commit
+ * lists, unlinked from the group and destroyed.
+ *
+ * Returns a locked, referenced pointer to the head request - which after
+ * this call is guaranteed to be the only request associated with the page.
+ * Returns NULL if no requests are found for @page, or a ERR_PTR if an
+ * error was encountered.
+ */
+static struct nfs_page *
+nfs_lock_and_join_requests(struct page *page)
+{
+	struct inode *inode = page_file_mapping(page)->host;
+	struct nfs_page *head;
+	int ret;
 
-	nfs_destroy_unlinked_subrequests(destroy_list, head, inode);
+	/*
+	 * A reference is taken only on the head request which acts as a
+	 * reference to the whole page group - the group will not be destroyed
+	 * until the head reference is released.
+	 */
+	head = nfs_find_and_lock_page_request(page);
+	if (IS_ERR_OR_NULL(head))
+		return head;
 
-	/* Did we lose a race with nfs_inode_remove_request()? */
-	if (!(PagePrivate(page) || PageSwapCache(page))) {
+	/* lock each request in the page group */
+	ret = nfs_page_group_lock_subrequests(head);
+	if (ret < 0) {
 		nfs_unlock_and_release_request(head);
-		return NULL;
+		return ERR_PTR(ret);
 	}
 
-	/* still holds ref on head from nfs_page_find_head_request
-	 * and still has lock on head from lock loop */
-	return head;
+	nfs_join_page_group(head, inode);
 
-release_request:
-	nfs_unlock_and_release_request(head);
-	return ERR_PTR(ret);
+	return head;
 }
 
 static void nfs_write_error(struct nfs_page *req, int error)
@@ -1707,7 +1695,7 @@ int nfs_initiate_commit(struct rpc_clnt *clnt, struct nfs_commit_data *data,
 		.callback_ops = call_ops,
 		.callback_data = data,
 		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC | flags,
+		.flags = RPC_TASK_ASYNC | RPC_TASK_CRED_NOREF | flags,
 		.priority = priority,
 	};
 	/* Set up the initial task struct.  */
@@ -1746,14 +1734,19 @@ void nfs_init_commit(struct nfs_commit_data *data,
 		     struct pnfs_layout_segment *lseg,
 		     struct nfs_commit_info *cinfo)
 {
-	struct nfs_page *first = nfs_list_entry(head->next);
-	struct nfs_open_context *ctx = nfs_req_openctx(first);
-	struct inode *inode = d_inode(ctx->dentry);
+	struct nfs_page *first;
+	struct nfs_open_context *ctx;
+	struct inode *inode;
 
 	/* Set up the RPC argument and reply structs
 	 * NB: take care not to mess about with data->commit et al. */
 
-	list_splice_init(head, &data->pages);
+	if (head)
+		list_splice_init(head, &data->pages);
+
+	first = nfs_list_entry(data->pages.next);
+	ctx = nfs_req_openctx(first);
+	inode = d_inode(ctx->dentry);
 
 	data->inode	  = inode;
 	data->cred	  = ctx->cred;
@@ -1869,8 +1862,7 @@ static void nfs_commit_release_pages(struct nfs_commit_data *data)
 
 		/* Okay, COMMIT succeeded, apparently. Check the verifier
 		 * returned by the server against all stored verfs. */
-		if (verf->committed > NFS_UNSTABLE &&
-		    !nfs_write_verifier_cmp(&req->wb_verf, &verf->verifier)) {
+		if (nfs_write_match_verf(verf, req)) {
 			/* We have a match */
 			if (req->wb_page)
 				nfs_inode_remove_request(req);