summary refs log tree commit diff
path: root/fs
diff options
context:
space:
mode:
Diffstat (limited to 'fs')
-rw-r--r--fs/nfs/Makefile4
-rw-r--r--fs/nfs/blocklayout/blocklayout.c38
-rw-r--r--fs/nfs/direct.c117
-rw-r--r--fs/nfs/filelayout/Makefile5
-rw-r--r--fs/nfs/filelayout/filelayout.c (renamed from fs/nfs/nfs4filelayout.c)203
-rw-r--r--fs/nfs/filelayout/filelayout.h (renamed from fs/nfs/nfs4filelayout.h)2
-rw-r--r--fs/nfs/filelayout/filelayoutdev.c (renamed from fs/nfs/nfs4filelayoutdev.c)6
-rw-r--r--fs/nfs/getroot.c3
-rw-r--r--fs/nfs/inode.c26
-rw-r--r--fs/nfs/internal.h33
-rw-r--r--fs/nfs/nfs2xdr.c14
-rw-r--r--fs/nfs/nfs3proc.c21
-rw-r--r--fs/nfs/nfs3xdr.c16
-rw-r--r--fs/nfs/nfs4_fs.h4
-rw-r--r--fs/nfs/nfs4file.c3
-rw-r--r--fs/nfs/nfs4proc.c56
-rw-r--r--fs/nfs/nfs4state.c6
-rw-r--r--fs/nfs/nfs4trace.h8
-rw-r--r--fs/nfs/nfs4xdr.c19
-rw-r--r--fs/nfs/objlayout/objio_osd.c24
-rw-r--r--fs/nfs/objlayout/objlayout.c24
-rw-r--r--fs/nfs/objlayout/objlayout.h8
-rw-r--r--fs/nfs/pagelist.c633
-rw-r--r--fs/nfs/pnfs.c166
-rw-r--r--fs/nfs/pnfs.h30
-rw-r--r--fs/nfs/proc.c21
-rw-r--r--fs/nfs/read.c414
-rw-r--r--fs/nfs/super.c27
-rw-r--r--fs/nfs/write.c588
29 files changed, 1316 insertions, 1203 deletions
diff --git a/fs/nfs/Makefile b/fs/nfs/Makefile
index 03192a66c143..4782e0840dcc 100644
--- a/fs/nfs/Makefile
+++ b/fs/nfs/Makefile
@@ -29,8 +29,6 @@ nfsv4-$(CONFIG_NFS_USE_LEGACY_DNS) += cache_lib.o
 nfsv4-$(CONFIG_SYSCTL)	+= nfs4sysctl.o
 nfsv4-$(CONFIG_NFS_V4_1)	+= pnfs.o pnfs_dev.o
 
-obj-$(CONFIG_PNFS_FILE_LAYOUT) += nfs_layout_nfsv41_files.o
-nfs_layout_nfsv41_files-y := nfs4filelayout.o nfs4filelayoutdev.o
-
+obj-$(CONFIG_PNFS_FILE_LAYOUT) += filelayout/
 obj-$(CONFIG_PNFS_OBJLAYOUT) += objlayout/
 obj-$(CONFIG_PNFS_BLOCK) += blocklayout/
diff --git a/fs/nfs/blocklayout/blocklayout.c b/fs/nfs/blocklayout/blocklayout.c
index 65d849bdf77a..9b431f44fad9 100644
--- a/fs/nfs/blocklayout/blocklayout.c
+++ b/fs/nfs/blocklayout/blocklayout.c
@@ -210,7 +210,7 @@ static void bl_end_io_read(struct bio *bio, int err)
 			SetPageUptodate(bvec->bv_page);
 
 	if (err) {
-		struct nfs_read_data *rdata = par->data;
+		struct nfs_pgio_data *rdata = par->data;
 		struct nfs_pgio_header *header = rdata->header;
 
 		if (!header->pnfs_error)
@@ -224,17 +224,17 @@ static void bl_end_io_read(struct bio *bio, int err)
 static void bl_read_cleanup(struct work_struct *work)
 {
 	struct rpc_task *task;
-	struct nfs_read_data *rdata;
+	struct nfs_pgio_data *rdata;
 	dprintk("%s enter\n", __func__);
 	task = container_of(work, struct rpc_task, u.tk_work);
-	rdata = container_of(task, struct nfs_read_data, task);
+	rdata = container_of(task, struct nfs_pgio_data, task);
 	pnfs_ld_read_done(rdata);
 }
 
 static void
 bl_end_par_io_read(void *data, int unused)
 {
-	struct nfs_read_data *rdata = data;
+	struct nfs_pgio_data *rdata = data;
 
 	rdata->task.tk_status = rdata->header->pnfs_error;
 	INIT_WORK(&rdata->task.u.tk_work, bl_read_cleanup);
@@ -242,7 +242,7 @@ bl_end_par_io_read(void *data, int unused)
 }
 
 static enum pnfs_try_status
-bl_read_pagelist(struct nfs_read_data *rdata)
+bl_read_pagelist(struct nfs_pgio_data *rdata)
 {
 	struct nfs_pgio_header *header = rdata->header;
 	int i, hole;
@@ -390,7 +390,7 @@ static void bl_end_io_write_zero(struct bio *bio, int err)
 	}
 
 	if (unlikely(err)) {
-		struct nfs_write_data *data = par->data;
+		struct nfs_pgio_data *data = par->data;
 		struct nfs_pgio_header *header = data->header;
 
 		if (!header->pnfs_error)
@@ -405,7 +405,7 @@ static void bl_end_io_write(struct bio *bio, int err)
 {
 	struct parallel_io *par = bio->bi_private;
 	const int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags);
-	struct nfs_write_data *data = par->data;
+	struct nfs_pgio_data *data = par->data;
 	struct nfs_pgio_header *header = data->header;
 
 	if (!uptodate) {
@@ -423,10 +423,10 @@ static void bl_end_io_write(struct bio *bio, int err)
 static void bl_write_cleanup(struct work_struct *work)
 {
 	struct rpc_task *task;
-	struct nfs_write_data *wdata;
+	struct nfs_pgio_data *wdata;
 	dprintk("%s enter\n", __func__);
 	task = container_of(work, struct rpc_task, u.tk_work);
-	wdata = container_of(task, struct nfs_write_data, task);
+	wdata = container_of(task, struct nfs_pgio_data, task);
 	if (likely(!wdata->header->pnfs_error)) {
 		/* Marks for LAYOUTCOMMIT */
 		mark_extents_written(BLK_LSEG2EXT(wdata->header->lseg),
@@ -438,7 +438,7 @@ static void bl_write_cleanup(struct work_struct *work)
 /* Called when last of bios associated with a bl_write_pagelist call finishes */
 static void bl_end_par_io_write(void *data, int num_se)
 {
-	struct nfs_write_data *wdata = data;
+	struct nfs_pgio_data *wdata = data;
 
 	if (unlikely(wdata->header->pnfs_error)) {
 		bl_free_short_extents(&BLK_LSEG2EXT(wdata->header->lseg)->bl_inval,
@@ -673,7 +673,7 @@ check_page:
 }
 
 static enum pnfs_try_status
-bl_write_pagelist(struct nfs_write_data *wdata, int sync)
+bl_write_pagelist(struct nfs_pgio_data *wdata, int sync)
 {
 	struct nfs_pgio_header *header = wdata->header;
 	int i, ret, npg_zero, pg_index, last = 0;
@@ -1189,13 +1189,17 @@ bl_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
 		pnfs_generic_pg_init_read(pgio, req);
 }
 
-static bool
+/*
+ * Return 0 if @req cannot be coalesced into @pgio, otherwise return the number
+ * of bytes (maximum @req->wb_bytes) that can be coalesced.
+ */
+static size_t
 bl_pg_test_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
 		struct nfs_page *req)
 {
 	if (pgio->pg_dreq != NULL &&
 	    !is_aligned_req(req, SECTOR_SIZE))
-		return false;
+		return 0;
 
 	return pnfs_generic_pg_test(pgio, prev, req);
 }
@@ -1241,13 +1245,17 @@ bl_pg_init_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
 	}
 }
 
-static bool
+/*
+ * Return 0 if @req cannot be coalesced into @pgio, otherwise return the number
+ * of bytes (maximum @req->wb_bytes) that can be coalesced.
+ */
+static size_t
 bl_pg_test_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
 		 struct nfs_page *req)
 {
 	if (pgio->pg_dreq != NULL &&
 	    !is_aligned_req(req, PAGE_CACHE_SIZE))
-		return false;
+		return 0;
 
 	return pnfs_generic_pg_test(pgio, prev, req);
 }
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index b8797ae6831f..4ad7bc388679 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -108,6 +108,97 @@ static inline int put_dreq(struct nfs_direct_req *dreq)
 	return atomic_dec_and_test(&dreq->io_count);
 }
 
+/*
+ * nfs_direct_select_verf - select the right verifier
+ * @dreq - direct request possibly spanning multiple servers
+ * @ds_clp - nfs_client of data server or NULL if MDS / non-pnfs
+ * @ds_idx - index of data server in data server list, only valid if ds_clp set
+ *
+ * returns the correct verifier to use given the role of the server
+ */
+static struct nfs_writeverf *
+nfs_direct_select_verf(struct nfs_direct_req *dreq,
+		       struct nfs_client *ds_clp,
+		       int ds_idx)
+{
+	struct nfs_writeverf *verfp = &dreq->verf;
+
+#ifdef CONFIG_NFS_V4_1
+	if (ds_clp) {
+		/* pNFS is in use, use the DS verf */
+		if (ds_idx >= 0 && ds_idx < dreq->ds_cinfo.nbuckets)
+			verfp = &dreq->ds_cinfo.buckets[ds_idx].direct_verf;
+		else
+			WARN_ON_ONCE(1);
+	}
+#endif
+	return verfp;
+}
+
+
+/*
+ * nfs_direct_set_hdr_verf - set the write/commit verifier
+ * @dreq - direct request possibly spanning multiple servers
+ * @hdr - pageio header to validate against previously seen verfs
+ *
+ * Set the server's (MDS or DS) "seen" verifier
+ */
+static void nfs_direct_set_hdr_verf(struct nfs_direct_req *dreq,
+				    struct nfs_pgio_header *hdr)
+{
+	struct nfs_writeverf *verfp;
+
+	verfp = nfs_direct_select_verf(dreq, hdr->data->ds_clp,
+				      hdr->data->ds_idx);
+	WARN_ON_ONCE(verfp->committed >= 0);
+	memcpy(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
+	WARN_ON_ONCE(verfp->committed < 0);
+}
+
+/*
+ * nfs_direct_cmp_hdr_verf - compare verifier for pgio header
+ * @dreq - direct request possibly spanning multiple servers
+ * @hdr - pageio header to validate against previously seen verf
+ *
+ * set the server's "seen" verf if not initialized.
+ * returns result of comparison between @hdr->verf and the "seen"
+ * verf of the server used by @hdr (DS or MDS)
+ */
+static int nfs_direct_set_or_cmp_hdr_verf(struct nfs_direct_req *dreq,
+					  struct nfs_pgio_header *hdr)
+{
+	struct nfs_writeverf *verfp;
+
+	verfp = nfs_direct_select_verf(dreq, hdr->data->ds_clp,
+					 hdr->data->ds_idx);
+	if (verfp->committed < 0) {
+		nfs_direct_set_hdr_verf(dreq, hdr);
+		return 0;
+	}
+	return memcmp(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
+}
+
+#if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
+/*
+ * nfs_direct_cmp_commit_data_verf - compare verifier for commit data
+ * @dreq - direct request possibly spanning multiple servers
+ * @data - commit data to validate against previously seen verf
+ *
+ * returns result of comparison between @data->verf and the verf of
+ * the server used by @data (DS or MDS)
+ */
+static int nfs_direct_cmp_commit_data_verf(struct nfs_direct_req *dreq,
+					   struct nfs_commit_data *data)
+{
+	struct nfs_writeverf *verfp;
+
+	verfp = nfs_direct_select_verf(dreq, data->ds_clp,
+					 data->ds_commit_index);
+	WARN_ON_ONCE(verfp->committed < 0);
+	return memcmp(verfp, &data->verf, sizeof(struct nfs_writeverf));
+}
+#endif
+
 /**
  * nfs_direct_IO - NFS address space operation for direct I/O
  * @rw: direction (read or write)
@@ -168,6 +259,7 @@ static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
 	kref_get(&dreq->kref);
 	init_completion(&dreq->completion);
 	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
+	dreq->verf.committed = NFS_INVALID_STABLE_HOW;	/* not set yet */
 	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
 	spin_lock_init(&dreq->lock);
 
@@ -380,8 +472,7 @@ static ssize_t nfs_direct_read_schedule_segment(struct nfs_pageio_descriptor *de
 			struct nfs_page *req;
 			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
 			/* XXX do we need to do the eof zeroing found in async_filler? */
-			req = nfs_create_request(dreq->ctx, dreq->inode,
-						 pagevec[i],
+			req = nfs_create_request(dreq->ctx, pagevec[i], NULL,
 						 pgbase, req_len);
 			if (IS_ERR(req)) {
 				result = PTR_ERR(req);
@@ -424,7 +515,7 @@ static ssize_t nfs_direct_read_schedule_iovec(struct nfs_direct_req *dreq,
 	size_t requested_bytes = 0;
 	unsigned long seg;
 
-	NFS_PROTO(dreq->inode)->read_pageio_init(&desc, dreq->inode,
+	nfs_pageio_init_read(&desc, dreq->inode, false,
 			     &nfs_direct_read_completion_ops);
 	get_dreq(dreq);
 	desc.pg_dreq = dreq;
@@ -564,7 +655,7 @@ static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq)
 	dreq->count = 0;
 	get_dreq(dreq);
 
-	NFS_PROTO(dreq->inode)->write_pageio_init(&desc, dreq->inode, FLUSH_STABLE,
+	nfs_pageio_init_write(&desc, dreq->inode, FLUSH_STABLE, false,
 			      &nfs_direct_write_completion_ops);
 	desc.pg_dreq = dreq;
 
@@ -603,7 +694,7 @@ static void nfs_direct_commit_complete(struct nfs_commit_data *data)
 		dprintk("NFS: %5u commit failed with error %d.\n",
 			data->task.tk_pid, status);
 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
-	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
+	} else if (nfs_direct_cmp_commit_data_verf(dreq, data)) {
 		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 	}
@@ -750,8 +841,7 @@ static ssize_t nfs_direct_write_schedule_segment(struct nfs_pageio_descriptor *d
 			struct nfs_page *req;
 			unsigned int req_len = min_t(size_t, bytes, PAGE_SIZE - pgbase);
 
-			req = nfs_create_request(dreq->ctx, dreq->inode,
-						 pagevec[i],
+			req = nfs_create_request(dreq->ctx, pagevec[i], NULL,
 						 pgbase, req_len);
 			if (IS_ERR(req)) {
 				result = PTR_ERR(req);
@@ -813,13 +903,13 @@ static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
 			if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
 				bit = NFS_IOHDR_NEED_RESCHED;
 			else if (dreq->flags == 0) {
-				memcpy(&dreq->verf, hdr->verf,
-				       sizeof(dreq->verf));
+				nfs_direct_set_hdr_verf(dreq, hdr);
 				bit = NFS_IOHDR_NEED_COMMIT;
 				dreq->flags = NFS_ODIRECT_DO_COMMIT;
 			} else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
-				if (memcmp(&dreq->verf, hdr->verf, sizeof(dreq->verf))) {
-					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
+				if (nfs_direct_set_or_cmp_hdr_verf(dreq, hdr)) {
+					dreq->flags =
+						NFS_ODIRECT_RESCHED_WRITES;
 					bit = NFS_IOHDR_NEED_RESCHED;
 				} else
 					bit = NFS_IOHDR_NEED_COMMIT;
@@ -829,6 +919,8 @@ static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
 	spin_unlock(&dreq->lock);
 
 	while (!list_empty(&hdr->pages)) {
+		bool do_destroy = true;
+
 		req = nfs_list_entry(hdr->pages.next);
 		nfs_list_remove_request(req);
 		switch (bit) {
@@ -836,6 +928,7 @@ static void nfs_direct_write_completion(struct nfs_pgio_header *hdr)
 		case NFS_IOHDR_NEED_COMMIT:
 			kref_get(&req->wb_kref);
 			nfs_mark_request_commit(req, hdr->lseg, &cinfo);
+			do_destroy = false;
 		}
 		nfs_unlock_and_release_request(req);
 	}
@@ -874,7 +967,7 @@ static ssize_t nfs_direct_write_schedule_iovec(struct nfs_direct_req *dreq,
 	size_t requested_bytes = 0;
 	unsigned long seg;
 
-	NFS_PROTO(inode)->write_pageio_init(&desc, inode, FLUSH_COND_STABLE,
+	nfs_pageio_init_write(&desc, inode, FLUSH_COND_STABLE, false,
 			      &nfs_direct_write_completion_ops);
 	desc.pg_dreq = dreq;
 	get_dreq(dreq);
diff --git a/fs/nfs/filelayout/Makefile b/fs/nfs/filelayout/Makefile
new file mode 100644
index 000000000000..8516cdffb9e9
--- /dev/null
+++ b/fs/nfs/filelayout/Makefile
@@ -0,0 +1,5 @@
+#
+# Makefile for the pNFS Files Layout Driver kernel module
+#
+obj-$(CONFIG_PNFS_FILE_LAYOUT) += nfs_layout_nfsv41_files.o
+nfs_layout_nfsv41_files-y := filelayout.o filelayoutdev.o
diff --git a/fs/nfs/nfs4filelayout.c b/fs/nfs/filelayout/filelayout.c
index b9a35c05b60f..d2eba1c13b7e 100644
--- a/fs/nfs/nfs4filelayout.c
+++ b/fs/nfs/filelayout/filelayout.c
@@ -35,11 +35,11 @@
 
 #include <linux/sunrpc/metrics.h>
 
-#include "nfs4session.h"
-#include "internal.h"
-#include "delegation.h"
-#include "nfs4filelayout.h"
-#include "nfs4trace.h"
+#include "../nfs4session.h"
+#include "../internal.h"
+#include "../delegation.h"
+#include "filelayout.h"
+#include "../nfs4trace.h"
 
 #define NFSDBG_FACILITY         NFSDBG_PNFS_LD
 
@@ -84,7 +84,7 @@ filelayout_get_dserver_offset(struct pnfs_layout_segment *lseg, loff_t offset)
 	BUG();
 }
 
-static void filelayout_reset_write(struct nfs_write_data *data)
+static void filelayout_reset_write(struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 	struct rpc_task *task = &data->task;
@@ -105,7 +105,7 @@ static void filelayout_reset_write(struct nfs_write_data *data)
 	}
 }
 
-static void filelayout_reset_read(struct nfs_read_data *data)
+static void filelayout_reset_read(struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 	struct rpc_task *task = &data->task;
@@ -243,7 +243,7 @@ wait_on_recovery:
 /* NFS_PROTO call done callback routines */
 
 static int filelayout_read_done_cb(struct rpc_task *task,
-				struct nfs_read_data *data)
+				struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 	int err;
@@ -270,7 +270,7 @@ static int filelayout_read_done_cb(struct rpc_task *task,
  * rfc5661 is not clear about which credential should be used.
  */
 static void
-filelayout_set_layoutcommit(struct nfs_write_data *wdata)
+filelayout_set_layoutcommit(struct nfs_pgio_data *wdata)
 {
 	struct nfs_pgio_header *hdr = wdata->header;
 
@@ -279,7 +279,7 @@ filelayout_set_layoutcommit(struct nfs_write_data *wdata)
 		return;
 
 	pnfs_set_layoutcommit(wdata);
-	dprintk("%s ionde %lu pls_end_pos %lu\n", __func__, hdr->inode->i_ino,
+	dprintk("%s inode %lu pls_end_pos %lu\n", __func__, hdr->inode->i_ino,
 		(unsigned long) NFS_I(hdr->inode)->layout->plh_lwb);
 }
 
@@ -305,7 +305,7 @@ filelayout_reset_to_mds(struct pnfs_layout_segment *lseg)
  */
 static void filelayout_read_prepare(struct rpc_task *task, void *data)
 {
-	struct nfs_read_data *rdata = data;
+	struct nfs_pgio_data *rdata = data;
 
 	if (unlikely(test_bit(NFS_CONTEXT_BAD, &rdata->args.context->flags))) {
 		rpc_exit(task, -EIO);
@@ -317,7 +317,7 @@ static void filelayout_read_prepare(struct rpc_task *task, void *data)
 		rpc_exit(task, 0);
 		return;
 	}
-	rdata->read_done_cb = filelayout_read_done_cb;
+	rdata->pgio_done_cb = filelayout_read_done_cb;
 
 	if (nfs41_setup_sequence(rdata->ds_clp->cl_session,
 			&rdata->args.seq_args,
@@ -331,7 +331,7 @@ static void filelayout_read_prepare(struct rpc_task *task, void *data)
 
 static void filelayout_read_call_done(struct rpc_task *task, void *data)
 {
-	struct nfs_read_data *rdata = data;
+	struct nfs_pgio_data *rdata = data;
 
 	dprintk("--> %s task->tk_status %d\n", __func__, task->tk_status);
 
@@ -347,14 +347,14 @@ static void filelayout_read_call_done(struct rpc_task *task, void *data)
 
 static void filelayout_read_count_stats(struct rpc_task *task, void *data)
 {
-	struct nfs_read_data *rdata = data;
+	struct nfs_pgio_data *rdata = data;
 
 	rpc_count_iostats(task, NFS_SERVER(rdata->header->inode)->client->cl_metrics);
 }
 
 static void filelayout_read_release(void *data)
 {
-	struct nfs_read_data *rdata = data;
+	struct nfs_pgio_data *rdata = data;
 	struct pnfs_layout_hdr *lo = rdata->header->lseg->pls_layout;
 
 	filelayout_fenceme(lo->plh_inode, lo);
@@ -363,7 +363,7 @@ static void filelayout_read_release(void *data)
 }
 
 static int filelayout_write_done_cb(struct rpc_task *task,
-				struct nfs_write_data *data)
+				struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 	int err;
@@ -419,7 +419,7 @@ static int filelayout_commit_done_cb(struct rpc_task *task,
 
 static void filelayout_write_prepare(struct rpc_task *task, void *data)
 {
-	struct nfs_write_data *wdata = data;
+	struct nfs_pgio_data *wdata = data;
 
 	if (unlikely(test_bit(NFS_CONTEXT_BAD, &wdata->args.context->flags))) {
 		rpc_exit(task, -EIO);
@@ -443,7 +443,7 @@ static void filelayout_write_prepare(struct rpc_task *task, void *data)
 
 static void filelayout_write_call_done(struct rpc_task *task, void *data)
 {
-	struct nfs_write_data *wdata = data;
+	struct nfs_pgio_data *wdata = data;
 
 	if (test_bit(NFS_IOHDR_REDO, &wdata->header->flags) &&
 	    task->tk_status == 0) {
@@ -457,14 +457,14 @@ static void filelayout_write_call_done(struct rpc_task *task, void *data)
 
 static void filelayout_write_count_stats(struct rpc_task *task, void *data)
 {
-	struct nfs_write_data *wdata = data;
+	struct nfs_pgio_data *wdata = data;
 
 	rpc_count_iostats(task, NFS_SERVER(wdata->header->inode)->client->cl_metrics);
 }
 
 static void filelayout_write_release(void *data)
 {
-	struct nfs_write_data *wdata = data;
+	struct nfs_pgio_data *wdata = data;
 	struct pnfs_layout_hdr *lo = wdata->header->lseg->pls_layout;
 
 	filelayout_fenceme(lo->plh_inode, lo);
@@ -529,7 +529,7 @@ static const struct rpc_call_ops filelayout_commit_call_ops = {
 };
 
 static enum pnfs_try_status
-filelayout_read_pagelist(struct nfs_read_data *data)
+filelayout_read_pagelist(struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 	struct pnfs_layout_segment *lseg = hdr->lseg;
@@ -560,6 +560,7 @@ filelayout_read_pagelist(struct nfs_read_data *data)
 	/* No multipath support. Use first DS */
 	atomic_inc(&ds->ds_clp->cl_count);
 	data->ds_clp = ds->ds_clp;
+	data->ds_idx = idx;
 	fh = nfs4_fl_select_ds_fh(lseg, j);
 	if (fh)
 		data->args.fh = fh;
@@ -568,14 +569,14 @@ filelayout_read_pagelist(struct nfs_read_data *data)
 	data->mds_offset = offset;
 
 	/* Perform an asynchronous read to ds */
-	nfs_initiate_read(ds_clnt, data,
-				  &filelayout_read_call_ops, RPC_TASK_SOFTCONN);
+	nfs_initiate_pgio(ds_clnt, data,
+			    &filelayout_read_call_ops, 0, RPC_TASK_SOFTCONN);
 	return PNFS_ATTEMPTED;
 }
 
 /* Perform async writes. */
 static enum pnfs_try_status
-filelayout_write_pagelist(struct nfs_write_data *data, int sync)
+filelayout_write_pagelist(struct nfs_pgio_data *data, int sync)
 {
 	struct nfs_pgio_header *hdr = data->header;
 	struct pnfs_layout_segment *lseg = hdr->lseg;
@@ -600,20 +601,18 @@ filelayout_write_pagelist(struct nfs_write_data *data, int sync)
 		__func__, hdr->inode->i_ino, sync, (size_t) data->args.count,
 		offset, ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count));
 
-	data->write_done_cb = filelayout_write_done_cb;
+	data->pgio_done_cb = filelayout_write_done_cb;
 	atomic_inc(&ds->ds_clp->cl_count);
 	data->ds_clp = ds->ds_clp;
+	data->ds_idx = idx;
 	fh = nfs4_fl_select_ds_fh(lseg, j);
 	if (fh)
 		data->args.fh = fh;
-	/*
-	 * Get the file offset on the dserver. Set the write offset to
-	 * this offset and save the original offset.
-	 */
+
 	data->args.offset = filelayout_get_dserver_offset(lseg, offset);
 
 	/* Perform an asynchronous write */
-	nfs_initiate_write(ds_clnt, data,
+	nfs_initiate_pgio(ds_clnt, data,
 				    &filelayout_write_call_ops, sync,
 				    RPC_TASK_SOFTCONN);
 	return PNFS_ATTEMPTED;
@@ -637,7 +636,6 @@ filelayout_check_layout(struct pnfs_layout_hdr *lo,
 	struct nfs4_deviceid_node *d;
 	struct nfs4_file_layout_dsaddr *dsaddr;
 	int status = -EINVAL;
-	struct nfs_server *nfss = NFS_SERVER(lo->plh_inode);
 
 	dprintk("--> %s\n", __func__);
 
@@ -655,7 +653,7 @@ filelayout_check_layout(struct pnfs_layout_hdr *lo,
 		goto out;
 	}
 
-	if (!fl->stripe_unit || fl->stripe_unit % PAGE_SIZE) {
+	if (!fl->stripe_unit) {
 		dprintk("%s Invalid stripe unit (%u)\n",
 			__func__, fl->stripe_unit);
 		goto out;
@@ -692,12 +690,6 @@ filelayout_check_layout(struct pnfs_layout_hdr *lo,
 		goto out_put;
 	}
 
-	if (fl->stripe_unit % nfss->rsize || fl->stripe_unit % nfss->wsize) {
-		dprintk("%s Stripe unit (%u) not aligned with rsize %u "
-			"wsize %u\n", __func__, fl->stripe_unit, nfss->rsize,
-			nfss->wsize);
-	}
-
 	status = 0;
 out:
 	dprintk("--> %s returns %d\n", __func__, status);
@@ -850,11 +842,15 @@ filelayout_alloc_commit_info(struct pnfs_layout_segment *lseg,
 {
 	struct nfs4_filelayout_segment *fl = FILELAYOUT_LSEG(lseg);
 	struct pnfs_commit_bucket *buckets;
-	int size;
+	int size, i;
 
 	if (fl->commit_through_mds)
 		return 0;
-	if (cinfo->ds->nbuckets != 0) {
+
+	size = (fl->stripe_type == STRIPE_SPARSE) ?
+		fl->dsaddr->ds_num : fl->dsaddr->stripe_count;
+
+	if (cinfo->ds->nbuckets >= size) {
 		/* This assumes there is only one IOMODE_RW lseg.  What
 		 * we really want to do is have a layout_hdr level
 		 * dictionary of <multipath_list4, fh> keys, each
@@ -864,30 +860,36 @@ filelayout_alloc_commit_info(struct pnfs_layout_segment *lseg,
 		return 0;
 	}
 
-	size = (fl->stripe_type == STRIPE_SPARSE) ?
-		fl->dsaddr->ds_num : fl->dsaddr->stripe_count;
-
 	buckets = kcalloc(size, sizeof(struct pnfs_commit_bucket),
 			  gfp_flags);
 	if (!buckets)
 		return -ENOMEM;
-	else {
-		int i;
+	for (i = 0; i < size; i++) {
+		INIT_LIST_HEAD(&buckets[i].written);
+		INIT_LIST_HEAD(&buckets[i].committing);
+		/* mark direct verifier as unset */
+		buckets[i].direct_verf.committed = NFS_INVALID_STABLE_HOW;
+	}
 
-		spin_lock(cinfo->lock);
-		if (cinfo->ds->nbuckets != 0)
-			kfree(buckets);
-		else {
-			cinfo->ds->buckets = buckets;
-			cinfo->ds->nbuckets = size;
-			for (i = 0; i < size; i++) {
-				INIT_LIST_HEAD(&buckets[i].written);
-				INIT_LIST_HEAD(&buckets[i].committing);
-			}
-		}
-		spin_unlock(cinfo->lock);
-		return 0;
+	spin_lock(cinfo->lock);
+	if (cinfo->ds->nbuckets >= size)
+		goto out;
+	for (i = 0; i < cinfo->ds->nbuckets; i++) {
+		list_splice(&cinfo->ds->buckets[i].written,
+			    &buckets[i].written);
+		list_splice(&cinfo->ds->buckets[i].committing,
+			    &buckets[i].committing);
+		buckets[i].direct_verf.committed =
+			cinfo->ds->buckets[i].direct_verf.committed;
+		buckets[i].wlseg = cinfo->ds->buckets[i].wlseg;
+		buckets[i].clseg = cinfo->ds->buckets[i].clseg;
 	}
+	swap(cinfo->ds->buckets, buckets);
+	cinfo->ds->nbuckets = size;
+out:
+	spin_unlock(cinfo->lock);
+	kfree(buckets);
+	return 0;
 }
 
 static struct pnfs_layout_segment *
@@ -915,47 +917,51 @@ filelayout_alloc_lseg(struct pnfs_layout_hdr *layoutid,
 /*
  * filelayout_pg_test(). Called by nfs_can_coalesce_requests()
  *
- * return true  : coalesce page
- * return false : don't coalesce page
+ * Return 0 if @req cannot be coalesced into @pgio, otherwise return the number
+ * of bytes (maximum @req->wb_bytes) that can be coalesced.
  */
-static bool
+static size_t
 filelayout_pg_test(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
 		   struct nfs_page *req)
 {
+	unsigned int size;
 	u64 p_stripe, r_stripe;
-	u32 stripe_unit;
+	u32 stripe_offset;
+	u64 segment_offset = pgio->pg_lseg->pls_range.offset;
+	u32 stripe_unit = FILELAYOUT_LSEG(pgio->pg_lseg)->stripe_unit;
 
-	if (!pnfs_generic_pg_test(pgio, prev, req) ||
-	    !nfs_generic_pg_test(pgio, prev, req))
-		return false;
+	/* calls nfs_generic_pg_test */
+	size = pnfs_generic_pg_test(pgio, prev, req);
+	if (!size)
+		return 0;
 
-	p_stripe = (u64)req_offset(prev);
-	r_stripe = (u64)req_offset(req);
-	stripe_unit = FILELAYOUT_LSEG(pgio->pg_lseg)->stripe_unit;
+	/* see if req and prev are in the same stripe */
+	if (prev) {
+		p_stripe = (u64)req_offset(prev) - segment_offset;
+		r_stripe = (u64)req_offset(req) - segment_offset;
+		do_div(p_stripe, stripe_unit);
+		do_div(r_stripe, stripe_unit);
 
-	do_div(p_stripe, stripe_unit);
-	do_div(r_stripe, stripe_unit);
+		if (p_stripe != r_stripe)
+			return 0;
+	}
 
-	return (p_stripe == r_stripe);
+	/* calculate remaining bytes in the current stripe */
+	div_u64_rem((u64)req_offset(req) - segment_offset,
+			stripe_unit,
+			&stripe_offset);
+	WARN_ON_ONCE(stripe_offset > stripe_unit);
+	if (stripe_offset >= stripe_unit)
+		return 0;
+	return min(stripe_unit - (unsigned int)stripe_offset, size);
 }
 
 static void
 filelayout_pg_init_read(struct nfs_pageio_descriptor *pgio,
 			struct nfs_page *req)
 {
-	WARN_ON_ONCE(pgio->pg_lseg != NULL);
-
-	if (req->wb_offset != req->wb_pgbase) {
-		/*
-		 * Handling unaligned pages is difficult, because have to
-		 * somehow split a req in two in certain cases in the
-		 * pg.test code.  Avoid this by just not using pnfs
-		 * in this case.
-		 */
-		nfs_pageio_reset_read_mds(pgio);
-		return;
-	}
-	pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
+	if (!pgio->pg_lseg)
+		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
 					   req->wb_context,
 					   0,
 					   NFS4_MAX_UINT64,
@@ -973,11 +979,8 @@ filelayout_pg_init_write(struct nfs_pageio_descriptor *pgio,
 	struct nfs_commit_info cinfo;
 	int status;
 
-	WARN_ON_ONCE(pgio->pg_lseg != NULL);
-
-	if (req->wb_offset != req->wb_pgbase)
-		goto out_mds;
-	pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
+	if (!pgio->pg_lseg)
+		pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
 					   req->wb_context,
 					   0,
 					   NFS4_MAX_UINT64,
@@ -1067,6 +1070,7 @@ filelayout_choose_commit_list(struct nfs_page *req,
 	 */
 	j = nfs4_fl_calc_j_index(lseg, req_offset(req));
 	i = select_bucket_index(fl, j);
+	spin_lock(cinfo->lock);
 	buckets = cinfo->ds->buckets;
 	list = &buckets[i].written;
 	if (list_empty(list)) {
@@ -1080,6 +1084,7 @@ filelayout_choose_commit_list(struct nfs_page *req,
 	}
 	set_bit(PG_COMMIT_TO_DS, &req->wb_flags);
 	cinfo->ds->nwritten++;
+	spin_unlock(cinfo->lock);
 	return list;
 }
 
@@ -1176,6 +1181,7 @@ transfer_commit_list(struct list_head *src, struct list_head *dst,
 	return ret;
 }
 
+/* Note called with cinfo->lock held. */
 static int
 filelayout_scan_ds_commit_list(struct pnfs_commit_bucket *bucket,
 			       struct nfs_commit_info *cinfo,
@@ -1220,15 +1226,18 @@ static void filelayout_recover_commit_reqs(struct list_head *dst,
 					   struct nfs_commit_info *cinfo)
 {
 	struct pnfs_commit_bucket *b;
+	struct pnfs_layout_segment *freeme;
 	int i;
 
+restart:
 	spin_lock(cinfo->lock);
 	for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) {
 		if (transfer_commit_list(&b->written, dst, cinfo, 0)) {
-			spin_unlock(cinfo->lock);
-			pnfs_put_lseg(b->wlseg);
+			freeme = b->wlseg;
 			b->wlseg = NULL;
-			spin_lock(cinfo->lock);
+			spin_unlock(cinfo->lock);
+			pnfs_put_lseg(freeme);
+			goto restart;
 		}
 	}
 	cinfo->ds->nwritten = 0;
@@ -1243,6 +1252,7 @@ alloc_ds_commits(struct nfs_commit_info *cinfo, struct list_head *list)
 	struct nfs_commit_data *data;
 	int i, j;
 	unsigned int nreq = 0;
+	struct pnfs_layout_segment *freeme;
 
 	fl_cinfo = cinfo->ds;
 	bucket = fl_cinfo->buckets;
@@ -1253,8 +1263,10 @@ alloc_ds_commits(struct nfs_commit_info *cinfo, struct list_head *list)
 		if (!data)
 			break;
 		data->ds_commit_index = i;
+		spin_lock(cinfo->lock);
 		data->lseg = bucket->clseg;
 		bucket->clseg = NULL;
+		spin_unlock(cinfo->lock);
 		list_add(&data->pages, list);
 		nreq++;
 	}
@@ -1264,8 +1276,11 @@ alloc_ds_commits(struct nfs_commit_info *cinfo, struct list_head *list)
 		if (list_empty(&bucket->committing))
 			continue;
 		nfs_retry_commit(&bucket->committing, bucket->clseg, cinfo);
-		pnfs_put_lseg(bucket->clseg);
+		spin_lock(cinfo->lock);
+		freeme = bucket->clseg;
 		bucket->clseg = NULL;
+		spin_unlock(cinfo->lock);
+		pnfs_put_lseg(freeme);
 	}
 	/* Caller will clean up entries put on list */
 	return nreq;
@@ -1330,7 +1345,7 @@ filelayout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags)
 	struct nfs4_filelayout *flo;
 
 	flo = kzalloc(sizeof(*flo), gfp_flags);
-	return &flo->generic_hdr;
+	return flo != NULL ? &flo->generic_hdr : NULL;
 }
 
 static void
diff --git a/fs/nfs/nfs4filelayout.h b/fs/nfs/filelayout/filelayout.h
index cebd20e7e923..ffbddf2219ea 100644
--- a/fs/nfs/nfs4filelayout.h
+++ b/fs/nfs/filelayout/filelayout.h
@@ -30,7 +30,7 @@
 #ifndef FS_NFS_NFS4FILELAYOUT_H
 #define FS_NFS_NFS4FILELAYOUT_H
 
-#include "pnfs.h"
+#include "../pnfs.h"
 
 /*
  * Default data server connection timeout and retrans vaules.
diff --git a/fs/nfs/nfs4filelayoutdev.c b/fs/nfs/filelayout/filelayoutdev.c
index b9c61efe9660..44bf0140a4c7 100644
--- a/fs/nfs/nfs4filelayoutdev.c
+++ b/fs/nfs/filelayout/filelayoutdev.c
@@ -33,9 +33,9 @@
 #include <linux/module.h>
 #include <linux/sunrpc/addr.h>
 
-#include "internal.h"
-#include "nfs4session.h"
-#include "nfs4filelayout.h"
+#include "../internal.h"
+#include "../nfs4session.h"
+#include "filelayout.h"
 
 #define NFSDBG_FACILITY		NFSDBG_PNFS_LD
 
diff --git a/fs/nfs/getroot.c b/fs/nfs/getroot.c
index 66984a9aafaa..b94f80420a58 100644
--- a/fs/nfs/getroot.c
+++ b/fs/nfs/getroot.c
@@ -120,7 +120,8 @@ struct dentry *nfs_get_root(struct super_block *sb, struct nfs_fh *mntfh,
 
 	security_d_instantiate(ret, inode);
 	spin_lock(&ret->d_lock);
-	if (IS_ROOT(ret) && !(ret->d_flags & DCACHE_NFSFS_RENAMED)) {
+	if (IS_ROOT(ret) && !ret->d_fsdata &&
+	    !(ret->d_flags & DCACHE_NFSFS_RENAMED)) {
 		ret->d_fsdata = name;
 		name = NULL;
 	}
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index e6f7398d2b3c..c496f8a74639 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -1575,18 +1575,20 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 			inode->i_version = fattr->change_attr;
 		}
 	} else if (server->caps & NFS_CAP_CHANGE_ATTR)
-		invalid |= save_cache_validity;
+		nfsi->cache_validity |= save_cache_validity;
 
 	if (fattr->valid & NFS_ATTR_FATTR_MTIME) {
 		memcpy(&inode->i_mtime, &fattr->mtime, sizeof(inode->i_mtime));
 	} else if (server->caps & NFS_CAP_MTIME)
-		invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
+		nfsi->cache_validity |= save_cache_validity &
+				(NFS_INO_INVALID_ATTR
 				| NFS_INO_REVAL_FORCED);
 
 	if (fattr->valid & NFS_ATTR_FATTR_CTIME) {
 		memcpy(&inode->i_ctime, &fattr->ctime, sizeof(inode->i_ctime));
 	} else if (server->caps & NFS_CAP_CTIME)
-		invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
+		nfsi->cache_validity |= save_cache_validity &
+				(NFS_INO_INVALID_ATTR
 				| NFS_INO_REVAL_FORCED);
 
 	/* Check if our cached file size is stale */
@@ -1608,7 +1610,8 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 					(long long)new_isize);
 		}
 	} else
-		invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
+		nfsi->cache_validity |= save_cache_validity &
+				(NFS_INO_INVALID_ATTR
 				| NFS_INO_REVAL_PAGECACHE
 				| NFS_INO_REVAL_FORCED);
 
@@ -1616,7 +1619,8 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 	if (fattr->valid & NFS_ATTR_FATTR_ATIME)
 		memcpy(&inode->i_atime, &fattr->atime, sizeof(inode->i_atime));
 	else if (server->caps & NFS_CAP_ATIME)
-		invalid |= save_cache_validity & (NFS_INO_INVALID_ATIME
+		nfsi->cache_validity |= save_cache_validity &
+				(NFS_INO_INVALID_ATIME
 				| NFS_INO_REVAL_FORCED);
 
 	if (fattr->valid & NFS_ATTR_FATTR_MODE) {
@@ -1627,7 +1631,8 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 			invalid |= NFS_INO_INVALID_ATTR|NFS_INO_INVALID_ACCESS|NFS_INO_INVALID_ACL;
 		}
 	} else if (server->caps & NFS_CAP_MODE)
-		invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
+		nfsi->cache_validity |= save_cache_validity &
+				(NFS_INO_INVALID_ATTR
 				| NFS_INO_INVALID_ACCESS
 				| NFS_INO_INVALID_ACL
 				| NFS_INO_REVAL_FORCED);
@@ -1638,7 +1643,8 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 			inode->i_uid = fattr->uid;
 		}
 	} else if (server->caps & NFS_CAP_OWNER)
-		invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
+		nfsi->cache_validity |= save_cache_validity &
+				(NFS_INO_INVALID_ATTR
 				| NFS_INO_INVALID_ACCESS
 				| NFS_INO_INVALID_ACL
 				| NFS_INO_REVAL_FORCED);
@@ -1649,7 +1655,8 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 			inode->i_gid = fattr->gid;
 		}
 	} else if (server->caps & NFS_CAP_OWNER_GROUP)
-		invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
+		nfsi->cache_validity |= save_cache_validity &
+				(NFS_INO_INVALID_ATTR
 				| NFS_INO_INVALID_ACCESS
 				| NFS_INO_INVALID_ACL
 				| NFS_INO_REVAL_FORCED);
@@ -1662,7 +1669,8 @@ static int nfs_update_inode(struct inode *inode, struct nfs_fattr *fattr)
 			set_nlink(inode, fattr->nlink);
 		}
 	} else if (server->caps & NFS_CAP_NLINK)
-		invalid |= save_cache_validity & (NFS_INO_INVALID_ATTR
+		nfsi->cache_validity |= save_cache_validity &
+				(NFS_INO_INVALID_ATTR
 				| NFS_INO_REVAL_FORCED);
 
 	if (fattr->valid & NFS_ATTR_FATTR_SPACE_USED) {
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index dd8bfc2e2464..8b69cba1bb04 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -231,13 +231,20 @@ extern void nfs_destroy_writepagecache(void);
 
 extern int __init nfs_init_directcache(void);
 extern void nfs_destroy_directcache(void);
-extern bool nfs_pgarray_set(struct nfs_page_array *p, unsigned int pagecount);
 extern void nfs_pgheader_init(struct nfs_pageio_descriptor *desc,
 			      struct nfs_pgio_header *hdr,
 			      void (*release)(struct nfs_pgio_header *hdr));
 void nfs_set_pgio_error(struct nfs_pgio_header *hdr, int error, loff_t pos);
 int nfs_iocounter_wait(struct nfs_io_counter *c);
 
+extern const struct nfs_pageio_ops nfs_pgio_rw_ops;
+struct nfs_rw_header *nfs_rw_header_alloc(const struct nfs_rw_ops *);
+void nfs_rw_header_free(struct nfs_pgio_header *);
+void nfs_pgio_data_release(struct nfs_pgio_data *);
+int nfs_generic_pgio(struct nfs_pageio_descriptor *, struct nfs_pgio_header *);
+int nfs_initiate_pgio(struct rpc_clnt *, struct nfs_pgio_data *,
+		      const struct rpc_call_ops *, int, int);
+
 static inline void nfs_iocounter_init(struct nfs_io_counter *c)
 {
 	c->flags = 0;
@@ -395,19 +402,11 @@ extern int nfs4_get_rootfh(struct nfs_server *server, struct nfs_fh *mntfh, bool
 
 struct nfs_pgio_completion_ops;
 /* read.c */
-extern struct nfs_read_header *nfs_readhdr_alloc(void);
-extern void nfs_readhdr_free(struct nfs_pgio_header *hdr);
 extern void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
-			struct inode *inode,
+			struct inode *inode, bool force_mds,
 			const struct nfs_pgio_completion_ops *compl_ops);
-extern int nfs_initiate_read(struct rpc_clnt *clnt,
-			     struct nfs_read_data *data,
-			     const struct rpc_call_ops *call_ops, int flags);
 extern void nfs_read_prepare(struct rpc_task *task, void *calldata);
-extern int nfs_generic_pagein(struct nfs_pageio_descriptor *desc,
-			      struct nfs_pgio_header *hdr);
 extern void nfs_pageio_reset_read_mds(struct nfs_pageio_descriptor *pgio);
-extern void nfs_readdata_release(struct nfs_read_data *rdata);
 
 /* super.c */
 void nfs_clone_super(struct super_block *, struct nfs_mount_info *);
@@ -422,19 +421,10 @@ int nfs_remount(struct super_block *sb, int *flags, char *raw_data);
 
 /* write.c */
 extern void nfs_pageio_init_write(struct nfs_pageio_descriptor *pgio,
-			struct inode *inode, int ioflags,
+			struct inode *inode, int ioflags, bool force_mds,
 			const struct nfs_pgio_completion_ops *compl_ops);
-extern struct nfs_write_header *nfs_writehdr_alloc(void);
-extern void nfs_writehdr_free(struct nfs_pgio_header *hdr);
-extern int nfs_generic_flush(struct nfs_pageio_descriptor *desc,
-			     struct nfs_pgio_header *hdr);
 extern void nfs_pageio_reset_write_mds(struct nfs_pageio_descriptor *pgio);
-extern void nfs_writedata_release(struct nfs_write_data *wdata);
 extern void nfs_commit_free(struct nfs_commit_data *p);
-extern int nfs_initiate_write(struct rpc_clnt *clnt,
-			      struct nfs_write_data *data,
-			      const struct rpc_call_ops *call_ops,
-			      int how, int flags);
 extern void nfs_write_prepare(struct rpc_task *task, void *calldata);
 extern void nfs_commit_prepare(struct rpc_task *task, void *calldata);
 extern int nfs_initiate_commit(struct rpc_clnt *clnt,
@@ -447,6 +437,7 @@ extern void nfs_init_commit(struct nfs_commit_data *data,
 			    struct nfs_commit_info *cinfo);
 int nfs_scan_commit_list(struct list_head *src, struct list_head *dst,
 			 struct nfs_commit_info *cinfo, int max);
+unsigned long nfs_reqs_to_commit(struct nfs_commit_info *);
 int nfs_scan_commit(struct inode *inode, struct list_head *dst,
 		    struct nfs_commit_info *cinfo);
 void nfs_mark_request_commit(struct nfs_page *req,
@@ -492,7 +483,7 @@ static inline void nfs_inode_dio_wait(struct inode *inode)
 extern ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq);
 
 /* nfs4proc.c */
-extern void __nfs4_read_done_cb(struct nfs_read_data *);
+extern void __nfs4_read_done_cb(struct nfs_pgio_data *);
 extern struct nfs_client *nfs4_init_client(struct nfs_client *clp,
 			    const struct rpc_timeout *timeparms,
 			    const char *ip_addr);
diff --git a/fs/nfs/nfs2xdr.c b/fs/nfs/nfs2xdr.c
index 62db136339ea..5f61b83f4a1c 100644
--- a/fs/nfs/nfs2xdr.c
+++ b/fs/nfs/nfs2xdr.c
@@ -103,7 +103,7 @@ static void print_overflow_msg(const char *func, const struct xdr_stream *xdr)
 /*
  *	typedef opaque	nfsdata<>;
  */
-static int decode_nfsdata(struct xdr_stream *xdr, struct nfs_readres *result)
+static int decode_nfsdata(struct xdr_stream *xdr, struct nfs_pgio_res *result)
 {
 	u32 recvd, count;
 	__be32 *p;
@@ -613,7 +613,7 @@ static void nfs2_xdr_enc_readlinkargs(struct rpc_rqst *req,
  *	};
  */
 static void encode_readargs(struct xdr_stream *xdr,
-			    const struct nfs_readargs *args)
+			    const struct nfs_pgio_args *args)
 {
 	u32 offset = args->offset;
 	u32 count = args->count;
@@ -629,7 +629,7 @@ static void encode_readargs(struct xdr_stream *xdr,
 
 static void nfs2_xdr_enc_readargs(struct rpc_rqst *req,
 				  struct xdr_stream *xdr,
-				  const struct nfs_readargs *args)
+				  const struct nfs_pgio_args *args)
 {
 	encode_readargs(xdr, args);
 	prepare_reply_buffer(req, args->pages, args->pgbase,
@@ -649,7 +649,7 @@ static void nfs2_xdr_enc_readargs(struct rpc_rqst *req,
  *	};
  */
 static void encode_writeargs(struct xdr_stream *xdr,
-			     const struct nfs_writeargs *args)
+			     const struct nfs_pgio_args *args)
 {
 	u32 offset = args->offset;
 	u32 count = args->count;
@@ -669,7 +669,7 @@ static void encode_writeargs(struct xdr_stream *xdr,
 
 static void nfs2_xdr_enc_writeargs(struct rpc_rqst *req,
 				   struct xdr_stream *xdr,
-				   const struct nfs_writeargs *args)
+				   const struct nfs_pgio_args *args)
 {
 	encode_writeargs(xdr, args);
 	xdr->buf->flags |= XDRBUF_WRITE;
@@ -857,7 +857,7 @@ out_default:
  *	};
  */
 static int nfs2_xdr_dec_readres(struct rpc_rqst *req, struct xdr_stream *xdr,
-				struct nfs_readres *result)
+				struct nfs_pgio_res *result)
 {
 	enum nfs_stat status;
 	int error;
@@ -878,7 +878,7 @@ out_default:
 }
 
 static int nfs2_xdr_dec_writeres(struct rpc_rqst *req, struct xdr_stream *xdr,
-				 struct nfs_writeres *result)
+				 struct nfs_pgio_res *result)
 {
 	/* All NFSv2 writes are "file sync" writes */
 	result->verf->committed = NFS_FILE_SYNC;
diff --git a/fs/nfs/nfs3proc.c b/fs/nfs/nfs3proc.c
index db60149c4579..e7daa42bbc86 100644
--- a/fs/nfs/nfs3proc.c
+++ b/fs/nfs/nfs3proc.c
@@ -795,7 +795,7 @@ nfs3_proc_pathconf(struct nfs_server *server, struct nfs_fh *fhandle,
 	return status;
 }
 
-static int nfs3_read_done(struct rpc_task *task, struct nfs_read_data *data)
+static int nfs3_read_done(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	struct inode *inode = data->header->inode;
 
@@ -807,18 +807,18 @@ static int nfs3_read_done(struct rpc_task *task, struct nfs_read_data *data)
 	return 0;
 }
 
-static void nfs3_proc_read_setup(struct nfs_read_data *data, struct rpc_message *msg)
+static void nfs3_proc_read_setup(struct nfs_pgio_data *data, struct rpc_message *msg)
 {
 	msg->rpc_proc = &nfs3_procedures[NFS3PROC_READ];
 }
 
-static int nfs3_proc_read_rpc_prepare(struct rpc_task *task, struct nfs_read_data *data)
+static int nfs3_proc_pgio_rpc_prepare(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	rpc_call_start(task);
 	return 0;
 }
 
-static int nfs3_write_done(struct rpc_task *task, struct nfs_write_data *data)
+static int nfs3_write_done(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	struct inode *inode = data->header->inode;
 
@@ -829,17 +829,11 @@ static int nfs3_write_done(struct rpc_task *task, struct nfs_write_data *data)
 	return 0;
 }
 
-static void nfs3_proc_write_setup(struct nfs_write_data *data, struct rpc_message *msg)
+static void nfs3_proc_write_setup(struct nfs_pgio_data *data, struct rpc_message *msg)
 {
 	msg->rpc_proc = &nfs3_procedures[NFS3PROC_WRITE];
 }
 
-static int nfs3_proc_write_rpc_prepare(struct rpc_task *task, struct nfs_write_data *data)
-{
-	rpc_call_start(task);
-	return 0;
-}
-
 static void nfs3_proc_commit_rpc_prepare(struct rpc_task *task, struct nfs_commit_data *data)
 {
 	rpc_call_start(task);
@@ -946,13 +940,10 @@ const struct nfs_rpc_ops nfs_v3_clientops = {
 	.fsinfo		= nfs3_proc_fsinfo,
 	.pathconf	= nfs3_proc_pathconf,
 	.decode_dirent	= nfs3_decode_dirent,
+	.pgio_rpc_prepare = nfs3_proc_pgio_rpc_prepare,
 	.read_setup	= nfs3_proc_read_setup,
-	.read_pageio_init = nfs_pageio_init_read,
-	.read_rpc_prepare = nfs3_proc_read_rpc_prepare,
 	.read_done	= nfs3_read_done,
 	.write_setup	= nfs3_proc_write_setup,
-	.write_pageio_init = nfs_pageio_init_write,
-	.write_rpc_prepare = nfs3_proc_write_rpc_prepare,
 	.write_done	= nfs3_write_done,
 	.commit_setup	= nfs3_proc_commit_setup,
 	.commit_rpc_prepare = nfs3_proc_commit_rpc_prepare,
diff --git a/fs/nfs/nfs3xdr.c b/fs/nfs/nfs3xdr.c
index fa6d72131c19..8f4cbe7f4aa8 100644
--- a/fs/nfs/nfs3xdr.c
+++ b/fs/nfs/nfs3xdr.c
@@ -953,7 +953,7 @@ static void nfs3_xdr_enc_readlink3args(struct rpc_rqst *req,
  *	};
  */
 static void encode_read3args(struct xdr_stream *xdr,
-			     const struct nfs_readargs *args)
+			     const struct nfs_pgio_args *args)
 {
 	__be32 *p;
 
@@ -966,7 +966,7 @@ static void encode_read3args(struct xdr_stream *xdr,
 
 static void nfs3_xdr_enc_read3args(struct rpc_rqst *req,
 				   struct xdr_stream *xdr,
-				   const struct nfs_readargs *args)
+				   const struct nfs_pgio_args *args)
 {
 	encode_read3args(xdr, args);
 	prepare_reply_buffer(req, args->pages, args->pgbase,
@@ -992,7 +992,7 @@ static void nfs3_xdr_enc_read3args(struct rpc_rqst *req,
  *	};
  */
 static void encode_write3args(struct xdr_stream *xdr,
-			      const struct nfs_writeargs *args)
+			      const struct nfs_pgio_args *args)
 {
 	__be32 *p;
 
@@ -1008,7 +1008,7 @@ static void encode_write3args(struct xdr_stream *xdr,
 
 static void nfs3_xdr_enc_write3args(struct rpc_rqst *req,
 				    struct xdr_stream *xdr,
-				    const struct nfs_writeargs *args)
+				    const struct nfs_pgio_args *args)
 {
 	encode_write3args(xdr, args);
 	xdr->buf->flags |= XDRBUF_WRITE;
@@ -1589,7 +1589,7 @@ out_default:
  *	};
  */
 static int decode_read3resok(struct xdr_stream *xdr,
-			     struct nfs_readres *result)
+			     struct nfs_pgio_res *result)
 {
 	u32 eof, count, ocount, recvd;
 	__be32 *p;
@@ -1625,7 +1625,7 @@ out_overflow:
 }
 
 static int nfs3_xdr_dec_read3res(struct rpc_rqst *req, struct xdr_stream *xdr,
-				 struct nfs_readres *result)
+				 struct nfs_pgio_res *result)
 {
 	enum nfs_stat status;
 	int error;
@@ -1673,7 +1673,7 @@ out_status:
  *	};
  */
 static int decode_write3resok(struct xdr_stream *xdr,
-			      struct nfs_writeres *result)
+			      struct nfs_pgio_res *result)
 {
 	__be32 *p;
 
@@ -1697,7 +1697,7 @@ out_eio:
 }
 
 static int nfs3_xdr_dec_write3res(struct rpc_rqst *req, struct xdr_stream *xdr,
-				  struct nfs_writeres *result)
+				  struct nfs_pgio_res *result)
 {
 	enum nfs_stat status;
 	int error;
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index e1d1badbe53c..f63cb87cd730 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -337,7 +337,7 @@ nfs4_state_protect(struct nfs_client *clp, unsigned long sp4_mode,
  */
 static inline void
 nfs4_state_protect_write(struct nfs_client *clp, struct rpc_clnt **clntp,
-			 struct rpc_message *msg, struct nfs_write_data *wdata)
+			 struct rpc_message *msg, struct nfs_pgio_data *wdata)
 {
 	if (_nfs4_state_protect(clp, NFS_SP4_MACH_CRED_WRITE, clntp, msg) &&
 	    !test_bit(NFS_SP4_MACH_CRED_COMMIT, &clp->cl_sp4_flags))
@@ -369,7 +369,7 @@ nfs4_state_protect(struct nfs_client *clp, unsigned long sp4_flags,
 
 static inline void
 nfs4_state_protect_write(struct nfs_client *clp, struct rpc_clnt **clntp,
-			 struct rpc_message *msg, struct nfs_write_data *wdata)
+			 struct rpc_message *msg, struct nfs_pgio_data *wdata)
 {
 }
 #endif /* CONFIG_NFS_V4_1 */
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index 8de3407e0360..464db9dd6318 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -100,8 +100,7 @@ nfs4_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 			break;
 		mutex_lock(&inode->i_mutex);
 		ret = nfs_file_fsync_commit(file, start, end, datasync);
-		if (!ret && !datasync)
-			/* application has asked for meta-data sync */
+		if (!ret)
 			ret = pnfs_layoutcommit_inode(inode, true);
 		mutex_unlock(&inode->i_mutex);
 		/*
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 7f55fed8dc64..285ad5334018 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -2027,7 +2027,7 @@ static int _nfs4_proc_open(struct nfs4_opendata *data)
 			return status;
 	}
 	if (!(o_res->f_attr->valid & NFS_ATTR_FATTR))
-		_nfs4_proc_getattr(server, &o_res->fh, o_res->f_attr, o_res->f_label);
+		nfs4_proc_getattr(server, &o_res->fh, o_res->f_attr, o_res->f_label);
 	return 0;
 }
 
@@ -4033,12 +4033,12 @@ static bool nfs4_error_stateid_expired(int err)
 	return false;
 }
 
-void __nfs4_read_done_cb(struct nfs_read_data *data)
+void __nfs4_read_done_cb(struct nfs_pgio_data *data)
 {
 	nfs_invalidate_atime(data->header->inode);
 }
 
-static int nfs4_read_done_cb(struct rpc_task *task, struct nfs_read_data *data)
+static int nfs4_read_done_cb(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	struct nfs_server *server = NFS_SERVER(data->header->inode);
 
@@ -4055,7 +4055,7 @@ static int nfs4_read_done_cb(struct rpc_task *task, struct nfs_read_data *data)
 }
 
 static bool nfs4_read_stateid_changed(struct rpc_task *task,
-		struct nfs_readargs *args)
+		struct nfs_pgio_args *args)
 {
 
 	if (!nfs4_error_stateid_expired(task->tk_status) ||
@@ -4068,7 +4068,7 @@ static bool nfs4_read_stateid_changed(struct rpc_task *task,
 	return true;
 }
 
-static int nfs4_read_done(struct rpc_task *task, struct nfs_read_data *data)
+static int nfs4_read_done(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 
 	dprintk("--> %s\n", __func__);
@@ -4077,19 +4077,19 @@ static int nfs4_read_done(struct rpc_task *task, struct nfs_read_data *data)
 		return -EAGAIN;
 	if (nfs4_read_stateid_changed(task, &data->args))
 		return -EAGAIN;
-	return data->read_done_cb ? data->read_done_cb(task, data) :
+	return data->pgio_done_cb ? data->pgio_done_cb(task, data) :
 				    nfs4_read_done_cb(task, data);
 }
 
-static void nfs4_proc_read_setup(struct nfs_read_data *data, struct rpc_message *msg)
+static void nfs4_proc_read_setup(struct nfs_pgio_data *data, struct rpc_message *msg)
 {
 	data->timestamp   = jiffies;
-	data->read_done_cb = nfs4_read_done_cb;
+	data->pgio_done_cb = nfs4_read_done_cb;
 	msg->rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_READ];
 	nfs4_init_sequence(&data->args.seq_args, &data->res.seq_res, 0);
 }
 
-static int nfs4_proc_read_rpc_prepare(struct rpc_task *task, struct nfs_read_data *data)
+static int nfs4_proc_pgio_rpc_prepare(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	if (nfs4_setup_sequence(NFS_SERVER(data->header->inode),
 			&data->args.seq_args,
@@ -4097,14 +4097,14 @@ static int nfs4_proc_read_rpc_prepare(struct rpc_task *task, struct nfs_read_dat
 			task))
 		return 0;
 	if (nfs4_set_rw_stateid(&data->args.stateid, data->args.context,
-				data->args.lock_context, FMODE_READ) == -EIO)
+				data->args.lock_context, data->header->rw_ops->rw_mode) == -EIO)
 		return -EIO;
 	if (unlikely(test_bit(NFS_CONTEXT_BAD, &data->args.context->flags)))
 		return -EIO;
 	return 0;
 }
 
-static int nfs4_write_done_cb(struct rpc_task *task, struct nfs_write_data *data)
+static int nfs4_write_done_cb(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	struct inode *inode = data->header->inode;
 	
@@ -4121,7 +4121,7 @@ static int nfs4_write_done_cb(struct rpc_task *task, struct nfs_write_data *data
 }
 
 static bool nfs4_write_stateid_changed(struct rpc_task *task,
-		struct nfs_writeargs *args)
+		struct nfs_pgio_args *args)
 {
 
 	if (!nfs4_error_stateid_expired(task->tk_status) ||
@@ -4134,18 +4134,18 @@ static bool nfs4_write_stateid_changed(struct rpc_task *task,
 	return true;
 }
 
-static int nfs4_write_done(struct rpc_task *task, struct nfs_write_data *data)
+static int nfs4_write_done(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	if (!nfs4_sequence_done(task, &data->res.seq_res))
 		return -EAGAIN;
 	if (nfs4_write_stateid_changed(task, &data->args))
 		return -EAGAIN;
-	return data->write_done_cb ? data->write_done_cb(task, data) :
+	return data->pgio_done_cb ? data->pgio_done_cb(task, data) :
 		nfs4_write_done_cb(task, data);
 }
 
 static
-bool nfs4_write_need_cache_consistency_data(const struct nfs_write_data *data)
+bool nfs4_write_need_cache_consistency_data(const struct nfs_pgio_data *data)
 {
 	const struct nfs_pgio_header *hdr = data->header;
 
@@ -4158,7 +4158,7 @@ bool nfs4_write_need_cache_consistency_data(const struct nfs_write_data *data)
 	return nfs4_have_delegation(hdr->inode, FMODE_READ) == 0;
 }
 
-static void nfs4_proc_write_setup(struct nfs_write_data *data, struct rpc_message *msg)
+static void nfs4_proc_write_setup(struct nfs_pgio_data *data, struct rpc_message *msg)
 {
 	struct nfs_server *server = NFS_SERVER(data->header->inode);
 
@@ -4168,8 +4168,8 @@ static void nfs4_proc_write_setup(struct nfs_write_data *data, struct rpc_messag
 	} else
 		data->args.bitmask = server->cache_consistency_bitmask;
 
-	if (!data->write_done_cb)
-		data->write_done_cb = nfs4_write_done_cb;
+	if (!data->pgio_done_cb)
+		data->pgio_done_cb = nfs4_write_done_cb;
 	data->res.server = server;
 	data->timestamp   = jiffies;
 
@@ -4177,21 +4177,6 @@ static void nfs4_proc_write_setup(struct nfs_write_data *data, struct rpc_messag
 	nfs4_init_sequence(&data->args.seq_args, &data->res.seq_res, 1);
 }
 
-static int nfs4_proc_write_rpc_prepare(struct rpc_task *task, struct nfs_write_data *data)
-{
-	if (nfs4_setup_sequence(NFS_SERVER(data->header->inode),
-			&data->args.seq_args,
-			&data->res.seq_res,
-			task))
-		return 0;
-	if (nfs4_set_rw_stateid(&data->args.stateid, data->args.context,
-				data->args.lock_context, FMODE_WRITE) == -EIO)
-		return -EIO;
-	if (unlikely(test_bit(NFS_CONTEXT_BAD, &data->args.context->flags)))
-		return -EIO;
-	return 0;
-}
-
 static void nfs4_proc_commit_rpc_prepare(struct rpc_task *task, struct nfs_commit_data *data)
 {
 	nfs4_setup_sequence(NFS_SERVER(data->inode),
@@ -8432,13 +8417,10 @@ const struct nfs_rpc_ops nfs_v4_clientops = {
 	.pathconf	= nfs4_proc_pathconf,
 	.set_capabilities = nfs4_server_capabilities,
 	.decode_dirent	= nfs4_decode_dirent,
+	.pgio_rpc_prepare = nfs4_proc_pgio_rpc_prepare,
 	.read_setup	= nfs4_proc_read_setup,
-	.read_pageio_init = pnfs_pageio_init_read,
-	.read_rpc_prepare = nfs4_proc_read_rpc_prepare,
 	.read_done	= nfs4_read_done,
 	.write_setup	= nfs4_proc_write_setup,
-	.write_pageio_init = pnfs_pageio_init_write,
-	.write_rpc_prepare = nfs4_proc_write_rpc_prepare,
 	.write_done	= nfs4_write_done,
 	.commit_setup	= nfs4_proc_commit_setup,
 	.commit_rpc_prepare = nfs4_proc_commit_rpc_prepare,
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index c0583b9bef71..848f6853c59e 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -1456,7 +1456,7 @@ static int nfs4_reclaim_open_state(struct nfs4_state_owner *sp, const struct nfs
 	 * server that doesn't support a grace period.
 	 */
 	spin_lock(&sp->so_lock);
-	write_seqcount_begin(&sp->so_reclaim_seqcount);
+	raw_write_seqcount_begin(&sp->so_reclaim_seqcount);
 restart:
 	list_for_each_entry(state, &sp->so_states, open_states) {
 		if (!test_and_clear_bit(ops->state_flag_bit, &state->flags))
@@ -1519,13 +1519,13 @@ restart:
 		spin_lock(&sp->so_lock);
 		goto restart;
 	}
-	write_seqcount_end(&sp->so_reclaim_seqcount);
+	raw_write_seqcount_end(&sp->so_reclaim_seqcount);
 	spin_unlock(&sp->so_lock);
 	return 0;
 out_err:
 	nfs4_put_open_state(state);
 	spin_lock(&sp->so_lock);
-	write_seqcount_end(&sp->so_reclaim_seqcount);
+	raw_write_seqcount_end(&sp->so_reclaim_seqcount);
 	spin_unlock(&sp->so_lock);
 	return status;
 }
diff --git a/fs/nfs/nfs4trace.h b/fs/nfs/nfs4trace.h
index 849cf146db30..0a744f3a86f6 100644
--- a/fs/nfs/nfs4trace.h
+++ b/fs/nfs/nfs4trace.h
@@ -932,7 +932,7 @@ DEFINE_NFS4_IDMAP_EVENT(nfs4_map_gid_to_group);
 
 DECLARE_EVENT_CLASS(nfs4_read_event,
 		TP_PROTO(
-			const struct nfs_read_data *data,
+			const struct nfs_pgio_data *data,
 			int error
 		),
 
@@ -972,7 +972,7 @@ DECLARE_EVENT_CLASS(nfs4_read_event,
 #define DEFINE_NFS4_READ_EVENT(name) \
 	DEFINE_EVENT(nfs4_read_event, name, \
 			TP_PROTO( \
-				const struct nfs_read_data *data, \
+				const struct nfs_pgio_data *data, \
 				int error \
 			), \
 			TP_ARGS(data, error))
@@ -983,7 +983,7 @@ DEFINE_NFS4_READ_EVENT(nfs4_pnfs_read);
 
 DECLARE_EVENT_CLASS(nfs4_write_event,
 		TP_PROTO(
-			const struct nfs_write_data *data,
+			const struct nfs_pgio_data *data,
 			int error
 		),
 
@@ -1024,7 +1024,7 @@ DECLARE_EVENT_CLASS(nfs4_write_event,
 #define DEFINE_NFS4_WRITE_EVENT(name) \
 	DEFINE_EVENT(nfs4_write_event, name, \
 			TP_PROTO( \
-				const struct nfs_write_data *data, \
+				const struct nfs_pgio_data *data, \
 				int error \
 			), \
 			TP_ARGS(data, error))
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 73ce8d4fe2c8..939ae606cfa4 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -1556,7 +1556,8 @@ static void encode_putrootfh(struct xdr_stream *xdr, struct compound_hdr *hdr)
 	encode_op_hdr(xdr, OP_PUTROOTFH, decode_putrootfh_maxsz, hdr);
 }
 
-static void encode_read(struct xdr_stream *xdr, const struct nfs_readargs *args, struct compound_hdr *hdr)
+static void encode_read(struct xdr_stream *xdr, const struct nfs_pgio_args *args,
+			struct compound_hdr *hdr)
 {
 	__be32 *p;
 
@@ -1701,7 +1702,8 @@ static void encode_setclientid_confirm(struct xdr_stream *xdr, const struct nfs4
 	encode_nfs4_verifier(xdr, &arg->confirm);
 }
 
-static void encode_write(struct xdr_stream *xdr, const struct nfs_writeargs *args, struct compound_hdr *hdr)
+static void encode_write(struct xdr_stream *xdr, const struct nfs_pgio_args *args,
+			 struct compound_hdr *hdr)
 {
 	__be32 *p;
 
@@ -2451,7 +2453,7 @@ static void nfs4_xdr_enc_readdir(struct rpc_rqst *req, struct xdr_stream *xdr,
  * Encode a READ request
  */
 static void nfs4_xdr_enc_read(struct rpc_rqst *req, struct xdr_stream *xdr,
-			      struct nfs_readargs *args)
+			      struct nfs_pgio_args *args)
 {
 	struct compound_hdr hdr = {
 		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
@@ -2513,7 +2515,7 @@ static void nfs4_xdr_enc_getacl(struct rpc_rqst *req, struct xdr_stream *xdr,
  * Encode a WRITE request
  */
 static void nfs4_xdr_enc_write(struct rpc_rqst *req, struct xdr_stream *xdr,
-			       struct nfs_writeargs *args)
+			       struct nfs_pgio_args *args)
 {
 	struct compound_hdr hdr = {
 		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
@@ -5085,7 +5087,8 @@ static int decode_putrootfh(struct xdr_stream *xdr)
 	return decode_op_hdr(xdr, OP_PUTROOTFH);
 }
 
-static int decode_read(struct xdr_stream *xdr, struct rpc_rqst *req, struct nfs_readres *res)
+static int decode_read(struct xdr_stream *xdr, struct rpc_rqst *req,
+		       struct nfs_pgio_res *res)
 {
 	__be32 *p;
 	uint32_t count, eof, recvd;
@@ -5339,7 +5342,7 @@ static int decode_setclientid_confirm(struct xdr_stream *xdr)
 	return decode_op_hdr(xdr, OP_SETCLIENTID_CONFIRM);
 }
 
-static int decode_write(struct xdr_stream *xdr, struct nfs_writeres *res)
+static int decode_write(struct xdr_stream *xdr, struct nfs_pgio_res *res)
 {
 	__be32 *p;
 	int status;
@@ -6636,7 +6639,7 @@ out:
  * Decode Read response
  */
 static int nfs4_xdr_dec_read(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
-			     struct nfs_readres *res)
+			     struct nfs_pgio_res *res)
 {
 	struct compound_hdr hdr;
 	int status;
@@ -6661,7 +6664,7 @@ out:
  * Decode WRITE response
  */
 static int nfs4_xdr_dec_write(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
-			      struct nfs_writeres *res)
+			      struct nfs_pgio_res *res)
 {
 	struct compound_hdr hdr;
 	int status;
diff --git a/fs/nfs/objlayout/objio_osd.c b/fs/nfs/objlayout/objio_osd.c
index 5457745dd4f1..611320753db2 100644
--- a/fs/nfs/objlayout/objio_osd.c
+++ b/fs/nfs/objlayout/objio_osd.c
@@ -439,7 +439,7 @@ static void _read_done(struct ore_io_state *ios, void *private)
 	objlayout_read_done(&objios->oir, status, objios->sync);
 }
 
-int objio_read_pagelist(struct nfs_read_data *rdata)
+int objio_read_pagelist(struct nfs_pgio_data *rdata)
 {
 	struct nfs_pgio_header *hdr = rdata->header;
 	struct objio_state *objios;
@@ -487,7 +487,7 @@ static void _write_done(struct ore_io_state *ios, void *private)
 static struct page *__r4w_get_page(void *priv, u64 offset, bool *uptodate)
 {
 	struct objio_state *objios = priv;
-	struct nfs_write_data *wdata = objios->oir.rpcdata;
+	struct nfs_pgio_data *wdata = objios->oir.rpcdata;
 	struct address_space *mapping = wdata->header->inode->i_mapping;
 	pgoff_t index = offset / PAGE_SIZE;
 	struct page *page;
@@ -531,7 +531,7 @@ static const struct _ore_r4w_op _r4w_op = {
 	.put_page = &__r4w_put_page,
 };
 
-int objio_write_pagelist(struct nfs_write_data *wdata, int how)
+int objio_write_pagelist(struct nfs_pgio_data *wdata, int how)
 {
 	struct nfs_pgio_header *hdr = wdata->header;
 	struct objio_state *objios;
@@ -564,14 +564,22 @@ int objio_write_pagelist(struct nfs_write_data *wdata, int how)
 	return 0;
 }
 
-static bool objio_pg_test(struct nfs_pageio_descriptor *pgio,
+/*
+ * Return 0 if @req cannot be coalesced into @pgio, otherwise return the number
+ * of bytes (maximum @req->wb_bytes) that can be coalesced.
+ */
+static size_t objio_pg_test(struct nfs_pageio_descriptor *pgio,
 			  struct nfs_page *prev, struct nfs_page *req)
 {
-	if (!pnfs_generic_pg_test(pgio, prev, req))
-		return false;
+	unsigned int size;
+
+	size = pnfs_generic_pg_test(pgio, prev, req);
+
+	if (!size || pgio->pg_count + req->wb_bytes >
+	    (unsigned long)pgio->pg_layout_private)
+		return 0;
 
-	return pgio->pg_count + req->wb_bytes <=
-			(unsigned long)pgio->pg_layout_private;
+	return min(size, req->wb_bytes);
 }
 
 static void objio_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
diff --git a/fs/nfs/objlayout/objlayout.c b/fs/nfs/objlayout/objlayout.c
index e4f9cbfec67b..765d3f54e986 100644
--- a/fs/nfs/objlayout/objlayout.c
+++ b/fs/nfs/objlayout/objlayout.c
@@ -53,10 +53,10 @@ objlayout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags)
 	struct objlayout *objlay;
 
 	objlay = kzalloc(sizeof(struct objlayout), gfp_flags);
-	if (objlay) {
-		spin_lock_init(&objlay->lock);
-		INIT_LIST_HEAD(&objlay->err_list);
-	}
+	if (!objlay)
+		return NULL;
+	spin_lock_init(&objlay->lock);
+	INIT_LIST_HEAD(&objlay->err_list);
 	dprintk("%s: Return %p\n", __func__, objlay);
 	return &objlay->pnfs_layout;
 }
@@ -229,11 +229,11 @@ objlayout_io_set_result(struct objlayout_io_res *oir, unsigned index,
 static void _rpc_read_complete(struct work_struct *work)
 {
 	struct rpc_task *task;
-	struct nfs_read_data *rdata;
+	struct nfs_pgio_data *rdata;
 
 	dprintk("%s enter\n", __func__);
 	task = container_of(work, struct rpc_task, u.tk_work);
-	rdata = container_of(task, struct nfs_read_data, task);
+	rdata = container_of(task, struct nfs_pgio_data, task);
 
 	pnfs_ld_read_done(rdata);
 }
@@ -241,7 +241,7 @@ static void _rpc_read_complete(struct work_struct *work)
 void
 objlayout_read_done(struct objlayout_io_res *oir, ssize_t status, bool sync)
 {
-	struct nfs_read_data *rdata = oir->rpcdata;
+	struct nfs_pgio_data *rdata = oir->rpcdata;
 
 	oir->status = rdata->task.tk_status = status;
 	if (status >= 0)
@@ -266,7 +266,7 @@ objlayout_read_done(struct objlayout_io_res *oir, ssize_t status, bool sync)
  * Perform sync or async reads.
  */
 enum pnfs_try_status
-objlayout_read_pagelist(struct nfs_read_data *rdata)
+objlayout_read_pagelist(struct nfs_pgio_data *rdata)
 {
 	struct nfs_pgio_header *hdr = rdata->header;
 	struct inode *inode = hdr->inode;
@@ -312,11 +312,11 @@ objlayout_read_pagelist(struct nfs_read_data *rdata)
 static void _rpc_write_complete(struct work_struct *work)
 {
 	struct rpc_task *task;
-	struct nfs_write_data *wdata;
+	struct nfs_pgio_data *wdata;
 
 	dprintk("%s enter\n", __func__);
 	task = container_of(work, struct rpc_task, u.tk_work);
-	wdata = container_of(task, struct nfs_write_data, task);
+	wdata = container_of(task, struct nfs_pgio_data, task);
 
 	pnfs_ld_write_done(wdata);
 }
@@ -324,7 +324,7 @@ static void _rpc_write_complete(struct work_struct *work)
 void
 objlayout_write_done(struct objlayout_io_res *oir, ssize_t status, bool sync)
 {
-	struct nfs_write_data *wdata = oir->rpcdata;
+	struct nfs_pgio_data *wdata = oir->rpcdata;
 
 	oir->status = wdata->task.tk_status = status;
 	if (status >= 0) {
@@ -351,7 +351,7 @@ objlayout_write_done(struct objlayout_io_res *oir, ssize_t status, bool sync)
  * Perform sync or async writes.
  */
 enum pnfs_try_status
-objlayout_write_pagelist(struct nfs_write_data *wdata,
+objlayout_write_pagelist(struct nfs_pgio_data *wdata,
 			 int how)
 {
 	struct nfs_pgio_header *hdr = wdata->header;
diff --git a/fs/nfs/objlayout/objlayout.h b/fs/nfs/objlayout/objlayout.h
index 87aa1dec6120..01e041029a6c 100644
--- a/fs/nfs/objlayout/objlayout.h
+++ b/fs/nfs/objlayout/objlayout.h
@@ -119,8 +119,8 @@ extern void objio_free_lseg(struct pnfs_layout_segment *lseg);
  */
 extern void objio_free_result(struct objlayout_io_res *oir);
 
-extern int objio_read_pagelist(struct nfs_read_data *rdata);
-extern int objio_write_pagelist(struct nfs_write_data *wdata, int how);
+extern int objio_read_pagelist(struct nfs_pgio_data *rdata);
+extern int objio_write_pagelist(struct nfs_pgio_data *wdata, int how);
 
 /*
  * callback API
@@ -168,10 +168,10 @@ extern struct pnfs_layout_segment *objlayout_alloc_lseg(
 extern void objlayout_free_lseg(struct pnfs_layout_segment *);
 
 extern enum pnfs_try_status objlayout_read_pagelist(
-	struct nfs_read_data *);
+	struct nfs_pgio_data *);
 
 extern enum pnfs_try_status objlayout_write_pagelist(
-	struct nfs_write_data *,
+	struct nfs_pgio_data *,
 	int how);
 
 extern void objlayout_encode_layoutcommit(
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 03ed984ab4d8..b6ee3a6ee96d 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -24,9 +24,14 @@
 #include "internal.h"
 #include "pnfs.h"
 
+#define NFSDBG_FACILITY		NFSDBG_PAGECACHE
+
 static struct kmem_cache *nfs_page_cachep;
+static const struct rpc_call_ops nfs_pgio_common_ops;
+
+static void nfs_free_request(struct nfs_page *);
 
-bool nfs_pgarray_set(struct nfs_page_array *p, unsigned int pagecount)
+static bool nfs_pgarray_set(struct nfs_page_array *p, unsigned int pagecount)
 {
 	p->npages = pagecount;
 	if (pagecount <= ARRAY_SIZE(p->page_array))
@@ -133,11 +138,156 @@ nfs_iocounter_wait(struct nfs_io_counter *c)
 	return __nfs_iocounter_wait(c);
 }
 
+static int nfs_wait_bit_uninterruptible(void *word)
+{
+	io_schedule();
+	return 0;
+}
+
+/*
+ * nfs_page_group_lock - lock the head of the page group
+ * @req - request in group that is to be locked
+ *
+ * this lock must be held if modifying the page group list
+ */
+void
+nfs_page_group_lock(struct nfs_page *req)
+{
+	struct nfs_page *head = req->wb_head;
+
+	WARN_ON_ONCE(head != head->wb_head);
+
+	wait_on_bit_lock(&head->wb_flags, PG_HEADLOCK,
+			nfs_wait_bit_uninterruptible,
+			TASK_UNINTERRUPTIBLE);
+}
+
+/*
+ * nfs_page_group_unlock - unlock the head of the page group
+ * @req - request in group that is to be unlocked
+ */
+void
+nfs_page_group_unlock(struct nfs_page *req)
+{
+	struct nfs_page *head = req->wb_head;
+
+	WARN_ON_ONCE(head != head->wb_head);
+
+	smp_mb__before_atomic();
+	clear_bit(PG_HEADLOCK, &head->wb_flags);
+	smp_mb__after_atomic();
+	wake_up_bit(&head->wb_flags, PG_HEADLOCK);
+}
+
+/*
+ * nfs_page_group_sync_on_bit_locked
+ *
+ * must be called with page group lock held
+ */
+static bool
+nfs_page_group_sync_on_bit_locked(struct nfs_page *req, unsigned int bit)
+{
+	struct nfs_page *head = req->wb_head;
+	struct nfs_page *tmp;
+
+	WARN_ON_ONCE(!test_bit(PG_HEADLOCK, &head->wb_flags));
+	WARN_ON_ONCE(test_and_set_bit(bit, &req->wb_flags));
+
+	tmp = req->wb_this_page;
+	while (tmp != req) {
+		if (!test_bit(bit, &tmp->wb_flags))
+			return false;
+		tmp = tmp->wb_this_page;
+	}
+
+	/* true! reset all bits */
+	tmp = req;
+	do {
+		clear_bit(bit, &tmp->wb_flags);
+		tmp = tmp->wb_this_page;
+	} while (tmp != req);
+
+	return true;
+}
+
+/*
+ * nfs_page_group_sync_on_bit - set bit on current request, but only
+ *   return true if the bit is set for all requests in page group
+ * @req - request in page group
+ * @bit - PG_* bit that is used to sync page group
+ */
+bool nfs_page_group_sync_on_bit(struct nfs_page *req, unsigned int bit)
+{
+	bool ret;
+
+	nfs_page_group_lock(req);
+	ret = nfs_page_group_sync_on_bit_locked(req, bit);
+	nfs_page_group_unlock(req);
+
+	return ret;
+}
+
+/*
+ * nfs_page_group_init - Initialize the page group linkage for @req
+ * @req - a new nfs request
+ * @prev - the previous request in page group, or NULL if @req is the first
+ *         or only request in the group (the head).
+ */
+static inline void
+nfs_page_group_init(struct nfs_page *req, struct nfs_page *prev)
+{
+	WARN_ON_ONCE(prev == req);
+
+	if (!prev) {
+		req->wb_head = req;
+		req->wb_this_page = req;
+	} else {
+		WARN_ON_ONCE(prev->wb_this_page != prev->wb_head);
+		WARN_ON_ONCE(!test_bit(PG_HEADLOCK, &prev->wb_head->wb_flags));
+		req->wb_head = prev->wb_head;
+		req->wb_this_page = prev->wb_this_page;
+		prev->wb_this_page = req;
+
+		/* grab extra ref if head request has extra ref from
+		 * the write/commit path to handle handoff between write
+		 * and commit lists */
+		if (test_bit(PG_INODE_REF, &prev->wb_head->wb_flags))
+			kref_get(&req->wb_kref);
+	}
+}
+
+/*
+ * nfs_page_group_destroy - sync the destruction of page groups
+ * @req - request that no longer needs the page group
+ *
+ * releases the page group reference from each member once all
+ * members have called this function.
+ */
+static void
+nfs_page_group_destroy(struct kref *kref)
+{
+	struct nfs_page *req = container_of(kref, struct nfs_page, wb_kref);
+	struct nfs_page *tmp, *next;
+
+	if (!nfs_page_group_sync_on_bit(req, PG_TEARDOWN))
+		return;
+
+	tmp = req;
+	do {
+		next = tmp->wb_this_page;
+		/* unlink and free */
+		tmp->wb_this_page = tmp;
+		tmp->wb_head = tmp;
+		nfs_free_request(tmp);
+		tmp = next;
+	} while (tmp != req);
+}
+
 /**
  * nfs_create_request - Create an NFS read/write request.
  * @ctx: open context to use
- * @inode: inode to which the request is attached
  * @page: page to write
+ * @last: last nfs request created for this page group or NULL if head
  * @offset: starting offset within the page for the write
  * @count: number of bytes to read/write
  *
@@ -146,9 +296,9 @@ nfs_iocounter_wait(struct nfs_io_counter *c)
  * User should ensure it is safe to sleep in this function.
  */
 struct nfs_page *
-nfs_create_request(struct nfs_open_context *ctx, struct inode *inode,
-		   struct page *page,
-		   unsigned int offset, unsigned int count)
+nfs_create_request(struct nfs_open_context *ctx, struct page *page,
+		   struct nfs_page *last, unsigned int offset,
+		   unsigned int count)
 {
 	struct nfs_page		*req;
 	struct nfs_lock_context *l_ctx;
@@ -180,6 +330,7 @@ nfs_create_request(struct nfs_open_context *ctx, struct inode *inode,
 	req->wb_bytes   = count;
 	req->wb_context = get_nfs_open_context(ctx);
 	kref_init(&req->wb_kref);
+	nfs_page_group_init(req, last);
 	return req;
 }
 
@@ -237,16 +388,22 @@ static void nfs_clear_request(struct nfs_page *req)
 	}
 }
 
-
 /**
  * nfs_release_request - Release the count on an NFS read/write request
  * @req: request to release
  *
  * Note: Should never be called with the spinlock held!
  */
-static void nfs_free_request(struct kref *kref)
+static void nfs_free_request(struct nfs_page *req)
 {
-	struct nfs_page *req = container_of(kref, struct nfs_page, wb_kref);
+	WARN_ON_ONCE(req->wb_this_page != req);
+
+	/* extra debug: make sure no sync bits are still set */
+	WARN_ON_ONCE(test_bit(PG_TEARDOWN, &req->wb_flags));
+	WARN_ON_ONCE(test_bit(PG_UNLOCKPAGE, &req->wb_flags));
+	WARN_ON_ONCE(test_bit(PG_UPTODATE, &req->wb_flags));
+	WARN_ON_ONCE(test_bit(PG_WB_END, &req->wb_flags));
+	WARN_ON_ONCE(test_bit(PG_REMOVE, &req->wb_flags));
 
 	/* Release struct file and open context */
 	nfs_clear_request(req);
@@ -255,13 +412,7 @@ static void nfs_free_request(struct kref *kref)
 
 void nfs_release_request(struct nfs_page *req)
 {
-	kref_put(&req->wb_kref, nfs_free_request);
-}
-
-static int nfs_wait_bit_uninterruptible(void *word)
-{
-	io_schedule();
-	return 0;
+	kref_put(&req->wb_kref, nfs_page_group_destroy);
 }
 
 /**
@@ -279,22 +430,249 @@ nfs_wait_on_request(struct nfs_page *req)
 			TASK_UNINTERRUPTIBLE);
 }
 
-bool nfs_generic_pg_test(struct nfs_pageio_descriptor *desc, struct nfs_page *prev, struct nfs_page *req)
+/*
+ * nfs_generic_pg_test - determine if requests can be coalesced
+ * @desc: pointer to descriptor
+ * @prev: previous request in desc, or NULL
+ * @req: this request
+ *
+ * Returns zero if @req can be coalesced into @desc, otherwise it returns
+ * the size of the request.
+ */
+size_t nfs_generic_pg_test(struct nfs_pageio_descriptor *desc,
+			   struct nfs_page *prev, struct nfs_page *req)
 {
-	/*
-	 * FIXME: ideally we should be able to coalesce all requests
-	 * that are not block boundary aligned, but currently this
-	 * is problematic for the case of bsize < PAGE_CACHE_SIZE,
-	 * since nfs_flush_multi and nfs_pagein_multi assume you
-	 * can have only one struct nfs_page.
-	 */
-	if (desc->pg_bsize < PAGE_SIZE)
+	if (desc->pg_count > desc->pg_bsize) {
+		/* should never happen */
+		WARN_ON_ONCE(1);
 		return 0;
+	}
 
-	return desc->pg_count + req->wb_bytes <= desc->pg_bsize;
+	return min(desc->pg_bsize - desc->pg_count, (size_t)req->wb_bytes);
 }
 EXPORT_SYMBOL_GPL(nfs_generic_pg_test);
 
+static inline struct nfs_rw_header *NFS_RW_HEADER(struct nfs_pgio_header *hdr)
+{
+	return container_of(hdr, struct nfs_rw_header, header);
+}
+
+/**
+ * nfs_rw_header_alloc - Allocate a header for a read or write
+ * @ops: Read or write function vector
+ */
+struct nfs_rw_header *nfs_rw_header_alloc(const struct nfs_rw_ops *ops)
+{
+	struct nfs_rw_header *header = ops->rw_alloc_header();
+
+	if (header) {
+		struct nfs_pgio_header *hdr = &header->header;
+
+		INIT_LIST_HEAD(&hdr->pages);
+		spin_lock_init(&hdr->lock);
+		atomic_set(&hdr->refcnt, 0);
+		hdr->rw_ops = ops;
+	}
+	return header;
+}
+EXPORT_SYMBOL_GPL(nfs_rw_header_alloc);
+
+/*
+ * nfs_rw_header_free - Free a read or write header
+ * @hdr: The header to free
+ */
+void nfs_rw_header_free(struct nfs_pgio_header *hdr)
+{
+	hdr->rw_ops->rw_free_header(NFS_RW_HEADER(hdr));
+}
+EXPORT_SYMBOL_GPL(nfs_rw_header_free);
+
+/**
+ * nfs_pgio_data_alloc - Allocate pageio data
+ * @hdr: The header making a request
+ * @pagecount: Number of pages to create
+ */
+static struct nfs_pgio_data *nfs_pgio_data_alloc(struct nfs_pgio_header *hdr,
+						 unsigned int pagecount)
+{
+	struct nfs_pgio_data *data, *prealloc;
+
+	prealloc = &NFS_RW_HEADER(hdr)->rpc_data;
+	if (prealloc->header == NULL)
+		data = prealloc;
+	else
+		data = kzalloc(sizeof(*data), GFP_KERNEL);
+	if (!data)
+		goto out;
+
+	if (nfs_pgarray_set(&data->pages, pagecount)) {
+		data->header = hdr;
+		atomic_inc(&hdr->refcnt);
+	} else {
+		if (data != prealloc)
+			kfree(data);
+		data = NULL;
+	}
+out:
+	return data;
+}
+
+/**
+ * nfs_pgio_data_release - Properly free pageio data
+ * @data: The data to release
+ */
+void nfs_pgio_data_release(struct nfs_pgio_data *data)
+{
+	struct nfs_pgio_header *hdr = data->header;
+	struct nfs_rw_header *pageio_header = NFS_RW_HEADER(hdr);
+
+	put_nfs_open_context(data->args.context);
+	if (data->pages.pagevec != data->pages.page_array)
+		kfree(data->pages.pagevec);
+	if (data == &pageio_header->rpc_data) {
+		data->header = NULL;
+		data = NULL;
+	}
+	if (atomic_dec_and_test(&hdr->refcnt))
+		hdr->completion_ops->completion(hdr);
+	/* Note: we only free the rpc_task after callbacks are done.
+	 * See the comment in rpc_free_task() for why
+	 */
+	kfree(data);
+}
+EXPORT_SYMBOL_GPL(nfs_pgio_data_release);
+
+/**
+ * nfs_pgio_rpcsetup - Set up arguments for a pageio call
+ * @data: The pageio data
+ * @count: Number of bytes to read
+ * @offset: Initial offset
+ * @how: How to commit data (writes only)
+ * @cinfo: Commit information for the call (writes only)
+ */
+static void nfs_pgio_rpcsetup(struct nfs_pgio_data *data,
+			      unsigned int count, unsigned int offset,
+			      int how, struct nfs_commit_info *cinfo)
+{
+	struct nfs_page *req = data->header->req;
+
+	/* Set up the RPC argument and reply structs
+	 * NB: take care not to mess about with data->commit et al. */
+
+	data->args.fh     = NFS_FH(data->header->inode);
+	data->args.offset = req_offset(req) + offset;
+	/* pnfs_set_layoutcommit needs this */
+	data->mds_offset = data->args.offset;
+	data->args.pgbase = req->wb_pgbase + offset;
+	data->args.pages  = data->pages.pagevec;
+	data->args.count  = count;
+	data->args.context = get_nfs_open_context(req->wb_context);
+	data->args.lock_context = req->wb_lock_context;
+	data->args.stable  = NFS_UNSTABLE;
+	switch (how & (FLUSH_STABLE | FLUSH_COND_STABLE)) {
+	case 0:
+		break;
+	case FLUSH_COND_STABLE:
+		if (nfs_reqs_to_commit(cinfo))
+			break;
+	default:
+		data->args.stable = NFS_FILE_SYNC;
+	}
+
+	data->res.fattr   = &data->fattr;
+	data->res.count   = count;
+	data->res.eof     = 0;
+	data->res.verf    = &data->verf;
+	nfs_fattr_init(&data->fattr);
+}
+
+/**
+ * nfs_pgio_prepare - Prepare pageio data to go over the wire
+ * @task: The current task
+ * @calldata: pageio data to prepare
+ */
+static void nfs_pgio_prepare(struct rpc_task *task, void *calldata)
+{
+	struct nfs_pgio_data *data = calldata;
+	int err;
+	err = NFS_PROTO(data->header->inode)->pgio_rpc_prepare(task, data);
+	if (err)
+		rpc_exit(task, err);
+}
+
+int nfs_initiate_pgio(struct rpc_clnt *clnt, struct nfs_pgio_data *data,
+		      const struct rpc_call_ops *call_ops, int how, int flags)
+{
+	struct rpc_task *task;
+	struct rpc_message msg = {
+		.rpc_argp = &data->args,
+		.rpc_resp = &data->res,
+		.rpc_cred = data->header->cred,
+	};
+	struct rpc_task_setup task_setup_data = {
+		.rpc_client = clnt,
+		.task = &data->task,
+		.rpc_message = &msg,
+		.callback_ops = call_ops,
+		.callback_data = data,
+		.workqueue = nfsiod_workqueue,
+		.flags = RPC_TASK_ASYNC | flags,
+	};
+	int ret = 0;
+
+	data->header->rw_ops->rw_initiate(data, &msg, &task_setup_data, how);
+
+	dprintk("NFS: %5u initiated pgio call "
+		"(req %s/%llu, %u bytes @ offset %llu)\n",
+		data->task.tk_pid,
+		data->header->inode->i_sb->s_id,
+		(unsigned long long)NFS_FILEID(data->header->inode),
+		data->args.count,
+		(unsigned long long)data->args.offset);
+
+	task = rpc_run_task(&task_setup_data);
+	if (IS_ERR(task)) {
+		ret = PTR_ERR(task);
+		goto out;
+	}
+	if (how & FLUSH_SYNC) {
+		ret = rpc_wait_for_completion_task(task);
+		if (ret == 0)
+			ret = task->tk_status;
+	}
+	rpc_put_task(task);
+out:
+	return ret;
+}
+EXPORT_SYMBOL_GPL(nfs_initiate_pgio);
+
+/**
+ * nfs_pgio_error - Clean up from a pageio error
+ * @desc: IO descriptor
+ * @hdr: pageio header
+ */
+static int nfs_pgio_error(struct nfs_pageio_descriptor *desc,
+			  struct nfs_pgio_header *hdr)
+{
+	set_bit(NFS_IOHDR_REDO, &hdr->flags);
+	nfs_pgio_data_release(hdr->data);
+	hdr->data = NULL;
+	desc->pg_completion_ops->error_cleanup(&desc->pg_list);
+	return -ENOMEM;
+}
+
+/**
+ * nfs_pgio_release - Release pageio data
+ * @calldata: The pageio data to release
+ */
+static void nfs_pgio_release(void *calldata)
+{
+	struct nfs_pgio_data *data = calldata;
+	if (data->header->rw_ops->rw_release)
+		data->header->rw_ops->rw_release(data);
+	nfs_pgio_data_release(data);
+}
+
 /**
  * nfs_pageio_init - initialise a page io descriptor
  * @desc: pointer to descriptor
@@ -307,6 +685,7 @@ void nfs_pageio_init(struct nfs_pageio_descriptor *desc,
 		     struct inode *inode,
 		     const struct nfs_pageio_ops *pg_ops,
 		     const struct nfs_pgio_completion_ops *compl_ops,
+		     const struct nfs_rw_ops *rw_ops,
 		     size_t bsize,
 		     int io_flags)
 {
@@ -320,6 +699,7 @@ void nfs_pageio_init(struct nfs_pageio_descriptor *desc,
 	desc->pg_inode = inode;
 	desc->pg_ops = pg_ops;
 	desc->pg_completion_ops = compl_ops;
+	desc->pg_rw_ops = rw_ops;
 	desc->pg_ioflags = io_flags;
 	desc->pg_error = 0;
 	desc->pg_lseg = NULL;
@@ -328,6 +708,94 @@ void nfs_pageio_init(struct nfs_pageio_descriptor *desc,
 }
 EXPORT_SYMBOL_GPL(nfs_pageio_init);
 
+/**
+ * nfs_pgio_result - Basic pageio error handling
+ * @task: The task that ran
+ * @calldata: Pageio data to check
+ */
+static void nfs_pgio_result(struct rpc_task *task, void *calldata)
+{
+	struct nfs_pgio_data *data = calldata;
+	struct inode *inode = data->header->inode;
+
+	dprintk("NFS: %s: %5u, (status %d)\n", __func__,
+		task->tk_pid, task->tk_status);
+
+	if (data->header->rw_ops->rw_done(task, data, inode) != 0)
+		return;
+	if (task->tk_status < 0)
+		nfs_set_pgio_error(data->header, task->tk_status, data->args.offset);
+	else
+		data->header->rw_ops->rw_result(task, data);
+}
+
+/*
+ * Create an RPC task for the given read or write request and kick it.
+ * The page must have been locked by the caller.
+ *
+ * It may happen that the page we're passed is not marked dirty.
+ * This is the case if nfs_updatepage detects a conflicting request
+ * that has been written but not committed.
+ */
+int nfs_generic_pgio(struct nfs_pageio_descriptor *desc,
+		     struct nfs_pgio_header *hdr)
+{
+	struct nfs_page		*req;
+	struct page		**pages;
+	struct nfs_pgio_data	*data;
+	struct list_head *head = &desc->pg_list;
+	struct nfs_commit_info cinfo;
+
+	data = nfs_pgio_data_alloc(hdr, nfs_page_array_len(desc->pg_base,
+							   desc->pg_count));
+	if (!data)
+		return nfs_pgio_error(desc, hdr);
+
+	nfs_init_cinfo(&cinfo, desc->pg_inode, desc->pg_dreq);
+	pages = data->pages.pagevec;
+	while (!list_empty(head)) {
+		req = nfs_list_entry(head->next);
+		nfs_list_remove_request(req);
+		nfs_list_add_request(req, &hdr->pages);
+		*pages++ = req->wb_page;
+	}
+
+	if ((desc->pg_ioflags & FLUSH_COND_STABLE) &&
+	    (desc->pg_moreio || nfs_reqs_to_commit(&cinfo)))
+		desc->pg_ioflags &= ~FLUSH_COND_STABLE;
+
+	/* Set up the argument struct */
+	nfs_pgio_rpcsetup(data, desc->pg_count, 0, desc->pg_ioflags, &cinfo);
+	hdr->data = data;
+	desc->pg_rpc_callops = &nfs_pgio_common_ops;
+	return 0;
+}
+EXPORT_SYMBOL_GPL(nfs_generic_pgio);
+
+static int nfs_generic_pg_pgios(struct nfs_pageio_descriptor *desc)
+{
+	struct nfs_rw_header *rw_hdr;
+	struct nfs_pgio_header *hdr;
+	int ret;
+
+	rw_hdr = nfs_rw_header_alloc(desc->pg_rw_ops);
+	if (!rw_hdr) {
+		desc->pg_completion_ops->error_cleanup(&desc->pg_list);
+		return -ENOMEM;
+	}
+	hdr = &rw_hdr->header;
+	nfs_pgheader_init(desc, hdr, nfs_rw_header_free);
+	atomic_inc(&hdr->refcnt);
+	ret = nfs_generic_pgio(desc, hdr);
+	if (ret == 0)
+		ret = nfs_initiate_pgio(NFS_CLIENT(hdr->inode),
+					hdr->data, desc->pg_rpc_callops,
+					desc->pg_ioflags, 0);
+	if (atomic_dec_and_test(&hdr->refcnt))
+		hdr->completion_ops->completion(hdr);
+	return ret;
+}
+
 static bool nfs_match_open_context(const struct nfs_open_context *ctx1,
 		const struct nfs_open_context *ctx2)
 {
@@ -356,18 +824,23 @@ static bool nfs_can_coalesce_requests(struct nfs_page *prev,
 				      struct nfs_page *req,
 				      struct nfs_pageio_descriptor *pgio)
 {
-	if (!nfs_match_open_context(req->wb_context, prev->wb_context))
-		return false;
-	if (req->wb_context->dentry->d_inode->i_flock != NULL &&
-	    !nfs_match_lock_context(req->wb_lock_context, prev->wb_lock_context))
-		return false;
-	if (req->wb_pgbase != 0)
-		return false;
-	if (prev->wb_pgbase + prev->wb_bytes != PAGE_CACHE_SIZE)
-		return false;
-	if (req_offset(req) != req_offset(prev) + prev->wb_bytes)
-		return false;
-	return pgio->pg_ops->pg_test(pgio, prev, req);
+	size_t size;
+
+	if (prev) {
+		if (!nfs_match_open_context(req->wb_context, prev->wb_context))
+			return false;
+		if (req->wb_context->dentry->d_inode->i_flock != NULL &&
+		    !nfs_match_lock_context(req->wb_lock_context,
+					    prev->wb_lock_context))
+			return false;
+		if (req_offset(req) != req_offset(prev) + prev->wb_bytes)
+			return false;
+	}
+	size = pgio->pg_ops->pg_test(pgio, prev, req);
+	WARN_ON_ONCE(size > req->wb_bytes);
+	if (size && size < req->wb_bytes)
+		req->wb_bytes = size;
+	return size > 0;
 }
 
 /**
@@ -381,17 +854,16 @@ static bool nfs_can_coalesce_requests(struct nfs_page *prev,
 static int nfs_pageio_do_add_request(struct nfs_pageio_descriptor *desc,
 				     struct nfs_page *req)
 {
+	struct nfs_page *prev = NULL;
 	if (desc->pg_count != 0) {
-		struct nfs_page *prev;
-
 		prev = nfs_list_entry(desc->pg_list.prev);
-		if (!nfs_can_coalesce_requests(prev, req, desc))
-			return 0;
 	} else {
 		if (desc->pg_ops->pg_init)
 			desc->pg_ops->pg_init(desc, req);
 		desc->pg_base = req->wb_pgbase;
 	}
+	if (!nfs_can_coalesce_requests(prev, req, desc))
+		return 0;
 	nfs_list_remove_request(req);
 	nfs_list_add_request(req, &desc->pg_list);
 	desc->pg_count += req->wb_bytes;
@@ -421,22 +893,73 @@ static void nfs_pageio_doio(struct nfs_pageio_descriptor *desc)
  * @desc: destination io descriptor
  * @req: request
  *
+ * This may split a request into subrequests which are all part of the
+ * same page group.
+ *
  * Returns true if the request 'req' was successfully coalesced into the
  * existing list of pages 'desc'.
  */
 static int __nfs_pageio_add_request(struct nfs_pageio_descriptor *desc,
 			   struct nfs_page *req)
 {
-	while (!nfs_pageio_do_add_request(desc, req)) {
-		desc->pg_moreio = 1;
-		nfs_pageio_doio(desc);
-		if (desc->pg_error < 0)
-			return 0;
-		desc->pg_moreio = 0;
-		if (desc->pg_recoalesce)
-			return 0;
-	}
+	struct nfs_page *subreq;
+	unsigned int bytes_left = 0;
+	unsigned int offset, pgbase;
+
+	nfs_page_group_lock(req);
+
+	subreq = req;
+	bytes_left = subreq->wb_bytes;
+	offset = subreq->wb_offset;
+	pgbase = subreq->wb_pgbase;
+
+	do {
+		if (!nfs_pageio_do_add_request(desc, subreq)) {
+			/* make sure pg_test call(s) did nothing */
+			WARN_ON_ONCE(subreq->wb_bytes != bytes_left);
+			WARN_ON_ONCE(subreq->wb_offset != offset);
+			WARN_ON_ONCE(subreq->wb_pgbase != pgbase);
+
+			nfs_page_group_unlock(req);
+			desc->pg_moreio = 1;
+			nfs_pageio_doio(desc);
+			if (desc->pg_error < 0)
+				return 0;
+			desc->pg_moreio = 0;
+			if (desc->pg_recoalesce)
+				return 0;
+			/* retry add_request for this subreq */
+			nfs_page_group_lock(req);
+			continue;
+		}
+
+		/* check for buggy pg_test call(s) */
+		WARN_ON_ONCE(subreq->wb_bytes + subreq->wb_pgbase > PAGE_SIZE);
+		WARN_ON_ONCE(subreq->wb_bytes > bytes_left);
+		WARN_ON_ONCE(subreq->wb_bytes == 0);
+
+		bytes_left -= subreq->wb_bytes;
+		offset += subreq->wb_bytes;
+		pgbase += subreq->wb_bytes;
+
+		if (bytes_left) {
+			subreq = nfs_create_request(req->wb_context,
+					req->wb_page,
+					subreq, pgbase, bytes_left);
+			if (IS_ERR(subreq))
+				goto err_ptr;
+			nfs_lock_request(subreq);
+			subreq->wb_offset  = offset;
+			subreq->wb_index = req->wb_index;
+		}
+	} while (bytes_left > 0);
+
+	nfs_page_group_unlock(req);
 	return 1;
+err_ptr:
+	desc->pg_error = PTR_ERR(subreq);
+	nfs_page_group_unlock(req);
+	return 0;
 }
 
 static int nfs_do_recoalesce(struct nfs_pageio_descriptor *desc)
@@ -535,3 +1058,13 @@ void nfs_destroy_nfspagecache(void)
 	kmem_cache_destroy(nfs_page_cachep);
 }
 
+static const struct rpc_call_ops nfs_pgio_common_ops = {
+	.rpc_call_prepare = nfs_pgio_prepare,
+	.rpc_call_done = nfs_pgio_result,
+	.rpc_release = nfs_pgio_release,
+};
+
+const struct nfs_pageio_ops nfs_pgio_rw_ops = {
+	.pg_test = nfs_generic_pg_test,
+	.pg_doio = nfs_generic_pg_pgios,
+};
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index fd9536e494bc..6fdcd233d6f7 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -1388,11 +1388,6 @@ pnfs_generic_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *r
 
 	WARN_ON_ONCE(pgio->pg_lseg != NULL);
 
-	if (req->wb_offset != req->wb_pgbase) {
-		nfs_pageio_reset_read_mds(pgio);
-		return;
-	}
-
 	if (pgio->pg_dreq == NULL)
 		rd_size = i_size_read(pgio->pg_inode) - req_offset(req);
 	else
@@ -1417,11 +1412,6 @@ pnfs_generic_pg_init_write(struct nfs_pageio_descriptor *pgio,
 {
 	WARN_ON_ONCE(pgio->pg_lseg != NULL);
 
-	if (req->wb_offset != req->wb_pgbase) {
-		nfs_pageio_reset_write_mds(pgio);
-		return;
-	}
-
 	pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode,
 					   req->wb_context,
 					   req_offset(req),
@@ -1434,56 +1424,49 @@ pnfs_generic_pg_init_write(struct nfs_pageio_descriptor *pgio,
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_pg_init_write);
 
-void
-pnfs_pageio_init_read(struct nfs_pageio_descriptor *pgio, struct inode *inode,
-		      const struct nfs_pgio_completion_ops *compl_ops)
-{
-	struct nfs_server *server = NFS_SERVER(inode);
-	struct pnfs_layoutdriver_type *ld = server->pnfs_curr_ld;
-
-	if (ld == NULL)
-		nfs_pageio_init_read(pgio, inode, compl_ops);
-	else
-		nfs_pageio_init(pgio, inode, ld->pg_read_ops, compl_ops, server->rsize, 0);
-}
-
-void
-pnfs_pageio_init_write(struct nfs_pageio_descriptor *pgio, struct inode *inode,
-		       int ioflags,
-		       const struct nfs_pgio_completion_ops *compl_ops)
-{
-	struct nfs_server *server = NFS_SERVER(inode);
-	struct pnfs_layoutdriver_type *ld = server->pnfs_curr_ld;
-
-	if (ld == NULL)
-		nfs_pageio_init_write(pgio, inode, ioflags, compl_ops);
-	else
-		nfs_pageio_init(pgio, inode, ld->pg_write_ops, compl_ops, server->wsize, ioflags);
-}
-
-bool
+/*
+ * Return 0 if @req cannot be coalesced into @pgio, otherwise return the number
+ * of bytes (maximum @req->wb_bytes) that can be coalesced.
+ */
+size_t
 pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
 		     struct nfs_page *req)
 {
-	if (pgio->pg_lseg == NULL)
-		return nfs_generic_pg_test(pgio, prev, req);
+	unsigned int size;
+	u64 seg_end, req_start, seg_left;
+
+	size = nfs_generic_pg_test(pgio, prev, req);
+	if (!size)
+		return 0;
 
 	/*
-	 * Test if a nfs_page is fully contained in the pnfs_layout_range.
-	 * Note that this test makes several assumptions:
-	 * - that the previous nfs_page in the struct nfs_pageio_descriptor
-	 *   is known to lie within the range.
-	 *   - that the nfs_page being tested is known to be contiguous with the
-	 *   previous nfs_page.
-	 *   - Layout ranges are page aligned, so we only have to test the
-	 *   start offset of the request.
+	 * 'size' contains the number of bytes left in the current page (up
+	 * to the original size asked for in @req->wb_bytes).
+	 *
+	 * Calculate how many bytes are left in the layout segment
+	 * and if there are less bytes than 'size', return that instead.
 	 *
 	 * Please also note that 'end_offset' is actually the offset of the
 	 * first byte that lies outside the pnfs_layout_range. FIXME?
 	 *
 	 */
-	return req_offset(req) < end_offset(pgio->pg_lseg->pls_range.offset,
-					 pgio->pg_lseg->pls_range.length);
+	if (pgio->pg_lseg) {
+		seg_end = end_offset(pgio->pg_lseg->pls_range.offset,
+				     pgio->pg_lseg->pls_range.length);
+		req_start = req_offset(req);
+		WARN_ON_ONCE(req_start > seg_end);
+		/* start of request is past the last byte of this segment */
+		if (req_start >= seg_end)
+			return 0;
+
+		/* adjust 'size' iff there are fewer bytes left in the
+		 * segment than what nfs_generic_pg_test returned */
+		seg_left = seg_end - req_start;
+		if (seg_left < size)
+			size = (unsigned int)seg_left;
+	}
+
+	return size;
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_pg_test);
 
@@ -1496,7 +1479,7 @@ int pnfs_write_done_resend_to_mds(struct inode *inode,
 	LIST_HEAD(failed);
 
 	/* Resend all requests through the MDS */
-	nfs_pageio_init_write(&pgio, inode, FLUSH_STABLE, compl_ops);
+	nfs_pageio_init_write(&pgio, inode, FLUSH_STABLE, true, compl_ops);
 	pgio.pg_dreq = dreq;
 	while (!list_empty(head)) {
 		struct nfs_page *req = nfs_list_entry(head->next);
@@ -1519,7 +1502,7 @@ int pnfs_write_done_resend_to_mds(struct inode *inode,
 }
 EXPORT_SYMBOL_GPL(pnfs_write_done_resend_to_mds);
 
-static void pnfs_ld_handle_write_error(struct nfs_write_data *data)
+static void pnfs_ld_handle_write_error(struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 
@@ -1538,7 +1521,7 @@ static void pnfs_ld_handle_write_error(struct nfs_write_data *data)
 /*
  * Called by non rpc-based layout drivers
  */
-void pnfs_ld_write_done(struct nfs_write_data *data)
+void pnfs_ld_write_done(struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 
@@ -1554,7 +1537,7 @@ EXPORT_SYMBOL_GPL(pnfs_ld_write_done);
 
 static void
 pnfs_write_through_mds(struct nfs_pageio_descriptor *desc,
-		struct nfs_write_data *data)
+		struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 
@@ -1563,11 +1546,11 @@ pnfs_write_through_mds(struct nfs_pageio_descriptor *desc,
 		nfs_pageio_reset_write_mds(desc);
 		desc->pg_recoalesce = 1;
 	}
-	nfs_writedata_release(data);
+	nfs_pgio_data_release(data);
 }
 
 static enum pnfs_try_status
-pnfs_try_to_write_data(struct nfs_write_data *wdata,
+pnfs_try_to_write_data(struct nfs_pgio_data *wdata,
 			const struct rpc_call_ops *call_ops,
 			struct pnfs_layout_segment *lseg,
 			int how)
@@ -1589,41 +1572,36 @@ pnfs_try_to_write_data(struct nfs_write_data *wdata,
 }
 
 static void
-pnfs_do_multiple_writes(struct nfs_pageio_descriptor *desc, struct list_head *head, int how)
+pnfs_do_write(struct nfs_pageio_descriptor *desc,
+	      struct nfs_pgio_header *hdr, int how)
 {
-	struct nfs_write_data *data;
+	struct nfs_pgio_data *data = hdr->data;
 	const struct rpc_call_ops *call_ops = desc->pg_rpc_callops;
 	struct pnfs_layout_segment *lseg = desc->pg_lseg;
+	enum pnfs_try_status trypnfs;
 
 	desc->pg_lseg = NULL;
-	while (!list_empty(head)) {
-		enum pnfs_try_status trypnfs;
-
-		data = list_first_entry(head, struct nfs_write_data, list);
-		list_del_init(&data->list);
-
-		trypnfs = pnfs_try_to_write_data(data, call_ops, lseg, how);
-		if (trypnfs == PNFS_NOT_ATTEMPTED)
-			pnfs_write_through_mds(desc, data);
-	}
+	trypnfs = pnfs_try_to_write_data(data, call_ops, lseg, how);
+	if (trypnfs == PNFS_NOT_ATTEMPTED)
+		pnfs_write_through_mds(desc, data);
 	pnfs_put_lseg(lseg);
 }
 
 static void pnfs_writehdr_free(struct nfs_pgio_header *hdr)
 {
 	pnfs_put_lseg(hdr->lseg);
-	nfs_writehdr_free(hdr);
+	nfs_rw_header_free(hdr);
 }
 EXPORT_SYMBOL_GPL(pnfs_writehdr_free);
 
 int
 pnfs_generic_pg_writepages(struct nfs_pageio_descriptor *desc)
 {
-	struct nfs_write_header *whdr;
+	struct nfs_rw_header *whdr;
 	struct nfs_pgio_header *hdr;
 	int ret;
 
-	whdr = nfs_writehdr_alloc();
+	whdr = nfs_rw_header_alloc(desc->pg_rw_ops);
 	if (!whdr) {
 		desc->pg_completion_ops->error_cleanup(&desc->pg_list);
 		pnfs_put_lseg(desc->pg_lseg);
@@ -1634,12 +1612,12 @@ pnfs_generic_pg_writepages(struct nfs_pageio_descriptor *desc)
 	nfs_pgheader_init(desc, hdr, pnfs_writehdr_free);
 	hdr->lseg = pnfs_get_lseg(desc->pg_lseg);
 	atomic_inc(&hdr->refcnt);
-	ret = nfs_generic_flush(desc, hdr);
+	ret = nfs_generic_pgio(desc, hdr);
 	if (ret != 0) {
 		pnfs_put_lseg(desc->pg_lseg);
 		desc->pg_lseg = NULL;
 	} else
-		pnfs_do_multiple_writes(desc, &hdr->rpc_list, desc->pg_ioflags);
+		pnfs_do_write(desc, hdr, desc->pg_ioflags);
 	if (atomic_dec_and_test(&hdr->refcnt))
 		hdr->completion_ops->completion(hdr);
 	return ret;
@@ -1655,7 +1633,7 @@ int pnfs_read_done_resend_to_mds(struct inode *inode,
 	LIST_HEAD(failed);
 
 	/* Resend all requests through the MDS */
-	nfs_pageio_init_read(&pgio, inode, compl_ops);
+	nfs_pageio_init_read(&pgio, inode, true, compl_ops);
 	pgio.pg_dreq = dreq;
 	while (!list_empty(head)) {
 		struct nfs_page *req = nfs_list_entry(head->next);
@@ -1674,7 +1652,7 @@ int pnfs_read_done_resend_to_mds(struct inode *inode,
 }
 EXPORT_SYMBOL_GPL(pnfs_read_done_resend_to_mds);
 
-static void pnfs_ld_handle_read_error(struct nfs_read_data *data)
+static void pnfs_ld_handle_read_error(struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 
@@ -1693,7 +1671,7 @@ static void pnfs_ld_handle_read_error(struct nfs_read_data *data)
 /*
  * Called by non rpc-based layout drivers
  */
-void pnfs_ld_read_done(struct nfs_read_data *data)
+void pnfs_ld_read_done(struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 
@@ -1709,7 +1687,7 @@ EXPORT_SYMBOL_GPL(pnfs_ld_read_done);
 
 static void
 pnfs_read_through_mds(struct nfs_pageio_descriptor *desc,
-		struct nfs_read_data *data)
+		struct nfs_pgio_data *data)
 {
 	struct nfs_pgio_header *hdr = data->header;
 
@@ -1718,14 +1696,14 @@ pnfs_read_through_mds(struct nfs_pageio_descriptor *desc,
 		nfs_pageio_reset_read_mds(desc);
 		desc->pg_recoalesce = 1;
 	}
-	nfs_readdata_release(data);
+	nfs_pgio_data_release(data);
 }
 
 /*
  * Call the appropriate parallel I/O subsystem read function.
  */
 static enum pnfs_try_status
-pnfs_try_to_read_data(struct nfs_read_data *rdata,
+pnfs_try_to_read_data(struct nfs_pgio_data *rdata,
 		       const struct rpc_call_ops *call_ops,
 		       struct pnfs_layout_segment *lseg)
 {
@@ -1747,41 +1725,35 @@ pnfs_try_to_read_data(struct nfs_read_data *rdata,
 }
 
 static void
-pnfs_do_multiple_reads(struct nfs_pageio_descriptor *desc, struct list_head *head)
+pnfs_do_read(struct nfs_pageio_descriptor *desc, struct nfs_pgio_header *hdr)
 {
-	struct nfs_read_data *data;
+	struct nfs_pgio_data *data = hdr->data;
 	const struct rpc_call_ops *call_ops = desc->pg_rpc_callops;
 	struct pnfs_layout_segment *lseg = desc->pg_lseg;
+	enum pnfs_try_status trypnfs;
 
 	desc->pg_lseg = NULL;
-	while (!list_empty(head)) {
-		enum pnfs_try_status trypnfs;
-
-		data = list_first_entry(head, struct nfs_read_data, list);
-		list_del_init(&data->list);
-
-		trypnfs = pnfs_try_to_read_data(data, call_ops, lseg);
-		if (trypnfs == PNFS_NOT_ATTEMPTED)
-			pnfs_read_through_mds(desc, data);
-	}
+	trypnfs = pnfs_try_to_read_data(data, call_ops, lseg);
+	if (trypnfs == PNFS_NOT_ATTEMPTED)
+		pnfs_read_through_mds(desc, data);
 	pnfs_put_lseg(lseg);
 }
 
 static void pnfs_readhdr_free(struct nfs_pgio_header *hdr)
 {
 	pnfs_put_lseg(hdr->lseg);
-	nfs_readhdr_free(hdr);
+	nfs_rw_header_free(hdr);
 }
 EXPORT_SYMBOL_GPL(pnfs_readhdr_free);
 
 int
 pnfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc)
 {
-	struct nfs_read_header *rhdr;
+	struct nfs_rw_header *rhdr;
 	struct nfs_pgio_header *hdr;
 	int ret;
 
-	rhdr = nfs_readhdr_alloc();
+	rhdr = nfs_rw_header_alloc(desc->pg_rw_ops);
 	if (!rhdr) {
 		desc->pg_completion_ops->error_cleanup(&desc->pg_list);
 		ret = -ENOMEM;
@@ -1793,12 +1765,12 @@ pnfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc)
 	nfs_pgheader_init(desc, hdr, pnfs_readhdr_free);
 	hdr->lseg = pnfs_get_lseg(desc->pg_lseg);
 	atomic_inc(&hdr->refcnt);
-	ret = nfs_generic_pagein(desc, hdr);
+	ret = nfs_generic_pgio(desc, hdr);
 	if (ret != 0) {
 		pnfs_put_lseg(desc->pg_lseg);
 		desc->pg_lseg = NULL;
 	} else
-		pnfs_do_multiple_reads(desc, &hdr->rpc_list);
+		pnfs_do_read(desc, hdr);
 	if (atomic_dec_and_test(&hdr->refcnt))
 		hdr->completion_ops->completion(hdr);
 	return ret;
@@ -1848,7 +1820,7 @@ void pnfs_set_lo_fail(struct pnfs_layout_segment *lseg)
 EXPORT_SYMBOL_GPL(pnfs_set_lo_fail);
 
 void
-pnfs_set_layoutcommit(struct nfs_write_data *wdata)
+pnfs_set_layoutcommit(struct nfs_pgio_data *wdata)
 {
 	struct nfs_pgio_header *hdr = wdata->header;
 	struct inode *inode = hdr->inode;
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index c3058a076596..4fb309a2b4c4 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -113,8 +113,8 @@ struct pnfs_layoutdriver_type {
 	 * Return PNFS_ATTEMPTED to indicate the layout code has attempted
 	 * I/O, else return PNFS_NOT_ATTEMPTED to fall back to normal NFS
 	 */
-	enum pnfs_try_status (*read_pagelist) (struct nfs_read_data *nfs_data);
-	enum pnfs_try_status (*write_pagelist) (struct nfs_write_data *nfs_data, int how);
+	enum pnfs_try_status (*read_pagelist) (struct nfs_pgio_data *nfs_data);
+	enum pnfs_try_status (*write_pagelist) (struct nfs_pgio_data *nfs_data, int how);
 
 	void (*free_deviceid_node) (struct nfs4_deviceid_node *);
 
@@ -180,11 +180,6 @@ extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp);
 void pnfs_get_layout_hdr(struct pnfs_layout_hdr *lo);
 void pnfs_put_lseg(struct pnfs_layout_segment *lseg);
 
-void pnfs_pageio_init_read(struct nfs_pageio_descriptor *, struct inode *,
-			   const struct nfs_pgio_completion_ops *);
-void pnfs_pageio_init_write(struct nfs_pageio_descriptor *, struct inode *,
-			    int, const struct nfs_pgio_completion_ops *);
-
 void set_pnfs_layoutdriver(struct nfs_server *, const struct nfs_fh *, u32);
 void unset_pnfs_layoutdriver(struct nfs_server *);
 void pnfs_generic_pg_init_read(struct nfs_pageio_descriptor *, struct nfs_page *);
@@ -192,7 +187,8 @@ int pnfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc);
 void pnfs_generic_pg_init_write(struct nfs_pageio_descriptor *pgio,
 			        struct nfs_page *req, u64 wb_size);
 int pnfs_generic_pg_writepages(struct nfs_pageio_descriptor *desc);
-bool pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev, struct nfs_page *req);
+size_t pnfs_generic_pg_test(struct nfs_pageio_descriptor *pgio,
+			    struct nfs_page *prev, struct nfs_page *req);
 void pnfs_set_lo_fail(struct pnfs_layout_segment *lseg);
 struct pnfs_layout_segment *pnfs_layout_process(struct nfs4_layoutget *lgp);
 void pnfs_free_lseg_list(struct list_head *tmp_list);
@@ -217,13 +213,13 @@ bool pnfs_roc(struct inode *ino);
 void pnfs_roc_release(struct inode *ino);
 void pnfs_roc_set_barrier(struct inode *ino, u32 barrier);
 bool pnfs_roc_drain(struct inode *ino, u32 *barrier, struct rpc_task *task);
-void pnfs_set_layoutcommit(struct nfs_write_data *wdata);
+void pnfs_set_layoutcommit(struct nfs_pgio_data *wdata);
 void pnfs_cleanup_layoutcommit(struct nfs4_layoutcommit_data *data);
 int pnfs_layoutcommit_inode(struct inode *inode, bool sync);
 int _pnfs_return_layout(struct inode *);
 int pnfs_commit_and_return_layout(struct inode *);
-void pnfs_ld_write_done(struct nfs_write_data *);
-void pnfs_ld_read_done(struct nfs_read_data *);
+void pnfs_ld_write_done(struct nfs_pgio_data *);
+void pnfs_ld_read_done(struct nfs_pgio_data *);
 struct pnfs_layout_segment *pnfs_update_layout(struct inode *ino,
 					       struct nfs_open_context *ctx,
 					       loff_t pos,
@@ -461,18 +457,6 @@ static inline void unset_pnfs_layoutdriver(struct nfs_server *s)
 {
 }
 
-static inline void pnfs_pageio_init_read(struct nfs_pageio_descriptor *pgio, struct inode *inode,
-					 const struct nfs_pgio_completion_ops *compl_ops)
-{
-	nfs_pageio_init_read(pgio, inode, compl_ops);
-}
-
-static inline void pnfs_pageio_init_write(struct nfs_pageio_descriptor *pgio, struct inode *inode, int ioflags,
-					  const struct nfs_pgio_completion_ops *compl_ops)
-{
-	nfs_pageio_init_write(pgio, inode, ioflags, compl_ops);
-}
-
 static inline int
 pnfs_commit_list(struct inode *inode, struct list_head *mds_pages, int how,
 		 struct nfs_commit_info *cinfo)
diff --git a/fs/nfs/proc.c b/fs/nfs/proc.c
index e55ce9e8b034..c171ce1a8a30 100644
--- a/fs/nfs/proc.c
+++ b/fs/nfs/proc.c
@@ -578,7 +578,7 @@ nfs_proc_pathconf(struct nfs_server *server, struct nfs_fh *fhandle,
 	return 0;
 }
 
-static int nfs_read_done(struct rpc_task *task, struct nfs_read_data *data)
+static int nfs_read_done(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	struct inode *inode = data->header->inode;
 
@@ -594,18 +594,18 @@ static int nfs_read_done(struct rpc_task *task, struct nfs_read_data *data)
 	return 0;
 }
 
-static void nfs_proc_read_setup(struct nfs_read_data *data, struct rpc_message *msg)
+static void nfs_proc_read_setup(struct nfs_pgio_data *data, struct rpc_message *msg)
 {
 	msg->rpc_proc = &nfs_procedures[NFSPROC_READ];
 }
 
-static int nfs_proc_read_rpc_prepare(struct rpc_task *task, struct nfs_read_data *data)
+static int nfs_proc_pgio_rpc_prepare(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	rpc_call_start(task);
 	return 0;
 }
 
-static int nfs_write_done(struct rpc_task *task, struct nfs_write_data *data)
+static int nfs_write_done(struct rpc_task *task, struct nfs_pgio_data *data)
 {
 	struct inode *inode = data->header->inode;
 
@@ -614,19 +614,13 @@ static int nfs_write_done(struct rpc_task *task, struct nfs_write_data *data)
 	return 0;
 }
 
-static void nfs_proc_write_setup(struct nfs_write_data *data, struct rpc_message *msg)
+static void nfs_proc_write_setup(struct nfs_pgio_data *data, struct rpc_message *msg)
 {
 	/* Note: NFSv2 ignores @stable and always uses NFS_FILE_SYNC */
 	data->args.stable = NFS_FILE_SYNC;
 	msg->rpc_proc = &nfs_procedures[NFSPROC_WRITE];
 }
 
-static int nfs_proc_write_rpc_prepare(struct rpc_task *task, struct nfs_write_data *data)
-{
-	rpc_call_start(task);
-	return 0;
-}
-
 static void nfs_proc_commit_rpc_prepare(struct rpc_task *task, struct nfs_commit_data *data)
 {
 	BUG();
@@ -734,13 +728,10 @@ const struct nfs_rpc_ops nfs_v2_clientops = {
 	.fsinfo		= nfs_proc_fsinfo,
 	.pathconf	= nfs_proc_pathconf,
 	.decode_dirent	= nfs2_decode_dirent,
+	.pgio_rpc_prepare = nfs_proc_pgio_rpc_prepare,
 	.read_setup	= nfs_proc_read_setup,
-	.read_pageio_init = nfs_pageio_init_read,
-	.read_rpc_prepare = nfs_proc_read_rpc_prepare,
 	.read_done	= nfs_read_done,
 	.write_setup	= nfs_proc_write_setup,
-	.write_pageio_init = nfs_pageio_init_write,
-	.write_rpc_prepare = nfs_proc_write_rpc_prepare,
 	.write_done	= nfs_write_done,
 	.commit_setup	= nfs_proc_commit_setup,
 	.commit_rpc_prepare = nfs_proc_commit_rpc_prepare,
diff --git a/fs/nfs/read.c b/fs/nfs/read.c
index 411aedda14bb..e818a475ca64 100644
--- a/fs/nfs/read.c
+++ b/fs/nfs/read.c
@@ -24,85 +24,24 @@
 #include "internal.h"
 #include "iostat.h"
 #include "fscache.h"
+#include "pnfs.h"
 
 #define NFSDBG_FACILITY		NFSDBG_PAGECACHE
 
-static const struct nfs_pageio_ops nfs_pageio_read_ops;
-static const struct rpc_call_ops nfs_read_common_ops;
 static const struct nfs_pgio_completion_ops nfs_async_read_completion_ops;
+static const struct nfs_rw_ops nfs_rw_read_ops;
 
 static struct kmem_cache *nfs_rdata_cachep;
 
-struct nfs_read_header *nfs_readhdr_alloc(void)
+static struct nfs_rw_header *nfs_readhdr_alloc(void)
 {
-	struct nfs_read_header *rhdr;
-
-	rhdr = kmem_cache_zalloc(nfs_rdata_cachep, GFP_KERNEL);
-	if (rhdr) {
-		struct nfs_pgio_header *hdr = &rhdr->header;
-
-		INIT_LIST_HEAD(&hdr->pages);
-		INIT_LIST_HEAD(&hdr->rpc_list);
-		spin_lock_init(&hdr->lock);
-		atomic_set(&hdr->refcnt, 0);
-	}
-	return rhdr;
+	return kmem_cache_zalloc(nfs_rdata_cachep, GFP_KERNEL);
 }
-EXPORT_SYMBOL_GPL(nfs_readhdr_alloc);
 
-static struct nfs_read_data *nfs_readdata_alloc(struct nfs_pgio_header *hdr,
-						unsigned int pagecount)
+static void nfs_readhdr_free(struct nfs_rw_header *rhdr)
 {
-	struct nfs_read_data *data, *prealloc;
-
-	prealloc = &container_of(hdr, struct nfs_read_header, header)->rpc_data;
-	if (prealloc->header == NULL)
-		data = prealloc;
-	else
-		data = kzalloc(sizeof(*data), GFP_KERNEL);
-	if (!data)
-		goto out;
-
-	if (nfs_pgarray_set(&data->pages, pagecount)) {
-		data->header = hdr;
-		atomic_inc(&hdr->refcnt);
-	} else {
-		if (data != prealloc)
-			kfree(data);
-		data = NULL;
-	}
-out:
-	return data;
-}
-
-void nfs_readhdr_free(struct nfs_pgio_header *hdr)
-{
-	struct nfs_read_header *rhdr = container_of(hdr, struct nfs_read_header, header);
-
 	kmem_cache_free(nfs_rdata_cachep, rhdr);
 }
-EXPORT_SYMBOL_GPL(nfs_readhdr_free);
-
-void nfs_readdata_release(struct nfs_read_data *rdata)
-{
-	struct nfs_pgio_header *hdr = rdata->header;
-	struct nfs_read_header *read_header = container_of(hdr, struct nfs_read_header, header);
-
-	put_nfs_open_context(rdata->args.context);
-	if (rdata->pages.pagevec != rdata->pages.page_array)
-		kfree(rdata->pages.pagevec);
-	if (rdata == &read_header->rpc_data) {
-		rdata->header = NULL;
-		rdata = NULL;
-	}
-	if (atomic_dec_and_test(&hdr->refcnt))
-		hdr->completion_ops->completion(hdr);
-	/* Note: we only free the rpc_task after callbacks are done.
-	 * See the comment in rpc_free_task() for why
-	 */
-	kfree(rdata);
-}
-EXPORT_SYMBOL_GPL(nfs_readdata_release);
 
 static
 int nfs_return_empty_page(struct page *page)
@@ -114,17 +53,24 @@ int nfs_return_empty_page(struct page *page)
 }
 
 void nfs_pageio_init_read(struct nfs_pageio_descriptor *pgio,
-			      struct inode *inode,
+			      struct inode *inode, bool force_mds,
 			      const struct nfs_pgio_completion_ops *compl_ops)
 {
-	nfs_pageio_init(pgio, inode, &nfs_pageio_read_ops, compl_ops,
-			NFS_SERVER(inode)->rsize, 0);
+	struct nfs_server *server = NFS_SERVER(inode);
+	const struct nfs_pageio_ops *pg_ops = &nfs_pgio_rw_ops;
+
+#ifdef CONFIG_NFS_V4_1
+	if (server->pnfs_curr_ld && !force_mds)
+		pg_ops = server->pnfs_curr_ld->pg_read_ops;
+#endif
+	nfs_pageio_init(pgio, inode, pg_ops, compl_ops, &nfs_rw_read_ops,
+			server->rsize, 0);
 }
 EXPORT_SYMBOL_GPL(nfs_pageio_init_read);
 
 void nfs_pageio_reset_read_mds(struct nfs_pageio_descriptor *pgio)
 {
-	pgio->pg_ops = &nfs_pageio_read_ops;
+	pgio->pg_ops = &nfs_pgio_rw_ops;
 	pgio->pg_bsize = NFS_SERVER(pgio->pg_inode)->rsize;
 }
 EXPORT_SYMBOL_GPL(nfs_pageio_reset_read_mds);
@@ -139,7 +85,7 @@ int nfs_readpage_async(struct nfs_open_context *ctx, struct inode *inode,
 	len = nfs_page_length(page);
 	if (len == 0)
 		return nfs_return_empty_page(page);
-	new = nfs_create_request(ctx, inode, page, 0, len);
+	new = nfs_create_request(ctx, page, NULL, 0, len);
 	if (IS_ERR(new)) {
 		unlock_page(page);
 		return PTR_ERR(new);
@@ -147,7 +93,8 @@ int nfs_readpage_async(struct nfs_open_context *ctx, struct inode *inode,
 	if (len < PAGE_CACHE_SIZE)
 		zero_user_segment(page, len, PAGE_CACHE_SIZE);
 
-	NFS_PROTO(inode)->read_pageio_init(&pgio, inode, &nfs_async_read_completion_ops);
+	nfs_pageio_init_read(&pgio, inode, false,
+			     &nfs_async_read_completion_ops);
 	nfs_pageio_add_request(&pgio, new);
 	nfs_pageio_complete(&pgio);
 	NFS_I(inode)->read_io += pgio.pg_bytes_written;
@@ -158,10 +105,16 @@ static void nfs_readpage_release(struct nfs_page *req)
 {
 	struct inode *d_inode = req->wb_context->dentry->d_inode;
 
-	if (PageUptodate(req->wb_page))
-		nfs_readpage_to_fscache(d_inode, req->wb_page, 0);
+	dprintk("NFS: read done (%s/%llu %d@%lld)\n", d_inode->i_sb->s_id,
+		(unsigned long long)NFS_FILEID(d_inode), req->wb_bytes,
+		(long long)req_offset(req));
 
-	unlock_page(req->wb_page);
+	if (nfs_page_group_sync_on_bit(req, PG_UNLOCKPAGE)) {
+		if (PageUptodate(req->wb_page))
+			nfs_readpage_to_fscache(d_inode, req->wb_page, 0);
+
+		unlock_page(req->wb_page);
+	}
 
 	dprintk("NFS: read done (%s/%Lu %d@%Ld)\n",
 			req->wb_context->dentry->d_inode->i_sb->s_id,
@@ -171,7 +124,12 @@ static void nfs_readpage_release(struct nfs_page *req)
 	nfs_release_request(req);
 }
 
-/* Note io was page aligned */
+static void nfs_page_group_set_uptodate(struct nfs_page *req)
+{
+	if (nfs_page_group_sync_on_bit(req, PG_UPTODATE))
+		SetPageUptodate(req->wb_page);
+}
+
 static void nfs_read_completion(struct nfs_pgio_header *hdr)
 {
 	unsigned long bytes = 0;
@@ -181,21 +139,32 @@ static void nfs_read_completion(struct nfs_pgio_header *hdr)
 	while (!list_empty(&hdr->pages)) {
 		struct nfs_page *req = nfs_list_entry(hdr->pages.next);
 		struct page *page = req->wb_page;
+		unsigned long start = req->wb_pgbase;
+		unsigned long end = req->wb_pgbase + req->wb_bytes;
 
 		if (test_bit(NFS_IOHDR_EOF, &hdr->flags)) {
-			if (bytes > hdr->good_bytes)
-				zero_user(page, 0, PAGE_SIZE);
-			else if (hdr->good_bytes - bytes < PAGE_SIZE)
-				zero_user_segment(page,
-					hdr->good_bytes & ~PAGE_MASK,
-					PAGE_SIZE);
+			/* note: regions of the page not covered by a
+			 * request are zeroed in nfs_readpage_async /
+			 * readpage_async_filler */
+			if (bytes > hdr->good_bytes) {
+				/* nothing in this request was good, so zero
+				 * the full extent of the request */
+				zero_user_segment(page, start, end);
+
+			} else if (hdr->good_bytes - bytes < req->wb_bytes) {
+				/* part of this request has good bytes, but
+				 * not all. zero the bad bytes */
+				start += hdr->good_bytes - bytes;
+				WARN_ON(start < req->wb_pgbase);
+				zero_user_segment(page, start, end);
+			}
 		}
 		bytes += req->wb_bytes;
 		if (test_bit(NFS_IOHDR_ERROR, &hdr->flags)) {
 			if (bytes <= hdr->good_bytes)
-				SetPageUptodate(page);
+				nfs_page_group_set_uptodate(req);
 		} else
-			SetPageUptodate(page);
+			nfs_page_group_set_uptodate(req);
 		nfs_list_remove_request(req);
 		nfs_readpage_release(req);
 	}
@@ -203,95 +172,14 @@ out:
 	hdr->release(hdr);
 }
 
-int nfs_initiate_read(struct rpc_clnt *clnt,
-		      struct nfs_read_data *data,
-		      const struct rpc_call_ops *call_ops, int flags)
+static void nfs_initiate_read(struct nfs_pgio_data *data, struct rpc_message *msg,
+			      struct rpc_task_setup *task_setup_data, int how)
 {
 	struct inode *inode = data->header->inode;
 	int swap_flags = IS_SWAPFILE(inode) ? NFS_RPC_SWAPFLAGS : 0;
-	struct rpc_task *task;
-	struct rpc_message msg = {
-		.rpc_argp = &data->args,
-		.rpc_resp = &data->res,
-		.rpc_cred = data->header->cred,
-	};
-	struct rpc_task_setup task_setup_data = {
-		.task = &data->task,
-		.rpc_client = clnt,
-		.rpc_message = &msg,
-		.callback_ops = call_ops,
-		.callback_data = data,
-		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC | swap_flags | flags,
-	};
 
-	/* Set up the initial task struct. */
-	NFS_PROTO(inode)->read_setup(data, &msg);
-
-	dprintk("NFS: %5u initiated read call (req %s/%llu, %u bytes @ "
-			"offset %llu)\n",
-			data->task.tk_pid,
-			inode->i_sb->s_id,
-			(unsigned long long)NFS_FILEID(inode),
-			data->args.count,
-			(unsigned long long)data->args.offset);
-
-	task = rpc_run_task(&task_setup_data);
-	if (IS_ERR(task))
-		return PTR_ERR(task);
-	rpc_put_task(task);
-	return 0;
-}
-EXPORT_SYMBOL_GPL(nfs_initiate_read);
-
-/*
- * Set up the NFS read request struct
- */
-static void nfs_read_rpcsetup(struct nfs_read_data *data,
-		unsigned int count, unsigned int offset)
-{
-	struct nfs_page *req = data->header->req;
-
-	data->args.fh     = NFS_FH(data->header->inode);
-	data->args.offset = req_offset(req) + offset;
-	data->args.pgbase = req->wb_pgbase + offset;
-	data->args.pages  = data->pages.pagevec;
-	data->args.count  = count;
-	data->args.context = get_nfs_open_context(req->wb_context);
-	data->args.lock_context = req->wb_lock_context;
-
-	data->res.fattr   = &data->fattr;
-	data->res.count   = count;
-	data->res.eof     = 0;
-	nfs_fattr_init(&data->fattr);
-}
-
-static int nfs_do_read(struct nfs_read_data *data,
-		const struct rpc_call_ops *call_ops)
-{
-	struct inode *inode = data->header->inode;
-
-	return nfs_initiate_read(NFS_CLIENT(inode), data, call_ops, 0);
-}
-
-static int
-nfs_do_multiple_reads(struct list_head *head,
-		const struct rpc_call_ops *call_ops)
-{
-	struct nfs_read_data *data;
-	int ret = 0;
-
-	while (!list_empty(head)) {
-		int ret2;
-
-		data = list_first_entry(head, struct nfs_read_data, list);
-		list_del_init(&data->list);
-
-		ret2 = nfs_do_read(data, call_ops);
-		if (ret == 0)
-			ret = ret2;
-	}
-	return ret;
+	task_setup_data->flags |= swap_flags;
+	NFS_PROTO(inode)->read_setup(data, msg);
 }
 
 static void
@@ -311,143 +199,14 @@ static const struct nfs_pgio_completion_ops nfs_async_read_completion_ops = {
 	.completion = nfs_read_completion,
 };
 
-static void nfs_pagein_error(struct nfs_pageio_descriptor *desc,
-		struct nfs_pgio_header *hdr)
-{
-	set_bit(NFS_IOHDR_REDO, &hdr->flags);
-	while (!list_empty(&hdr->rpc_list)) {
-		struct nfs_read_data *data = list_first_entry(&hdr->rpc_list,
-				struct nfs_read_data, list);
-		list_del(&data->list);
-		nfs_readdata_release(data);
-	}
-	desc->pg_completion_ops->error_cleanup(&desc->pg_list);
-}
-
-/*
- * Generate multiple requests to fill a single page.
- *
- * We optimize to reduce the number of read operations on the wire.  If we
- * detect that we're reading a page, or an area of a page, that is past the
- * end of file, we do not generate NFS read operations but just clear the
- * parts of the page that would have come back zero from the server anyway.
- *
- * We rely on the cached value of i_size to make this determination; another
- * client can fill pages on the server past our cached end-of-file, but we
- * won't see the new data until our attribute cache is updated.  This is more
- * or less conventional NFS client behavior.
- */
-static int nfs_pagein_multi(struct nfs_pageio_descriptor *desc,
-			    struct nfs_pgio_header *hdr)
-{
-	struct nfs_page *req = hdr->req;
-	struct page *page = req->wb_page;
-	struct nfs_read_data *data;
-	size_t rsize = desc->pg_bsize, nbytes;
-	unsigned int offset;
-
-	offset = 0;
-	nbytes = desc->pg_count;
-	do {
-		size_t len = min(nbytes,rsize);
-
-		data = nfs_readdata_alloc(hdr, 1);
-		if (!data) {
-			nfs_pagein_error(desc, hdr);
-			return -ENOMEM;
-		}
-		data->pages.pagevec[0] = page;
-		nfs_read_rpcsetup(data, len, offset);
-		list_add(&data->list, &hdr->rpc_list);
-		nbytes -= len;
-		offset += len;
-	} while (nbytes != 0);
-
-	nfs_list_remove_request(req);
-	nfs_list_add_request(req, &hdr->pages);
-	desc->pg_rpc_callops = &nfs_read_common_ops;
-	return 0;
-}
-
-static int nfs_pagein_one(struct nfs_pageio_descriptor *desc,
-			  struct nfs_pgio_header *hdr)
-{
-	struct nfs_page		*req;
-	struct page		**pages;
-	struct nfs_read_data    *data;
-	struct list_head *head = &desc->pg_list;
-
-	data = nfs_readdata_alloc(hdr, nfs_page_array_len(desc->pg_base,
-							  desc->pg_count));
-	if (!data) {
-		nfs_pagein_error(desc, hdr);
-		return -ENOMEM;
-	}
-
-	pages = data->pages.pagevec;
-	while (!list_empty(head)) {
-		req = nfs_list_entry(head->next);
-		nfs_list_remove_request(req);
-		nfs_list_add_request(req, &hdr->pages);
-		*pages++ = req->wb_page;
-	}
-
-	nfs_read_rpcsetup(data, desc->pg_count, 0);
-	list_add(&data->list, &hdr->rpc_list);
-	desc->pg_rpc_callops = &nfs_read_common_ops;
-	return 0;
-}
-
-int nfs_generic_pagein(struct nfs_pageio_descriptor *desc,
-		       struct nfs_pgio_header *hdr)
-{
-	if (desc->pg_bsize < PAGE_CACHE_SIZE)
-		return nfs_pagein_multi(desc, hdr);
-	return nfs_pagein_one(desc, hdr);
-}
-EXPORT_SYMBOL_GPL(nfs_generic_pagein);
-
-static int nfs_generic_pg_readpages(struct nfs_pageio_descriptor *desc)
-{
-	struct nfs_read_header *rhdr;
-	struct nfs_pgio_header *hdr;
-	int ret;
-
-	rhdr = nfs_readhdr_alloc();
-	if (!rhdr) {
-		desc->pg_completion_ops->error_cleanup(&desc->pg_list);
-		return -ENOMEM;
-	}
-	hdr = &rhdr->header;
-	nfs_pgheader_init(desc, hdr, nfs_readhdr_free);
-	atomic_inc(&hdr->refcnt);
-	ret = nfs_generic_pagein(desc, hdr);
-	if (ret == 0)
-		ret = nfs_do_multiple_reads(&hdr->rpc_list,
-					    desc->pg_rpc_callops);
-	if (atomic_dec_and_test(&hdr->refcnt))
-		hdr->completion_ops->completion(hdr);
-	return ret;
-}
-
-static const struct nfs_pageio_ops nfs_pageio_read_ops = {
-	.pg_test = nfs_generic_pg_test,
-	.pg_doio = nfs_generic_pg_readpages,
-};
-
 /*
  * This is the callback from RPC telling us whether a reply was
  * received or some error occurred (timeout or socket shutdown).
  */
-int nfs_readpage_result(struct rpc_task *task, struct nfs_read_data *data)
+static int nfs_readpage_done(struct rpc_task *task, struct nfs_pgio_data *data,
+			     struct inode *inode)
 {
-	struct inode *inode = data->header->inode;
-	int status;
-
-	dprintk("NFS: %s: %5u, (status %d)\n", __func__, task->tk_pid,
-			task->tk_status);
-
-	status = NFS_PROTO(inode)->read_done(task, data);
+	int status = NFS_PROTO(inode)->read_done(task, data);
 	if (status != 0)
 		return status;
 
@@ -460,10 +219,10 @@ int nfs_readpage_result(struct rpc_task *task, struct nfs_read_data *data)
 	return 0;
 }
 
-static void nfs_readpage_retry(struct rpc_task *task, struct nfs_read_data *data)
+static void nfs_readpage_retry(struct rpc_task *task, struct nfs_pgio_data *data)
 {
-	struct nfs_readargs *argp = &data->args;
-	struct nfs_readres *resp = &data->res;
+	struct nfs_pgio_args *argp = &data->args;
+	struct nfs_pgio_res  *resp = &data->res;
 
 	/* This is a short read! */
 	nfs_inc_stats(data->header->inode, NFSIOS_SHORTREAD);
@@ -480,17 +239,11 @@ static void nfs_readpage_retry(struct rpc_task *task, struct nfs_read_data *data
 	rpc_restart_call_prepare(task);
 }
 
-static void nfs_readpage_result_common(struct rpc_task *task, void *calldata)
+static void nfs_readpage_result(struct rpc_task *task, struct nfs_pgio_data *data)
 {
-	struct nfs_read_data *data = calldata;
 	struct nfs_pgio_header *hdr = data->header;
 
-	/* Note the only returns of nfs_readpage_result are 0 and -EAGAIN */
-	if (nfs_readpage_result(task, data) != 0)
-		return;
-	if (task->tk_status < 0)
-		nfs_set_pgio_error(hdr, task->tk_status, data->args.offset);
-	else if (data->res.eof) {
+	if (data->res.eof) {
 		loff_t bound;
 
 		bound = data->args.offset + data->res.count;
@@ -505,26 +258,6 @@ static void nfs_readpage_result_common(struct rpc_task *task, void *calldata)
 		nfs_readpage_retry(task, data);
 }
 
-static void nfs_readpage_release_common(void *calldata)
-{
-	nfs_readdata_release(calldata);
-}
-
-void nfs_read_prepare(struct rpc_task *task, void *calldata)
-{
-	struct nfs_read_data *data = calldata;
-	int err;
-	err = NFS_PROTO(data->header->inode)->read_rpc_prepare(task, data);
-	if (err)
-		rpc_exit(task, err);
-}
-
-static const struct rpc_call_ops nfs_read_common_ops = {
-	.rpc_call_prepare = nfs_read_prepare,
-	.rpc_call_done = nfs_readpage_result_common,
-	.rpc_release = nfs_readpage_release_common,
-};
-
 /*
  * Read a page over NFS.
  * We read the page synchronously in the following case:
@@ -592,7 +325,6 @@ static int
 readpage_async_filler(void *data, struct page *page)
 {
 	struct nfs_readdesc *desc = (struct nfs_readdesc *)data;
-	struct inode *inode = page_file_mapping(page)->host;
 	struct nfs_page *new;
 	unsigned int len;
 	int error;
@@ -601,7 +333,7 @@ readpage_async_filler(void *data, struct page *page)
 	if (len == 0)
 		return nfs_return_empty_page(page);
 
-	new = nfs_create_request(desc->ctx, inode, page, 0, len);
+	new = nfs_create_request(desc->ctx, page, NULL, 0, len);
 	if (IS_ERR(new))
 		goto out_error;
 
@@ -654,7 +386,8 @@ int nfs_readpages(struct file *filp, struct address_space *mapping,
 	if (ret == 0)
 		goto read_complete; /* all pages were read */
 
-	NFS_PROTO(inode)->read_pageio_init(&pgio, inode, &nfs_async_read_completion_ops);
+	nfs_pageio_init_read(&pgio, inode, false,
+			     &nfs_async_read_completion_ops);
 
 	ret = read_cache_pages(mapping, pages, readpage_async_filler, &desc);
 
@@ -671,7 +404,7 @@ out:
 int __init nfs_init_readpagecache(void)
 {
 	nfs_rdata_cachep = kmem_cache_create("nfs_read_data",
-					     sizeof(struct nfs_read_header),
+					     sizeof(struct nfs_rw_header),
 					     0, SLAB_HWCACHE_ALIGN,
 					     NULL);
 	if (nfs_rdata_cachep == NULL)
@@ -684,3 +417,12 @@ void nfs_destroy_readpagecache(void)
 {
 	kmem_cache_destroy(nfs_rdata_cachep);
 }
+
+static const struct nfs_rw_ops nfs_rw_read_ops = {
+	.rw_mode		= FMODE_READ,
+	.rw_alloc_header	= nfs_readhdr_alloc,
+	.rw_free_header		= nfs_readhdr_free,
+	.rw_done		= nfs_readpage_done,
+	.rw_result		= nfs_readpage_result,
+	.rw_initiate		= nfs_initiate_read,
+};
diff --git a/fs/nfs/super.c b/fs/nfs/super.c
index 2cb56943e232..084af1060d79 100644
--- a/fs/nfs/super.c
+++ b/fs/nfs/super.c
@@ -2180,11 +2180,23 @@ out_no_address:
 	return -EINVAL;
 }
 
+#define NFS_MOUNT_CMP_FLAGMASK ~(NFS_MOUNT_INTR \
+		| NFS_MOUNT_SECURE \
+		| NFS_MOUNT_TCP \
+		| NFS_MOUNT_VER3 \
+		| NFS_MOUNT_KERBEROS \
+		| NFS_MOUNT_NONLM \
+		| NFS_MOUNT_BROKEN_SUID \
+		| NFS_MOUNT_STRICTLOCK \
+		| NFS_MOUNT_UNSHARED \
+		| NFS_MOUNT_NORESVPORT \
+		| NFS_MOUNT_LEGACY_INTERFACE)
+
 static int
 nfs_compare_remount_data(struct nfs_server *nfss,
 			 struct nfs_parsed_mount_data *data)
 {
-	if (data->flags != nfss->flags ||
+	if ((data->flags ^ nfss->flags) & NFS_MOUNT_CMP_FLAGMASK ||
 	    data->rsize != nfss->rsize ||
 	    data->wsize != nfss->wsize ||
 	    data->version != nfss->nfs_client->rpc_ops->version ||
@@ -2248,6 +2260,7 @@ nfs_remount(struct super_block *sb, int *flags, char *raw_data)
 	data->nfs_server.addrlen = nfss->nfs_client->cl_addrlen;
 	data->version = nfsvers;
 	data->minorversion = nfss->nfs_client->cl_minorversion;
+	data->net = current->nsproxy->net_ns;
 	memcpy(&data->nfs_server.address, &nfss->nfs_client->cl_addr,
 		data->nfs_server.addrlen);
 
@@ -2347,18 +2360,6 @@ void nfs_clone_super(struct super_block *sb, struct nfs_mount_info *mount_info)
  	nfs_initialise_sb(sb);
 }
 
-#define NFS_MOUNT_CMP_FLAGMASK ~(NFS_MOUNT_INTR \
-		| NFS_MOUNT_SECURE \
-		| NFS_MOUNT_TCP \
-		| NFS_MOUNT_VER3 \
-		| NFS_MOUNT_KERBEROS \
-		| NFS_MOUNT_NONLM \
-		| NFS_MOUNT_BROKEN_SUID \
-		| NFS_MOUNT_STRICTLOCK \
-		| NFS_MOUNT_UNSHARED \
-		| NFS_MOUNT_NORESVPORT \
-		| NFS_MOUNT_LEGACY_INTERFACE)
-
 static int nfs_compare_mount_options(const struct super_block *s, const struct nfs_server *b, int flags)
 {
 	const struct nfs_server *a = s->s_fs_info;
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index ffb9459f180b..3ee5af4e738e 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -42,10 +42,10 @@
  * Local function declarations
  */
 static void nfs_redirty_request(struct nfs_page *req);
-static const struct rpc_call_ops nfs_write_common_ops;
 static const struct rpc_call_ops nfs_commit_ops;
 static const struct nfs_pgio_completion_ops nfs_async_write_completion_ops;
 static const struct nfs_commit_completion_ops nfs_commit_completion_ops;
+static const struct nfs_rw_ops nfs_rw_write_ops;
 
 static struct kmem_cache *nfs_wdata_cachep;
 static mempool_t *nfs_wdata_mempool;
@@ -70,76 +70,19 @@ void nfs_commit_free(struct nfs_commit_data *p)
 }
 EXPORT_SYMBOL_GPL(nfs_commit_free);
 
-struct nfs_write_header *nfs_writehdr_alloc(void)
+static struct nfs_rw_header *nfs_writehdr_alloc(void)
 {
-	struct nfs_write_header *p = mempool_alloc(nfs_wdata_mempool, GFP_NOIO);
-
-	if (p) {
-		struct nfs_pgio_header *hdr = &p->header;
+	struct nfs_rw_header *p = mempool_alloc(nfs_wdata_mempool, GFP_NOIO);
 
+	if (p)
 		memset(p, 0, sizeof(*p));
-		INIT_LIST_HEAD(&hdr->pages);
-		INIT_LIST_HEAD(&hdr->rpc_list);
-		spin_lock_init(&hdr->lock);
-		atomic_set(&hdr->refcnt, 0);
-		hdr->verf = &p->verf;
-	}
 	return p;
 }
-EXPORT_SYMBOL_GPL(nfs_writehdr_alloc);
-
-static struct nfs_write_data *nfs_writedata_alloc(struct nfs_pgio_header *hdr,
-						  unsigned int pagecount)
-{
-	struct nfs_write_data *data, *prealloc;
-
-	prealloc = &container_of(hdr, struct nfs_write_header, header)->rpc_data;
-	if (prealloc->header == NULL)
-		data = prealloc;
-	else
-		data = kzalloc(sizeof(*data), GFP_KERNEL);
-	if (!data)
-		goto out;
-
-	if (nfs_pgarray_set(&data->pages, pagecount)) {
-		data->header = hdr;
-		atomic_inc(&hdr->refcnt);
-	} else {
-		if (data != prealloc)
-			kfree(data);
-		data = NULL;
-	}
-out:
-	return data;
-}
 
-void nfs_writehdr_free(struct nfs_pgio_header *hdr)
+static void nfs_writehdr_free(struct nfs_rw_header *whdr)
 {
-	struct nfs_write_header *whdr = container_of(hdr, struct nfs_write_header, header);
 	mempool_free(whdr, nfs_wdata_mempool);
 }
-EXPORT_SYMBOL_GPL(nfs_writehdr_free);
-
-void nfs_writedata_release(struct nfs_write_data *wdata)
-{
-	struct nfs_pgio_header *hdr = wdata->header;
-	struct nfs_write_header *write_header = container_of(hdr, struct nfs_write_header, header);
-
-	put_nfs_open_context(wdata->args.context);
-	if (wdata->pages.pagevec != wdata->pages.page_array)
-		kfree(wdata->pages.pagevec);
-	if (wdata == &write_header->rpc_data) {
-		wdata->header = NULL;
-		wdata = NULL;
-	}
-	if (atomic_dec_and_test(&hdr->refcnt))
-		hdr->completion_ops->completion(hdr);
-	/* Note: we only free the rpc_task after callbacks are done.
-	 * See the comment in rpc_free_task() for why
-	 */
-	kfree(wdata);
-}
-EXPORT_SYMBOL_GPL(nfs_writedata_release);
 
 static void nfs_context_set_write_error(struct nfs_open_context *ctx, int error)
 {
@@ -211,18 +154,78 @@ static void nfs_set_pageerror(struct page *page)
 	nfs_zap_mapping(page_file_mapping(page)->host, page_file_mapping(page));
 }
 
+/*
+ * nfs_page_group_search_locked
+ * @head - head request of page group
+ * @page_offset - offset into page
+ *
+ * Search page group with head @head to find a request that contains the
+ * page offset @page_offset.
+ *
+ * Returns a pointer to the first matching nfs request, or NULL if no
+ * match is found.
+ *
+ * Must be called with the page group lock held
+ */
+static struct nfs_page *
+nfs_page_group_search_locked(struct nfs_page *head, unsigned int page_offset)
+{
+	struct nfs_page *req;
+
+	WARN_ON_ONCE(head != head->wb_head);
+	WARN_ON_ONCE(!test_bit(PG_HEADLOCK, &head->wb_head->wb_flags));
+
+	req = head;
+	do {
+		if (page_offset >= req->wb_pgbase &&
+		    page_offset < (req->wb_pgbase + req->wb_bytes))
+			return req;
+
+		req = req->wb_this_page;
+	} while (req != head);
+
+	return NULL;
+}
+
+/*
+ * nfs_page_group_covers_page
+ * @head - head request of page group
+ *
+ * Return true if the page group with head @head covers the whole page,
+ * returns false otherwise
+ */
+static bool nfs_page_group_covers_page(struct nfs_page *req)
+{
+	struct nfs_page *tmp;
+	unsigned int pos = 0;
+	unsigned int len = nfs_page_length(req->wb_page);
+
+	nfs_page_group_lock(req);
+
+	do {
+		tmp = nfs_page_group_search_locked(req->wb_head, pos);
+		if (tmp) {
+			/* no way this should happen */
+			WARN_ON_ONCE(tmp->wb_pgbase != pos);
+			pos += tmp->wb_bytes - (pos - tmp->wb_pgbase);
+		}
+	} while (tmp && pos < len);
+
+	nfs_page_group_unlock(req);
+	WARN_ON_ONCE(pos > len);
+	return pos == len;
+}
+
 /* We can set the PG_uptodate flag if we see that a write request
  * covers the full page.
  */
-static void nfs_mark_uptodate(struct page *page, unsigned int base, unsigned int count)
+static void nfs_mark_uptodate(struct nfs_page *req)
 {
-	if (PageUptodate(page))
-		return;
-	if (base != 0)
+	if (PageUptodate(req->wb_page))
 		return;
-	if (count != nfs_page_length(page))
+	if (!nfs_page_group_covers_page(req))
 		return;
-	SetPageUptodate(page);
+	SetPageUptodate(req->wb_page);
 }
 
 static int wb_priority(struct writeback_control *wbc)
@@ -258,12 +261,15 @@ static void nfs_set_page_writeback(struct page *page)
 	}
 }
 
-static void nfs_end_page_writeback(struct page *page)
+static void nfs_end_page_writeback(struct nfs_page *req)
 {
-	struct inode *inode = page_file_mapping(page)->host;
+	struct inode *inode = page_file_mapping(req->wb_page)->host;
 	struct nfs_server *nfss = NFS_SERVER(inode);
 
-	end_page_writeback(page);
+	if (!nfs_page_group_sync_on_bit(req, PG_WB_END))
+		return;
+
+	end_page_writeback(req->wb_page);
 	if (atomic_long_dec_return(&nfss->writeback) < NFS_CONGESTION_OFF_THRESH)
 		clear_bdi_congested(&nfss->backing_dev_info, BLK_RW_ASYNC);
 }
@@ -354,10 +360,8 @@ static int nfs_writepage_locked(struct page *page, struct writeback_control *wbc
 	struct nfs_pageio_descriptor pgio;
 	int err;
 
-	NFS_PROTO(page_file_mapping(page)->host)->write_pageio_init(&pgio,
-							  page->mapping->host,
-							  wb_priority(wbc),
-							  &nfs_async_write_completion_ops);
+	nfs_pageio_init_write(&pgio, page->mapping->host, wb_priority(wbc),
+				false, &nfs_async_write_completion_ops);
 	err = nfs_do_writepage(page, wbc, &pgio);
 	nfs_pageio_complete(&pgio);
 	if (err < 0)
@@ -400,7 +404,8 @@ int nfs_writepages(struct address_space *mapping, struct writeback_control *wbc)
 
 	nfs_inc_stats(inode, NFSIOS_VFSWRITEPAGES);
 
-	NFS_PROTO(inode)->write_pageio_init(&pgio, inode, wb_priority(wbc), &nfs_async_write_completion_ops);
+	nfs_pageio_init_write(&pgio, inode, wb_priority(wbc), false,
+				&nfs_async_write_completion_ops);
 	err = write_cache_pages(mapping, wbc, nfs_writepages_callback, &pgio);
 	nfs_pageio_complete(&pgio);
 
@@ -425,6 +430,8 @@ static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 {
 	struct nfs_inode *nfsi = NFS_I(inode);
 
+	WARN_ON_ONCE(req->wb_this_page != req);
+
 	/* Lock the request! */
 	nfs_lock_request(req);
 
@@ -441,6 +448,7 @@ static void nfs_inode_add_request(struct inode *inode, struct nfs_page *req)
 		set_page_private(req->wb_page, (unsigned long)req);
 	}
 	nfsi->npages++;
+	set_bit(PG_INODE_REF, &req->wb_flags);
 	kref_get(&req->wb_kref);
 	spin_unlock(&inode->i_lock);
 }
@@ -452,15 +460,20 @@ static void nfs_inode_remove_request(struct nfs_page *req)
 {
 	struct inode *inode = req->wb_context->dentry->d_inode;
 	struct nfs_inode *nfsi = NFS_I(inode);
+	struct nfs_page *head;
 
-	spin_lock(&inode->i_lock);
-	if (likely(!PageSwapCache(req->wb_page))) {
-		set_page_private(req->wb_page, 0);
-		ClearPagePrivate(req->wb_page);
-		clear_bit(PG_MAPPED, &req->wb_flags);
+	if (nfs_page_group_sync_on_bit(req, PG_REMOVE)) {
+		head = req->wb_head;
+
+		spin_lock(&inode->i_lock);
+		if (likely(!PageSwapCache(head->wb_page))) {
+			set_page_private(head->wb_page, 0);
+			ClearPagePrivate(head->wb_page);
+			clear_bit(PG_MAPPED, &head->wb_flags);
+		}
+		nfsi->npages--;
+		spin_unlock(&inode->i_lock);
 	}
-	nfsi->npages--;
-	spin_unlock(&inode->i_lock);
 	nfs_release_request(req);
 }
 
@@ -583,7 +596,7 @@ nfs_clear_request_commit(struct nfs_page *req)
 }
 
 static inline
-int nfs_write_need_commit(struct nfs_write_data *data)
+int nfs_write_need_commit(struct nfs_pgio_data *data)
 {
 	if (data->verf.committed == NFS_DATA_SYNC)
 		return data->header->lseg == NULL;
@@ -614,7 +627,7 @@ nfs_clear_request_commit(struct nfs_page *req)
 }
 
 static inline
-int nfs_write_need_commit(struct nfs_write_data *data)
+int nfs_write_need_commit(struct nfs_pgio_data *data)
 {
 	return 0;
 }
@@ -625,6 +638,7 @@ static void nfs_write_completion(struct nfs_pgio_header *hdr)
 {
 	struct nfs_commit_info cinfo;
 	unsigned long bytes = 0;
+	bool do_destroy;
 
 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags))
 		goto out;
@@ -645,7 +659,7 @@ static void nfs_write_completion(struct nfs_pgio_header *hdr)
 			goto next;
 		}
 		if (test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags)) {
-			memcpy(&req->wb_verf, &hdr->verf->verifier, sizeof(req->wb_verf));
+			memcpy(&req->wb_verf, &hdr->verf.verifier, sizeof(req->wb_verf));
 			nfs_mark_request_commit(req, hdr->lseg, &cinfo);
 			goto next;
 		}
@@ -653,7 +667,8 @@ remove_req:
 		nfs_inode_remove_request(req);
 next:
 		nfs_unlock_request(req);
-		nfs_end_page_writeback(req->wb_page);
+		nfs_end_page_writeback(req);
+		do_destroy = !test_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags);
 		nfs_release_request(req);
 	}
 out:
@@ -661,7 +676,7 @@ out:
 }
 
 #if  IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
-static unsigned long
+unsigned long
 nfs_reqs_to_commit(struct nfs_commit_info *cinfo)
 {
 	return cinfo->mds->ncommit;
@@ -718,7 +733,7 @@ nfs_scan_commit(struct inode *inode, struct list_head *dst,
 }
 
 #else
-static unsigned long nfs_reqs_to_commit(struct nfs_commit_info *cinfo)
+unsigned long nfs_reqs_to_commit(struct nfs_commit_info *cinfo)
 {
 	return 0;
 }
@@ -758,6 +773,10 @@ static struct nfs_page *nfs_try_to_update_request(struct inode *inode,
 		if (req == NULL)
 			goto out_unlock;
 
+		/* should be handled by nfs_flush_incompatible */
+		WARN_ON_ONCE(req->wb_head != req);
+		WARN_ON_ONCE(req->wb_this_page != req);
+
 		rqend = req->wb_offset + req->wb_bytes;
 		/*
 		 * Tell the caller to flush out the request if
@@ -819,7 +838,7 @@ static struct nfs_page * nfs_setup_write_request(struct nfs_open_context* ctx,
 	req = nfs_try_to_update_request(inode, page, offset, bytes);
 	if (req != NULL)
 		goto out;
-	req = nfs_create_request(ctx, inode, page, offset, bytes);
+	req = nfs_create_request(ctx, page, NULL, offset, bytes);
 	if (IS_ERR(req))
 		goto out;
 	nfs_inode_add_request(inode, req);
@@ -837,7 +856,7 @@ static int nfs_writepage_setup(struct nfs_open_context *ctx, struct page *page,
 		return PTR_ERR(req);
 	/* Update file length */
 	nfs_grow_file(page, offset, count);
-	nfs_mark_uptodate(page, req->wb_pgbase, req->wb_bytes);
+	nfs_mark_uptodate(req);
 	nfs_mark_request_dirty(req);
 	nfs_unlock_and_release_request(req);
 	return 0;
@@ -863,6 +882,8 @@ int nfs_flush_incompatible(struct file *file, struct page *page)
 			return 0;
 		l_ctx = req->wb_lock_context;
 		do_flush = req->wb_page != page || req->wb_context != ctx;
+		/* for now, flush if more than 1 request in page_group */
+		do_flush |= req->wb_this_page != req;
 		if (l_ctx && ctx->dentry->d_inode->i_flock != NULL) {
 			do_flush |= l_ctx->lockowner.l_owner != current->files
 				|| l_ctx->lockowner.l_pid != current->tgid;
@@ -990,126 +1011,17 @@ static int flush_task_priority(int how)
 	return RPC_PRIORITY_NORMAL;
 }
 
-int nfs_initiate_write(struct rpc_clnt *clnt,
-		       struct nfs_write_data *data,
-		       const struct rpc_call_ops *call_ops,
-		       int how, int flags)
+static void nfs_initiate_write(struct nfs_pgio_data *data, struct rpc_message *msg,
+			       struct rpc_task_setup *task_setup_data, int how)
 {
 	struct inode *inode = data->header->inode;
 	int priority = flush_task_priority(how);
-	struct rpc_task *task;
-	struct rpc_message msg = {
-		.rpc_argp = &data->args,
-		.rpc_resp = &data->res,
-		.rpc_cred = data->header->cred,
-	};
-	struct rpc_task_setup task_setup_data = {
-		.rpc_client = clnt,
-		.task = &data->task,
-		.rpc_message = &msg,
-		.callback_ops = call_ops,
-		.callback_data = data,
-		.workqueue = nfsiod_workqueue,
-		.flags = RPC_TASK_ASYNC | flags,
-		.priority = priority,
-	};
-	int ret = 0;
-
-	/* Set up the initial task struct.  */
-	NFS_PROTO(inode)->write_setup(data, &msg);
 
-	dprintk("NFS: %5u initiated write call "
-		"(req %s/%llu, %u bytes @ offset %llu)\n",
-		data->task.tk_pid,
-		inode->i_sb->s_id,
-		(unsigned long long)NFS_FILEID(inode),
-		data->args.count,
-		(unsigned long long)data->args.offset);
+	task_setup_data->priority = priority;
+	NFS_PROTO(inode)->write_setup(data, msg);
 
 	nfs4_state_protect_write(NFS_SERVER(inode)->nfs_client,
-				 &task_setup_data.rpc_client, &msg, data);
-
-	task = rpc_run_task(&task_setup_data);
-	if (IS_ERR(task)) {
-		ret = PTR_ERR(task);
-		goto out;
-	}
-	if (how & FLUSH_SYNC) {
-		ret = rpc_wait_for_completion_task(task);
-		if (ret == 0)
-			ret = task->tk_status;
-	}
-	rpc_put_task(task);
-out:
-	return ret;
-}
-EXPORT_SYMBOL_GPL(nfs_initiate_write);
-
-/*
- * Set up the argument/result storage required for the RPC call.
- */
-static void nfs_write_rpcsetup(struct nfs_write_data *data,
-		unsigned int count, unsigned int offset,
-		int how, struct nfs_commit_info *cinfo)
-{
-	struct nfs_page *req = data->header->req;
-
-	/* Set up the RPC argument and reply structs
-	 * NB: take care not to mess about with data->commit et al. */
-
-	data->args.fh     = NFS_FH(data->header->inode);
-	data->args.offset = req_offset(req) + offset;
-	/* pnfs_set_layoutcommit needs this */
-	data->mds_offset = data->args.offset;
-	data->args.pgbase = req->wb_pgbase + offset;
-	data->args.pages  = data->pages.pagevec;
-	data->args.count  = count;
-	data->args.context = get_nfs_open_context(req->wb_context);
-	data->args.lock_context = req->wb_lock_context;
-	data->args.stable  = NFS_UNSTABLE;
-	switch (how & (FLUSH_STABLE | FLUSH_COND_STABLE)) {
-	case 0:
-		break;
-	case FLUSH_COND_STABLE:
-		if (nfs_reqs_to_commit(cinfo))
-			break;
-	default:
-		data->args.stable = NFS_FILE_SYNC;
-	}
-
-	data->res.fattr   = &data->fattr;
-	data->res.count   = count;
-	data->res.verf    = &data->verf;
-	nfs_fattr_init(&data->fattr);
-}
-
-static int nfs_do_write(struct nfs_write_data *data,
-		const struct rpc_call_ops *call_ops,
-		int how)
-{
-	struct inode *inode = data->header->inode;
-
-	return nfs_initiate_write(NFS_CLIENT(inode), data, call_ops, how, 0);
-}
-
-static int nfs_do_multiple_writes(struct list_head *head,
-		const struct rpc_call_ops *call_ops,
-		int how)
-{
-	struct nfs_write_data *data;
-	int ret = 0;
-
-	while (!list_empty(head)) {
-		int ret2;
-
-		data = list_first_entry(head, struct nfs_write_data, list);
-		list_del_init(&data->list);
-		
-		ret2 = nfs_do_write(data, call_ops, how);
-		 if (ret == 0)
-			 ret = ret2;
-	}
-	return ret;
+				 &task_setup_data->rpc_client, msg, data);
 }
 
 /* If a nfs_flush_* function fails, it should remove reqs from @head and
@@ -1120,7 +1032,7 @@ static void nfs_redirty_request(struct nfs_page *req)
 {
 	nfs_mark_request_dirty(req);
 	nfs_unlock_request(req);
-	nfs_end_page_writeback(req->wb_page);
+	nfs_end_page_writeback(req);
 	nfs_release_request(req);
 }
 
@@ -1140,173 +1052,30 @@ static const struct nfs_pgio_completion_ops nfs_async_write_completion_ops = {
 	.completion = nfs_write_completion,
 };
 
-static void nfs_flush_error(struct nfs_pageio_descriptor *desc,
-		struct nfs_pgio_header *hdr)
-{
-	set_bit(NFS_IOHDR_REDO, &hdr->flags);
-	while (!list_empty(&hdr->rpc_list)) {
-		struct nfs_write_data *data = list_first_entry(&hdr->rpc_list,
-				struct nfs_write_data, list);
-		list_del(&data->list);
-		nfs_writedata_release(data);
-	}
-	desc->pg_completion_ops->error_cleanup(&desc->pg_list);
-}
-
-/*
- * Generate multiple small requests to write out a single
- * contiguous dirty area on one page.
- */
-static int nfs_flush_multi(struct nfs_pageio_descriptor *desc,
-			   struct nfs_pgio_header *hdr)
-{
-	struct nfs_page *req = hdr->req;
-	struct page *page = req->wb_page;
-	struct nfs_write_data *data;
-	size_t wsize = desc->pg_bsize, nbytes;
-	unsigned int offset;
-	int requests = 0;
-	struct nfs_commit_info cinfo;
-
-	nfs_init_cinfo(&cinfo, desc->pg_inode, desc->pg_dreq);
-
-	if ((desc->pg_ioflags & FLUSH_COND_STABLE) &&
-	    (desc->pg_moreio || nfs_reqs_to_commit(&cinfo) ||
-	     desc->pg_count > wsize))
-		desc->pg_ioflags &= ~FLUSH_COND_STABLE;
-
-
-	offset = 0;
-	nbytes = desc->pg_count;
-	do {
-		size_t len = min(nbytes, wsize);
-
-		data = nfs_writedata_alloc(hdr, 1);
-		if (!data) {
-			nfs_flush_error(desc, hdr);
-			return -ENOMEM;
-		}
-		data->pages.pagevec[0] = page;
-		nfs_write_rpcsetup(data, len, offset, desc->pg_ioflags, &cinfo);
-		list_add(&data->list, &hdr->rpc_list);
-		requests++;
-		nbytes -= len;
-		offset += len;
-	} while (nbytes != 0);
-	nfs_list_remove_request(req);
-	nfs_list_add_request(req, &hdr->pages);
-	desc->pg_rpc_callops = &nfs_write_common_ops;
-	return 0;
-}
-
-/*
- * Create an RPC task for the given write request and kick it.
- * The page must have been locked by the caller.
- *
- * It may happen that the page we're passed is not marked dirty.
- * This is the case if nfs_updatepage detects a conflicting request
- * that has been written but not committed.
- */
-static int nfs_flush_one(struct nfs_pageio_descriptor *desc,
-			 struct nfs_pgio_header *hdr)
-{
-	struct nfs_page		*req;
-	struct page		**pages;
-	struct nfs_write_data	*data;
-	struct list_head *head = &desc->pg_list;
-	struct nfs_commit_info cinfo;
-
-	data = nfs_writedata_alloc(hdr, nfs_page_array_len(desc->pg_base,
-							   desc->pg_count));
-	if (!data) {
-		nfs_flush_error(desc, hdr);
-		return -ENOMEM;
-	}
-
-	nfs_init_cinfo(&cinfo, desc->pg_inode, desc->pg_dreq);
-	pages = data->pages.pagevec;
-	while (!list_empty(head)) {
-		req = nfs_list_entry(head->next);
-		nfs_list_remove_request(req);
-		nfs_list_add_request(req, &hdr->pages);
-		*pages++ = req->wb_page;
-	}
-
-	if ((desc->pg_ioflags & FLUSH_COND_STABLE) &&
-	    (desc->pg_moreio || nfs_reqs_to_commit(&cinfo)))
-		desc->pg_ioflags &= ~FLUSH_COND_STABLE;
-
-	/* Set up the argument struct */
-	nfs_write_rpcsetup(data, desc->pg_count, 0, desc->pg_ioflags, &cinfo);
-	list_add(&data->list, &hdr->rpc_list);
-	desc->pg_rpc_callops = &nfs_write_common_ops;
-	return 0;
-}
-
-int nfs_generic_flush(struct nfs_pageio_descriptor *desc,
-		      struct nfs_pgio_header *hdr)
-{
-	if (desc->pg_bsize < PAGE_CACHE_SIZE)
-		return nfs_flush_multi(desc, hdr);
-	return nfs_flush_one(desc, hdr);
-}
-EXPORT_SYMBOL_GPL(nfs_generic_flush);
-
-static int nfs_generic_pg_writepages(struct nfs_pageio_descriptor *desc)
-{
-	struct nfs_write_header *whdr;
-	struct nfs_pgio_header *hdr;
-	int ret;
-
-	whdr = nfs_writehdr_alloc();
-	if (!whdr) {
-		desc->pg_completion_ops->error_cleanup(&desc->pg_list);
-		return -ENOMEM;
-	}
-	hdr = &whdr->header;
-	nfs_pgheader_init(desc, hdr, nfs_writehdr_free);
-	atomic_inc(&hdr->refcnt);
-	ret = nfs_generic_flush(desc, hdr);
-	if (ret == 0)
-		ret = nfs_do_multiple_writes(&hdr->rpc_list,
-					     desc->pg_rpc_callops,
-					     desc->pg_ioflags);
-	if (atomic_dec_and_test(&hdr->refcnt))
-		hdr->completion_ops->completion(hdr);
-	return ret;
-}
-
-static const struct nfs_pageio_ops nfs_pageio_write_ops = {
-	.pg_test = nfs_generic_pg_test,
-	.pg_doio = nfs_generic_pg_writepages,
-};
-
 void nfs_pageio_init_write(struct nfs_pageio_descriptor *pgio,
-			       struct inode *inode, int ioflags,
+			       struct inode *inode, int ioflags, bool force_mds,
 			       const struct nfs_pgio_completion_ops *compl_ops)
 {
-	nfs_pageio_init(pgio, inode, &nfs_pageio_write_ops, compl_ops,
-				NFS_SERVER(inode)->wsize, ioflags);
+	struct nfs_server *server = NFS_SERVER(inode);
+	const struct nfs_pageio_ops *pg_ops = &nfs_pgio_rw_ops;
+
+#ifdef CONFIG_NFS_V4_1
+	if (server->pnfs_curr_ld && !force_mds)
+		pg_ops = server->pnfs_curr_ld->pg_write_ops;
+#endif
+	nfs_pageio_init(pgio, inode, pg_ops, compl_ops, &nfs_rw_write_ops,
+			server->wsize, ioflags);
 }
 EXPORT_SYMBOL_GPL(nfs_pageio_init_write);
 
 void nfs_pageio_reset_write_mds(struct nfs_pageio_descriptor *pgio)
 {
-	pgio->pg_ops = &nfs_pageio_write_ops;
+	pgio->pg_ops = &nfs_pgio_rw_ops;
 	pgio->pg_bsize = NFS_SERVER(pgio->pg_inode)->wsize;
 }
 EXPORT_SYMBOL_GPL(nfs_pageio_reset_write_mds);
 
 
-void nfs_write_prepare(struct rpc_task *task, void *calldata)
-{
-	struct nfs_write_data *data = calldata;
-	int err;
-	err = NFS_PROTO(data->header->inode)->write_rpc_prepare(task, data);
-	if (err)
-		rpc_exit(task, err);
-}
-
 void nfs_commit_prepare(struct rpc_task *task, void *calldata)
 {
 	struct nfs_commit_data *data = calldata;
@@ -1314,23 +1083,8 @@ void nfs_commit_prepare(struct rpc_task *task, void *calldata)
 	NFS_PROTO(data->inode)->commit_rpc_prepare(task, data);
 }
 
-/*
- * Handle a write reply that flushes a whole page.
- *
- * FIXME: There is an inherent race with invalidate_inode_pages and
- *	  writebacks since the page->count is kept > 1 for as long
- *	  as the page has a write request pending.
- */
-static void nfs_writeback_done_common(struct rpc_task *task, void *calldata)
-{
-	struct nfs_write_data	*data = calldata;
-
-	nfs_writeback_done(task, data);
-}
-
-static void nfs_writeback_release_common(void *calldata)
+static void nfs_writeback_release_common(struct nfs_pgio_data *data)
 {
-	struct nfs_write_data	*data = calldata;
 	struct nfs_pgio_header *hdr = data->header;
 	int status = data->task.tk_status;
 
@@ -1339,34 +1093,46 @@ static void nfs_writeback_release_common(void *calldata)
 		if (test_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags))
 			; /* Do nothing */
 		else if (!test_and_set_bit(NFS_IOHDR_NEED_COMMIT, &hdr->flags))
-			memcpy(hdr->verf, &data->verf, sizeof(*hdr->verf));
-		else if (memcmp(hdr->verf, &data->verf, sizeof(*hdr->verf)))
+			memcpy(&hdr->verf, &data->verf, sizeof(hdr->verf));
+		else if (memcmp(&hdr->verf, &data->verf, sizeof(hdr->verf)))
 			set_bit(NFS_IOHDR_NEED_RESCHED, &hdr->flags);
 		spin_unlock(&hdr->lock);
 	}
-	nfs_writedata_release(data);
 }
 
-static const struct rpc_call_ops nfs_write_common_ops = {
-	.rpc_call_prepare = nfs_write_prepare,
-	.rpc_call_done = nfs_writeback_done_common,
-	.rpc_release = nfs_writeback_release_common,
-};
+/*
+ * Special version of should_remove_suid() that ignores capabilities.
+ */
+static int nfs_should_remove_suid(const struct inode *inode)
+{
+	umode_t mode = inode->i_mode;
+	int kill = 0;
+
+	/* suid always must be killed */
+	if (unlikely(mode & S_ISUID))
+		kill = ATTR_KILL_SUID;
 
+	/*
+	 * sgid without any exec bits is just a mandatory locking mark; leave
+	 * it alone.  If some exec bits are set, it's a real sgid; kill it.
+	 */
+	if (unlikely((mode & S_ISGID) && (mode & S_IXGRP)))
+		kill |= ATTR_KILL_SGID;
+
+	if (unlikely(kill && S_ISREG(mode)))
+		return kill;
+
+	return 0;
+}
 
 /*
  * This function is called when the WRITE call is complete.
  */
-void nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
+static int nfs_writeback_done(struct rpc_task *task, struct nfs_pgio_data *data,
+			      struct inode *inode)
 {
-	struct nfs_writeargs	*argp = &data->args;
-	struct nfs_writeres	*resp = &data->res;
-	struct inode		*inode = data->header->inode;
 	int status;
 
-	dprintk("NFS: %5u nfs_writeback_done (status %d)\n",
-		task->tk_pid, task->tk_status);
-
 	/*
 	 * ->write_done will attempt to use post-op attributes to detect
 	 * conflicting writes by other clients.  A strict interpretation
@@ -1376,11 +1142,11 @@ void nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
 	 */
 	status = NFS_PROTO(inode)->write_done(task, data);
 	if (status != 0)
-		return;
-	nfs_add_stats(inode, NFSIOS_SERVERWRITTENBYTES, resp->count);
+		return status;
+	nfs_add_stats(inode, NFSIOS_SERVERWRITTENBYTES, data->res.count);
 
 #if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
-	if (resp->verf->committed < argp->stable && task->tk_status >= 0) {
+	if (data->res.verf->committed < data->args.stable && task->tk_status >= 0) {
 		/* We tried a write call, but the server did not
 		 * commit data to stable storage even though we
 		 * requested it.
@@ -1396,18 +1162,31 @@ void nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
 			dprintk("NFS:       faulty NFS server %s:"
 				" (committed = %d) != (stable = %d)\n",
 				NFS_SERVER(inode)->nfs_client->cl_hostname,
-				resp->verf->committed, argp->stable);
+				data->res.verf->committed, data->args.stable);
 			complain = jiffies + 300 * HZ;
 		}
 	}
 #endif
-	if (task->tk_status < 0)
-		nfs_set_pgio_error(data->header, task->tk_status, argp->offset);
-	else if (resp->count < argp->count) {
+
+	/* Deal with the suid/sgid bit corner case */
+	if (nfs_should_remove_suid(inode))
+		nfs_mark_for_revalidate(inode);
+	return 0;
+}
+
+/*
+ * This function is called when the WRITE call is complete.
+ */
+static void nfs_writeback_result(struct rpc_task *task, struct nfs_pgio_data *data)
+{
+	struct nfs_pgio_args	*argp = &data->args;
+	struct nfs_pgio_res	*resp = &data->res;
+
+	if (resp->count < argp->count) {
 		static unsigned long    complain;
 
 		/* This a short write! */
-		nfs_inc_stats(inode, NFSIOS_SHORTWRITE);
+		nfs_inc_stats(data->header->inode, NFSIOS_SHORTWRITE);
 
 		/* Has the server at least made some progress? */
 		if (resp->count == 0) {
@@ -1874,7 +1653,7 @@ int nfs_migrate_page(struct address_space *mapping, struct page *newpage,
 int __init nfs_init_writepagecache(void)
 {
 	nfs_wdata_cachep = kmem_cache_create("nfs_write_data",
-					     sizeof(struct nfs_write_header),
+					     sizeof(struct nfs_rw_header),
 					     0, SLAB_HWCACHE_ALIGN,
 					     NULL);
 	if (nfs_wdata_cachep == NULL)
@@ -1936,3 +1715,12 @@ void nfs_destroy_writepagecache(void)
 	kmem_cache_destroy(nfs_wdata_cachep);
 }
 
+static const struct nfs_rw_ops nfs_rw_write_ops = {
+	.rw_mode		= FMODE_WRITE,
+	.rw_alloc_header	= nfs_writehdr_alloc,
+	.rw_free_header		= nfs_writehdr_free,
+	.rw_release		= nfs_writeback_release_common,
+	.rw_done		= nfs_writeback_done,
+	.rw_result		= nfs_writeback_result,
+	.rw_initiate		= nfs_initiate_write,
+};