author	Linus Torvalds <torvalds@linux-foundation.org>	2017-05-10 13:29:23 -0700
committer	Linus Torvalds <torvalds@linux-foundation.org>	2017-05-10 13:29:23 -0700
commit	c70422f760c120480fee4de6c38804c72aa26bc1 (patch)
tree	9c61102379bbbf090c13c373ffdace76fe7711ef /net/sunrpc
parent	73ccb023a2f25b72c4b95499ca24760588014614 (diff)
parent	b26b78cb726007533d81fdf90a62e915002ef5c8 (diff)
Merge tag 'nfsd-4.12' of git://linux-nfs.org/~bfields/linux
Pull nfsd updates from Bruce Fields:
 "Another RDMA update from Chuck Lever, and a bunch of miscellaneous
  bugfixes"

* tag 'nfsd-4.12' of git://linux-nfs.org/~bfields/linux: (26 commits)
  nfsd: Fix up the "supattr_exclcreat" attributes
  nfsd: encoders mustn't use uninitialized values in error cases
  nfsd: fix undefined behavior in nfsd4_layout_verify
  lockd: fix lockd shutdown race
  NFSv4: Fix callback server shutdown
  SUNRPC: Refactor svc_set_num_threads()
  NFSv4.x/callback: Create the callback service through svc_create_pooled
  lockd: remove redundant check on block
  svcrdma: Clean out old XDR encoders
  svcrdma: Remove the req_map cache
  svcrdma: Remove unused RDMA Write completion handler
  svcrdma: Reduce size of sge array in struct svc_rdma_op_ctxt
  svcrdma: Clean up RPC-over-RDMA backchannel reply processing
  svcrdma: Report Write/Reply chunk overruns
  svcrdma: Clean up RDMA_ERROR path
  svcrdma: Use rdma_rw API in RPC reply path
  svcrdma: Introduce local rdma_rw API helpers
  svcrdma: Clean up svc_rdma_get_inv_rkey()
  svcrdma: Add helper to save pages under I/O
  svcrdma: Eliminate RPCRDMA_SQ_DEPTH_MULT
  ...
Diffstat (limited to 'net/sunrpc')
-rw-r--r--	net/sunrpc/Kconfig	1
-rw-r--r--	net/sunrpc/svc.c	134
-rw-r--r--	net/sunrpc/xprtrdma/Makefile	2
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma.c	8
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_backchannel.c	71
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_marshal.c	89
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_recvfrom.c	79
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_rw.c	512
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_sendto.c	978
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_transport.c	110
10 files changed, 1197 insertions, 787 deletions
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig
index 04ce2c0b660e..ac09ca803296 100644
--- a/net/sunrpc/Kconfig
+++ b/net/sunrpc/Kconfig
@@ -52,6 +52,7 @@ config SUNRPC_XPRT_RDMA
 	tristate "RPC-over-RDMA transport"
 	depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS
 	default SUNRPC && INFINIBAND
+	select SG_POOL
 	help
 	  This option allows the NFS client and server to use RDMA
 	  transports (InfiniBand, iWARP, or RoCE).
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a08aeb56b8e4..bc0f5a0ecbdc 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -702,59 +702,32 @@ found_pool:
 	return task;
 }
 
-/*
- * Create or destroy enough new threads to make the number
- * of threads the given number.  If `pool' is non-NULL, applies
- * only to threads in that pool, otherwise round-robins between
- * all pools.  Caller must ensure mutual exclusion between this and
- * server startup or shutdown.
- *
- * Destroying threads relies on the service threads filling in
- * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
- * has been created using svc_create_pooled().
- *
- * Based on code that used to be in nfsd_svc() but tweaked
- * to be pool-aware.
- */
-int
-svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+/* create new threads */
+static int
+svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 {
 	struct svc_rqst	*rqstp;
 	struct task_struct *task;
 	struct svc_pool *chosen_pool;
-	int error = 0;
 	unsigned int state = serv->sv_nrthreads-1;
 	int node;
 
-	if (pool == NULL) {
-		/* The -1 assumes caller has done a svc_get() */
-		nrservs -= (serv->sv_nrthreads-1);
-	} else {
-		spin_lock_bh(&pool->sp_lock);
-		nrservs -= pool->sp_nrthreads;
-		spin_unlock_bh(&pool->sp_lock);
-	}
-
-	/* create new threads */
-	while (nrservs > 0) {
+	do {
 		nrservs--;
 		chosen_pool = choose_pool(serv, pool, &state);
 
 		node = svc_pool_map_get_node(chosen_pool->sp_id);
 		rqstp = svc_prepare_thread(serv, chosen_pool, node);
-		if (IS_ERR(rqstp)) {
-			error = PTR_ERR(rqstp);
-			break;
-		}
+		if (IS_ERR(rqstp))
+			return PTR_ERR(rqstp);
 
 		__module_get(serv->sv_ops->svo_module);
 		task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
 					      node, "%s", serv->sv_name);
 		if (IS_ERR(task)) {
-			error = PTR_ERR(task);
 			module_put(serv->sv_ops->svo_module);
 			svc_exit_thread(rqstp);
-			break;
+			return PTR_ERR(task);
 		}
 
 		rqstp->rq_task = task;
@@ -763,18 +736,103 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 
 		svc_sock_update_bufs(serv);
 		wake_up_process(task);
-	}
+	} while (nrservs > 0);
+
+	return 0;
+}
+
+
+/* destroy old threads */
+static int
+svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+	struct task_struct *task;
+	unsigned int state = serv->sv_nrthreads-1;
+
 	/* destroy old threads */
-	while (nrservs < 0 &&
-	       (task = choose_victim(serv, pool, &state)) != NULL) {
+	do {
+		task = choose_victim(serv, pool, &state);
+		if (task == NULL)
+			break;
 		send_sig(SIGINT, task, 1);
 		nrservs++;
+	} while (nrservs < 0);
+
+	return 0;
+}
+
+/*
+ * Create or destroy enough new threads to make the number
+ * of threads the given number.  If `pool' is non-NULL, applies
+ * only to threads in that pool, otherwise round-robins between
+ * all pools.  Caller must ensure mutual exclusion between this and
+ * server startup or shutdown.
+ *
+ * Destroying threads relies on the service threads filling in
+ * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
+ * has been created using svc_create_pooled().
+ *
+ * Based on code that used to be in nfsd_svc() but tweaked
+ * to be pool-aware.
+ */
+int
+svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+	if (pool == NULL) {
+		/* The -1 assumes caller has done a svc_get() */
+		nrservs -= (serv->sv_nrthreads-1);
+	} else {
+		spin_lock_bh(&pool->sp_lock);
+		nrservs -= pool->sp_nrthreads;
+		spin_unlock_bh(&pool->sp_lock);
 	}
 
-	return error;
+	if (nrservs > 0)
+		return svc_start_kthreads(serv, pool, nrservs);
+	if (nrservs < 0)
+		return svc_signal_kthreads(serv, pool, nrservs);
+	return 0;
 }
 EXPORT_SYMBOL_GPL(svc_set_num_threads);
 
+/* destroy old threads */
+static int
+svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+	struct task_struct *task;
+	unsigned int state = serv->sv_nrthreads-1;
+
+	/* destroy old threads */
+	do {
+		task = choose_victim(serv, pool, &state);
+		if (task == NULL)
+			break;
+		kthread_stop(task);
+		nrservs++;
+	} while (nrservs < 0);
+	return 0;
+}
+
+int
+svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
+{
+	if (pool == NULL) {
+		/* The -1 assumes caller has done a svc_get() */
+		nrservs -= (serv->sv_nrthreads-1);
+	} else {
+		spin_lock_bh(&pool->sp_lock);
+		nrservs -= pool->sp_nrthreads;
+		spin_unlock_bh(&pool->sp_lock);
+	}
+
+	if (nrservs > 0)
+		return svc_start_kthreads(serv, pool, nrservs);
+	if (nrservs < 0)
+		return svc_stop_kthreads(serv, pool, nrservs);
+	return 0;
+}
+EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);
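
To illustrate the split, here is a minimal caller sketch (not part of this patch; the pool choice and thread counts are illustrative). svc_set_num_threads() sends SIGINT to surplus threads and returns without waiting, while svc_set_num_threads_sync() uses kthread_stop() and returns only once each victim has exited:

	/* Sketch only: a NULL pool applies the change round-robin
	 * across all pools, as described in the comment above.
	 */
	static int demo_grow_service(struct svc_serv *serv)
	{
		/* asynchronous: victims exit some time after this returns */
		return svc_set_num_threads(serv, NULL, 16);
	}

	static int demo_shut_down_service(struct svc_serv *serv)
	{
		/* synchronous: no service thread survives this call */
		return svc_set_num_threads_sync(serv, NULL, 0);
	}
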
+
 /*
  * Called from a server thread as it's exiting. Caller must hold the "service
  * mutex" for the service.
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index ef19fa42c50f..c1ae8142ab73 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -4,5 +4,5 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	fmr_ops.o frwr_ops.o \
 	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
-	module.o
+	svc_rdma_rw.o module.o
 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index c846ca9f1eba..a4a8f6989ee7 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -58,9 +58,9 @@ unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
 unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS;
 static unsigned int min_max_requests = 4;
 static unsigned int max_max_requests = 16384;
-unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
-static unsigned int min_max_inline = 4096;
-static unsigned int max_max_inline = 65536;
+unsigned int svcrdma_max_req_size = RPCRDMA_DEF_INLINE_THRESH;
+static unsigned int min_max_inline = RPCRDMA_DEF_INLINE_THRESH;
+static unsigned int max_max_inline = RPCRDMA_MAX_INLINE_THRESH;
 
 atomic_t rdma_stat_recv;
 atomic_t rdma_stat_read;
@@ -247,8 +247,6 @@ int svc_rdma_init(void)
 	dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
 	dprintk("\tsvcrdma_ord      : %d\n", svcrdma_ord);
 	dprintk("\tmax_requests     : %u\n", svcrdma_max_requests);
-	dprintk("\tsq_depth         : %u\n",
-		svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
 	dprintk("\tmax_bc_requests  : %u\n", svcrdma_max_bc_requests);
 	dprintk("\tmax_inline       : %d\n", svcrdma_max_req_size);
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index ff1df40f0d26..c676ed0efb5a 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -12,7 +12,17 @@
 
 #undef SVCRDMA_BACKCHANNEL_DEBUG
 
-int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
+/**
+ * svc_rdma_handle_bc_reply - Process incoming backchannel reply
+ * @xprt: controlling backchannel transport
+ * @rdma_resp: pointer to incoming transport header
+ * @rcvbuf: XDR buffer into which to decode the reply
+ *
+ * Returns:
+ *	%0 if @rcvbuf is filled in, xprt_complete_rqst called,
+ *	%-EAGAIN if server should call ->recvfrom again.
+ */
+int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
 			     struct xdr_buf *rcvbuf)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
@@ -27,13 +37,13 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
 
 	p = (__be32 *)src->iov_base;
 	len = src->iov_len;
-	xid = rmsgp->rm_xid;
+	xid = *rdma_resp;
 
 #ifdef SVCRDMA_BACKCHANNEL_DEBUG
 	pr_info("%s: xid=%08x, length=%zu\n",
 		__func__, be32_to_cpu(xid), len);
 	pr_info("%s: RPC/RDMA: %*ph\n",
-		__func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp);
+		__func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp);
 	pr_info("%s:      RPC: %*ph\n",
 		__func__, (int)len, p);
 #endif
@@ -53,7 +63,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp,
 		goto out_unlock;
 	memcpy(dst->iov_base, p, len);
 
-	credits = be32_to_cpu(rmsgp->rm_credit);
+	credits = be32_to_cpup(rdma_resp + 2);
 	if (credits == 0)
 		credits = 1;	/* don't deadlock */
 	else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
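
The switch from struct rpcrdma_msg field accesses to bare __be32 pointer arithmetic relies on the fixed layout of the start of every RPC-over-RDMA V1 transport header (RFC 8166). A sketch of the words the code above indexes:

	static u32 demo_decode_credits(__be32 *rdma_resp)
	{
		/* rdma_resp[0] : rdma_xid    (kept in network order)   */
		/* rdma_resp[1] : rdma_vers   (always one)              */
		/* rdma_resp[2] : rdma_credit                           */
		/* rdma_resp[3] : rdma_proc   (rdma_msg, rdma_nomsg,
		 *                             or rdma_error)           */
		return be32_to_cpup(rdma_resp + 2);
	}
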
@@ -90,9 +100,9 @@ out_notfound:
  * Caller holds the connection's mutex and has already marshaled
  * the RPC/RDMA request.
  *
- * This is similar to svc_rdma_reply, but takes an rpc_rqst
- * instead, does not support chunks, and avoids blocking memory
- * allocation.
+ * This is similar to svc_rdma_send_reply_msg, but takes a struct
+ * rpc_rqst instead, does not support chunks, and avoids blocking
+ * memory allocation.
  *
  * XXX: There is still an opportunity to block in svc_rdma_send()
  * if there are no SQ entries to post the Send. This may occur if
@@ -101,59 +111,36 @@ out_notfound:
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 			      struct rpc_rqst *rqst)
 {
-	struct xdr_buf *sndbuf = &rqst->rq_snd_buf;
 	struct svc_rdma_op_ctxt *ctxt;
-	struct svc_rdma_req_map *vec;
-	struct ib_send_wr send_wr;
 	int ret;
 
-	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false);
-	if (ret)
+	ctxt = svc_rdma_get_context(rdma);
+
+	/* rpcrdma_bc_send_request builds the transport header and
+	 * the backchannel RPC message in the same buffer. Thus only
+	 * one SGE is needed to send both.
+	 */
+	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer,
+				     rqst->rq_snd_buf.len);
+	if (ret < 0)
 		goto out_err;
 
 	ret = svc_rdma_repost_recv(rdma, GFP_NOIO);
 	if (ret)
 		goto out_err;
 
-	ctxt = svc_rdma_get_context(rdma);
-	ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
-	ctxt->count = 1;
-
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length = sndbuf->len;
-	ctxt->sge[0].addr =
-	    ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0,
-			    sndbuf->len, DMA_TO_DEVICE);
-	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) {
-		ret = -EIO;
-		goto out_unmap;
-	}
-	svc_rdma_count_mappings(rdma, ctxt);
-
-	memset(&send_wr, 0, sizeof(send_wr));
-	ctxt->cqe.done = svc_rdma_wc_send;
-	send_wr.wr_cqe = &ctxt->cqe;
-	send_wr.sg_list = ctxt->sge;
-	send_wr.num_sge = 1;
-	send_wr.opcode = IB_WR_SEND;
-	send_wr.send_flags = IB_SEND_SIGNALED;
-
-	ret = svc_rdma_send(rdma, &send_wr);
-	if (ret) {
-		ret = -EIO;
+	ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0);
+	if (ret)
 		goto out_unmap;
-	}
 
 out_err:
-	svc_rdma_put_req_map(rdma, vec);
 	dprintk("svcrdma: %s returns %d\n", __func__, ret);
 	return ret;
 
 out_unmap:
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 1);
+	ret = -EIO;
 	goto out_err;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index 1c4aabf0f657..bdcf7d85a3dc 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -166,92 +166,3 @@ out_inval:
 	dprintk("svcrdma: failed to parse transport header\n");
 	return -EINVAL;
 }
-
-int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
-			      struct rpcrdma_msg *rmsgp,
-			      enum rpcrdma_errcode err, __be32 *va)
-{
-	__be32 *startp = va;
-
-	*va++ = rmsgp->rm_xid;
-	*va++ = rmsgp->rm_vers;
-	*va++ = xprt->sc_fc_credits;
-	*va++ = rdma_error;
-	*va++ = cpu_to_be32(err);
-	if (err == ERR_VERS) {
-		*va++ = rpcrdma_version;
-		*va++ = rpcrdma_version;
-	}
-
-	return (int)((unsigned long)va - (unsigned long)startp);
-}
-
-/**
- * svc_rdma_xdr_get_reply_hdr_length - Get length of Reply transport header
- * @rdma_resp: buffer containing Reply transport header
- *
- * Returns length of transport header, in bytes.
- */
-unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp)
-{
-	unsigned int nsegs;
-	__be32 *p;
-
-	p = rdma_resp;
-
-	/* RPC-over-RDMA V1 replies never have a Read list. */
-	p += rpcrdma_fixed_maxsz + 1;
-
-	/* Skip Write list. */
-	while (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
-
-	/* Skip Reply chunk. */
-	if (*p++ != xdr_zero) {
-		nsegs = be32_to_cpup(p++);
-		p += nsegs * rpcrdma_segment_maxsz;
-	}
-
-	return (unsigned long)p - (unsigned long)rdma_resp;
-}
-
-void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
-{
-	struct rpcrdma_write_array *ary;
-
-	/* no read-list */
-	rmsgp->rm_body.rm_chunks[0] = xdr_zero;
-
-	/* write-array discrim */
-	ary = (struct rpcrdma_write_array *)
-		&rmsgp->rm_body.rm_chunks[1];
-	ary->wc_discrim = xdr_one;
-	ary->wc_nchunks = cpu_to_be32(chunks);
-
-	/* write-list terminator */
-	ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
-
-	/* reply-array discriminator */
-	ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
-}
-
-void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
-				 int chunks)
-{
-	ary->wc_discrim = xdr_one;
-	ary->wc_nchunks = cpu_to_be32(chunks);
-}
-
-void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
-				     int chunk_no,
-				     __be32 rs_handle,
-				     __be64 rs_offset,
-				     u32 write_len)
-{
-	struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
-	seg->rs_handle = rs_handle;
-	seg->rs_offset = rs_offset;
-	seg->rs_length = cpu_to_be32(write_len);
-}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index f7b2daf72a86..27a99bf5b1a6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -558,33 +558,85 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
 	rqstp->rq_arg.buflen = head->arg.buflen;
 }
 
+static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
+				__be32 *rdma_argp, int status)
+{
+	struct svc_rdma_op_ctxt *ctxt;
+	__be32 *p, *err_msgp;
+	unsigned int length;
+	struct page *page;
+	int ret;
+
+	ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
+	if (ret)
+		return;
+
+	page = alloc_page(GFP_KERNEL);
+	if (!page)
+		return;
+	err_msgp = page_address(page);
+
+	p = err_msgp;
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
+	*p++ = xprt->sc_fc_credits;
+	*p++ = rdma_error;
+	if (status == -EPROTONOSUPPORT) {
+		*p++ = err_vers;
+		*p++ = rpcrdma_version;
+		*p++ = rpcrdma_version;
+	} else {
+		*p++ = err_chunk;
+	}
+	length = (unsigned long)p - (unsigned long)err_msgp;
+
+	/* Map transport header; no RPC message payload */
+	ctxt = svc_rdma_get_context(xprt);
+	ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
+	if (ret) {
+		dprintk("svcrdma: Error %d mapping send for protocol error\n",
+			ret);
+		return;
+	}
+
+	ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0);
+	if (ret) {
+		dprintk("svcrdma: Error %d posting send for protocol error\n",
+			ret);
+		svc_rdma_unmap_dma(ctxt);
+		svc_rdma_put_context(ctxt, 1);
+	}
+}
+
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
  * the RPC/RDMA header small and fixed in size, so it is
  * straightforward to check the RPC header's direction field.
  */
-static bool
-svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp)
+static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt,
+					  __be32 *rdma_resp)
 {
-	__be32 *p = (__be32 *)rmsgp;
+	__be32 *p;
 
 	if (!xprt->xpt_bc_xprt)
 		return false;
 
-	if (rmsgp->rm_type != rdma_msg)
+	p = rdma_resp + 3;
+	if (*p++ != rdma_msg)
 		return false;
-	if (rmsgp->rm_body.rm_chunks[0] != xdr_zero)
+
+	if (*p++ != xdr_zero)
 		return false;
-	if (rmsgp->rm_body.rm_chunks[1] != xdr_zero)
+	if (*p++ != xdr_zero)
 		return false;
-	if (rmsgp->rm_body.rm_chunks[2] != xdr_zero)
+	if (*p++ != xdr_zero)
 		return false;
 
-	/* sanity */
-	if (p[7] != rmsgp->rm_xid)
+	/* XID sanity */
+	if (*p++ != *rdma_resp)
 		return false;
 	/* call direction */
-	if (p[8] == cpu_to_be32(RPC_CALL))
+	if (*p == cpu_to_be32(RPC_CALL))
 		return false;
 
 	return true;
@@ -650,8 +702,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 		goto out_drop;
 	rqstp->rq_xprt_hlen = ret;
 
-	if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) {
-		ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp,
+	if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) {
+		ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt,
+					       &rmsgp->rm_xid,
 					       &rqstp->rq_arg);
 		svc_rdma_put_context(ctxt, 0);
 		if (ret)
@@ -686,7 +739,7 @@ complete:
 	return ret;
 
 out_err:
-	svc_rdma_send_error(rdma_xprt, rmsgp, ret);
+	svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret);
 	svc_rdma_put_context(ctxt, 0);
 	return 0;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
new file mode 100644
index 000000000000..0cf620277693
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -0,0 +1,512 @@
+/*
+ * Copyright (c) 2016 Oracle.  All rights reserved.
+ *
+ * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
+ */
+
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include <linux/sunrpc/debug.h>
+
+#include <rdma/rw.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/* Each R/W context contains state for one chain of RDMA Read or
+ * Write Work Requests.
+ *
+ * Each WR chain handles a single contiguous server-side buffer,
+ * because scatterlist entries after the first have to start on
+ * page alignment. xdr_buf iovecs cannot guarantee alignment.
+ *
+ * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
+ * from a client may contain a unique R_key, so each WR chain moves
+ * up to one segment at a time.
+ *
+ * The scatterlist makes this data structure over 4KB in size. To
+ * make it less likely to fail, and to handle the allocation for
+ * smaller I/O requests without disabling bottom-halves, these
+ * contexts are created on demand, but cached and reused until the
+ * controlling svcxprt_rdma is destroyed.
+ */
+struct svc_rdma_rw_ctxt {
+	struct list_head	rw_list;
+	struct rdma_rw_ctx	rw_ctx;
+	int			rw_nents;
+	struct sg_table		rw_sg_table;
+	struct scatterlist	rw_first_sgl[0];
+};
+
+static inline struct svc_rdma_rw_ctxt *
+svc_rdma_next_ctxt(struct list_head *list)
+{
+	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
+					rw_list);
+}
+
+static struct svc_rdma_rw_ctxt *
+svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
+{
+	struct svc_rdma_rw_ctxt *ctxt;
+
+	spin_lock(&rdma->sc_rw_ctxt_lock);
+
+	ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
+	if (ctxt) {
+		list_del(&ctxt->rw_list);
+		spin_unlock(&rdma->sc_rw_ctxt_lock);
+	} else {
+		spin_unlock(&rdma->sc_rw_ctxt_lock);
+		ctxt = kmalloc(sizeof(*ctxt) +
+			       SG_CHUNK_SIZE * sizeof(struct scatterlist),
+			       GFP_KERNEL);
+		if (!ctxt)
+			goto out;
+		INIT_LIST_HEAD(&ctxt->rw_list);
+	}
+
+	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
+	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
+				   ctxt->rw_sg_table.sgl)) {
+		kfree(ctxt);
+		ctxt = NULL;
+	}
+out:
+	return ctxt;
+}
+
+static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
+				 struct svc_rdma_rw_ctxt *ctxt)
+{
+	sg_free_table_chained(&ctxt->rw_sg_table, true);
+
+	spin_lock(&rdma->sc_rw_ctxt_lock);
+	list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
+	spin_unlock(&rdma->sc_rw_ctxt_lock);
+}
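
This embedded-first-chunk pattern (the zero-length rw_first_sgl[] plus sg_alloc_table_chained()) is why the Kconfig change above selects SG_POOL: small requests fit entirely in the preallocated entries, while larger ones chain additional scatterlist pages. A reduced sketch, assuming the three-argument sg_alloc_table_chained() of this kernel generation:

	#include <linux/scatterlist.h>

	struct demo_rw_ctxt {
		struct sg_table		sgt;
		struct scatterlist	first_sgl[SG_CHUNK_SIZE];
	};

	static int demo_alloc_sgl(struct demo_rw_ctxt *c, unsigned int nents)
	{
		c->sgt.sgl = c->first_sgl;
		/* allocates nothing when nents <= SG_CHUNK_SIZE */
		return sg_alloc_table_chained(&c->sgt, nents, c->first_sgl);
	}

	static void demo_free_sgl(struct demo_rw_ctxt *c)
	{
		/* true: the first chunk is embedded, don't free it */
		sg_free_table_chained(&c->sgt, true);
	}
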
+
+/**
+ * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
+ * @rdma: transport about to be destroyed
+ *
+ */
+void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_rw_ctxt *ctxt;
+
+	while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
+		list_del(&ctxt->rw_list);
+		kfree(ctxt);
+	}
+}
+
+/* A chunk context tracks all I/O for moving one Read or Write
+ * chunk. This is a set of rdma_rw's that handle data movement
+ * for all segments of one chunk.
+ *
+ * These are small, acquired with a single allocator call, and
+ * no more than one is needed per chunk. They are allocated on
+ * demand, and not cached.
+ */
+struct svc_rdma_chunk_ctxt {
+	struct ib_cqe		cc_cqe;
+	struct svcxprt_rdma	*cc_rdma;
+	struct list_head	cc_rwctxts;
+	int			cc_sqecount;
+	enum dma_data_direction cc_dir;
+};
+
+static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
+			     struct svc_rdma_chunk_ctxt *cc,
+			     enum dma_data_direction dir)
+{
+	cc->cc_rdma = rdma;
+	svc_xprt_get(&rdma->sc_xprt);
+
+	INIT_LIST_HEAD(&cc->cc_rwctxts);
+	cc->cc_sqecount = 0;
+	cc->cc_dir = dir;
+}
+
+static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
+{
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_rdma_rw_ctxt *ctxt;
+
+	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
+		list_del(&ctxt->rw_list);
+
+		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
+				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
+				    ctxt->rw_nents, cc->cc_dir);
+		svc_rdma_put_rw_ctxt(rdma, ctxt);
+	}
+	svc_xprt_put(&rdma->sc_xprt);
+}
+
+/* State for sending a Write or Reply chunk.
+ *  - Tracks progress of writing one chunk over all its segments
+ *  - Stores arguments for the SGL constructor functions
+ */
+struct svc_rdma_write_info {
+	/* write state of this chunk */
+	unsigned int		wi_seg_off;
+	unsigned int		wi_seg_no;
+	unsigned int		wi_nsegs;
+	__be32			*wi_segs;
+
+	/* SGL constructor arguments */
+	struct xdr_buf		*wi_xdr;
+	unsigned char		*wi_base;
+	unsigned int		wi_next_off;
+
+	struct svc_rdma_chunk_ctxt	wi_cc;
+};
+
+static struct svc_rdma_write_info *
+svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
+{
+	struct svc_rdma_write_info *info;
+
+	info = kmalloc(sizeof(*info), GFP_KERNEL);
+	if (!info)
+		return info;
+
+	info->wi_seg_off = 0;
+	info->wi_seg_no = 0;
+	info->wi_nsegs = be32_to_cpup(++chunk);
+	info->wi_segs = ++chunk;
+	svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
+	return info;
+}
+
+static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
+{
+	svc_rdma_cc_release(&info->wi_cc);
+	kfree(info);
+}
+
+/**
+ * svc_rdma_write_done - Write chunk completion
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ * Pages under I/O are freed by a subsequent Send completion.
+ */
+static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_chunk_ctxt *cc =
+			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_rdma_write_info *info =
+			container_of(cc, struct svc_rdma_write_info, wi_cc);
+
+	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+
+	if (unlikely(wc->status != IB_WC_SUCCESS)) {
+		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+		if (wc->status != IB_WC_WR_FLUSH_ERR)
+			pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
+			       ib_wc_status_msg(wc->status),
+			       wc->status, wc->vendor_err);
+	}
+
+	svc_rdma_write_info_free(info);
+}
+
+/* This function sleeps when the transport's Send Queue is congested.
+ *
+ * Assumptions:
+ * - If ib_post_send() succeeds, only one completion is expected,
+ *   even if one or more WRs are flushed. This is true when posting
+ *   an rdma_rw_ctx or when posting a single signaled WR.
+ */
+static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
+{
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_xprt *xprt = &rdma->sc_xprt;
+	struct ib_send_wr *first_wr, *bad_wr;
+	struct list_head *tmp;
+	struct ib_cqe *cqe;
+	int ret;
+
+	first_wr = NULL;
+	cqe = &cc->cc_cqe;
+	list_for_each(tmp, &cc->cc_rwctxts) {
+		struct svc_rdma_rw_ctxt *ctxt;
+
+		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
+		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
+					   rdma->sc_port_num, cqe, first_wr);
+		cqe = NULL;
+	}
+
+	do {
+		if (atomic_sub_return(cc->cc_sqecount,
+				      &rdma->sc_sq_avail) > 0) {
+			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+			if (ret)
+				break;
+			return 0;
+		}
+
+		atomic_inc(&rdma_stat_sq_starve);
+		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+		wait_event(rdma->sc_send_wait,
+			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
+	} while (1);
+
+	pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+
+	/* If even one was posted, there will be a completion. */
+	if (bad_wr != first_wr)
+		return 0;
+
+	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+	return -ENOTCONN;
+}
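
The function pairs one atomic reservation at post time with one release when the chain's lone completion fires (see svc_rdma_write_done above). Condensed to the invariant it maintains, using the same names:

	/* post side: reserve cc_sqecount SQEs; on failure, roll back
	 * the reservation and sleep until completions free up space
	 */
	if (atomic_sub_return(cc->cc_sqecount, &rdma->sc_sq_avail) <= 0) {
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		/* ... and retry the reservation */
	}

	/* completion side: release exactly once per posted chain */
	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
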
+
+/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
+ */
+static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
+			       unsigned int len,
+			       struct svc_rdma_rw_ctxt *ctxt)
+{
+	struct scatterlist *sg = ctxt->rw_sg_table.sgl;
+
+	sg_set_buf(&sg[0], info->wi_base, len);
+	info->wi_base += len;
+
+	ctxt->rw_nents = 1;
+}
+
+/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
+ */
+static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
+				    unsigned int remaining,
+				    struct svc_rdma_rw_ctxt *ctxt)
+{
+	unsigned int sge_no, sge_bytes, page_off, page_no;
+	struct xdr_buf *xdr = info->wi_xdr;
+	struct scatterlist *sg;
+	struct page **page;
+
+	page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
+	page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
+	page = xdr->pages + page_no;
+	info->wi_next_off += remaining;
+	sg = ctxt->rw_sg_table.sgl;
+	sge_no = 0;
+	do {
+		sge_bytes = min_t(unsigned int, remaining,
+				  PAGE_SIZE - page_off);
+		sg_set_page(sg, *page, sge_bytes, page_off);
+
+		remaining -= sge_bytes;
+		sg = sg_next(sg);
+		page_off = 0;
+		sge_no++;
+		page++;
+	} while (remaining);
+
+	ctxt->rw_nents = sge_no;
+}
+
+/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
+ * an RPC Reply.
+ */
+static int
+svc_rdma_build_writes(struct svc_rdma_write_info *info,
+		      void (*constructor)(struct svc_rdma_write_info *info,
+					  unsigned int len,
+					  struct svc_rdma_rw_ctxt *ctxt),
+		      unsigned int remaining)
+{
+	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
+	struct svcxprt_rdma *rdma = cc->cc_rdma;
+	struct svc_rdma_rw_ctxt *ctxt;
+	__be32 *seg;
+	int ret;
+
+	cc->cc_cqe.done = svc_rdma_write_done;
+	seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
+	do {
+		unsigned int write_len;
+		u32 seg_length, seg_handle;
+		u64 seg_offset;
+
+		if (info->wi_seg_no >= info->wi_nsegs)
+			goto out_overflow;
+
+		seg_handle = be32_to_cpup(seg);
+		seg_length = be32_to_cpup(seg + 1);
+		xdr_decode_hyper(seg + 2, &seg_offset);
+		seg_offset += info->wi_seg_off;
+
+		write_len = min(remaining, seg_length - info->wi_seg_off);
+		ctxt = svc_rdma_get_rw_ctxt(rdma,
+					    (write_len >> PAGE_SHIFT) + 2);
+		if (!ctxt)
+			goto out_noctx;
+
+		constructor(info, write_len, ctxt);
+		ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
+				       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
+				       ctxt->rw_nents, 0, seg_offset,
+				       seg_handle, DMA_TO_DEVICE);
+		if (ret < 0)
+			goto out_initerr;
+
+		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
+		cc->cc_sqecount += ret;
+		if (write_len == seg_length - info->wi_seg_off) {
+			seg += 4;
+			info->wi_seg_no++;
+			info->wi_seg_off = 0;
+		} else {
+			info->wi_seg_off += write_len;
+		}
+		remaining -= write_len;
+	} while (remaining);
+
+	return 0;
+
+out_overflow:
+	dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
+		info->wi_nsegs);
+	return -E2BIG;
+
+out_noctx:
+	dprintk("svcrdma: no R/W ctxs available\n");
+	return -ENOMEM;
+
+out_initerr:
+	svc_rdma_put_rw_ctxt(rdma, ctxt);
+	pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
+	return -EIO;
+}
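
As a worked example of the loop's bookkeeping: with remaining = 10000 bytes and a chunk holding two 8192-byte segments,

	/* pass 1: write_len = 8192; the segment is fully consumed,
	 *         so seg += 4 (one rpcrdma_segment_maxsz), wi_seg_no = 1,
	 *         wi_seg_off = 0
	 * pass 2: write_len = 1808; the segment is only partly filled,
	 *         so wi_seg_off = 1808 and remaining reaches 0
	 */
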
+
+/* Send one of an xdr_buf's kvecs by itself. To send a Reply
+ * chunk, the whole RPC Reply is written back to the client.
+ * This function writes either the head or tail of the xdr_buf
+ * containing the Reply.
+ */
+static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
+				  struct kvec *vec)
+{
+	info->wi_base = vec->iov_base;
+	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
+				     vec->iov_len);
+}
+
+/* Send an xdr_buf's page list by itself. A Write chunk is
+ * just the page list. A Reply chunk is the head, page list,
+ * and tail. This function is shared between the two types
+ * of chunk.
+ */
+static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
+				      struct xdr_buf *xdr)
+{
+	info->wi_xdr = xdr;
+	info->wi_next_off = 0;
+	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
+				     xdr->page_len);
+}
+
+/**
+ * svc_rdma_send_write_chunk - Write all segments in a Write chunk
+ * @rdma: controlling RDMA transport
+ * @wr_ch: Write chunk provided by client
+ * @xdr: xdr_buf containing the data payload
+ *
+ * Returns a non-negative number of bytes the chunk consumed, or
+ *	%-E2BIG if the payload was larger than the Write chunk,
+ *	%-ENOMEM if rdma_rw context pool was exhausted,
+ *	%-ENOTCONN if posting failed (connection is lost),
+ *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ */
+int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
+			      struct xdr_buf *xdr)
+{
+	struct svc_rdma_write_info *info;
+	int ret;
+
+	if (!xdr->page_len)
+		return 0;
+
+	info = svc_rdma_write_info_alloc(rdma, wr_ch);
+	if (!info)
+		return -ENOMEM;
+
+	ret = svc_rdma_send_xdr_pagelist(info, xdr);
+	if (ret < 0)
+		goto out_err;
+
+	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+	if (ret < 0)
+		goto out_err;
+	return xdr->page_len;
+
+out_err:
+	svc_rdma_write_info_free(info);
+	return ret;
+}
+
+/**
+ * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
+ * @rdma: controlling RDMA transport
+ * @rp_ch: Reply chunk provided by client
+ * @writelist: true if client provided a Write list
+ * @xdr: xdr_buf containing an RPC Reply
+ *
+ * Returns a non-negative number of bytes the chunk consumed, or
+ *	%-E2BIG if the payload was larger than the Reply chunk,
+ *	%-ENOMEM if rdma_rw context pool was exhausted,
+ *	%-ENOTCONN if posting failed (connection is lost),
+ *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ */
+int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
+			      bool writelist, struct xdr_buf *xdr)
+{
+	struct svc_rdma_write_info *info;
+	int consumed, ret;
+
+	info = svc_rdma_write_info_alloc(rdma, rp_ch);
+	if (!info)
+		return -ENOMEM;
+
+	ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
+	if (ret < 0)
+		goto out_err;
+	consumed = xdr->head[0].iov_len;
+
+	/* Send the page list in the Reply chunk only if the
+	 * client did not provide Write chunks.
+	 */
+	if (!writelist && xdr->page_len) {
+		ret = svc_rdma_send_xdr_pagelist(info, xdr);
+		if (ret < 0)
+			goto out_err;
+		consumed += xdr->page_len;
+	}
+
+	if (xdr->tail[0].iov_len) {
+		ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
+		if (ret < 0)
+			goto out_err;
+		consumed += xdr->tail[0].iov_len;
+	}
+
+	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+	if (ret < 0)
+		goto out_err;
+	return consumed;
+
+out_err:
+	svc_rdma_write_info_free(info);
+	return ret;
+}
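
A sketch of how the reply path is expected to drive these two entry points (consistent with the kernel-doc comments above; wr_lst and rp_ch stand for chunk pointers parsed out of the Call's transport header):

	if (wr_lst) {
		/* returns the number of pagelist bytes RDMA-Written */
		ret = svc_rdma_send_write_chunk(rdma, wr_lst, &rqstp->rq_res);
		if (ret < 0)
			goto err;
	}
	if (rp_ch) {
		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst != NULL,
						&rqstp->rq_res);
		if (ret < 0)
			goto err;
	}
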
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 515221b16d09..1736337f3a55 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -1,4 +1,5 @@
 /*
+ * Copyright (c) 2016 Oracle. All rights reserved.
  * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
@@ -40,6 +41,63 @@
  * Author: Tom Tucker <tom@opengridcomputing.com>
  */
 
+/* Operation
+ *
+ * The main entry point is svc_rdma_sendto. This is called by the
+ * RPC server when an RPC Reply is ready to be transmitted to a client.
+ *
+ * The passed-in svc_rqst contains a struct xdr_buf which holds an
+ * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA
+ * transport header, post all Write WRs needed for this Reply, then post
+ * a Send WR conveying the transport header and the RPC message itself to
+ * the client.
+ *
+ * svc_rdma_sendto must fully transmit the Reply before returning, as
+ * the svc_rqst will be recycled as soon as sendto returns. Remaining
+ * resources referred to by the svc_rqst are also recycled at that time.
+ * Therefore any resources that must remain longer must be detached
+ * from the svc_rqst and released later.
+ *
+ * Page Management
+ *
+ * The I/O that performs Reply transmission is asynchronous, and may
+ * complete well after sendto returns. Thus pages under I/O must be
+ * removed from the svc_rqst before sendto returns.
+ *
+ * The logic here depends on Send Queue and completion ordering. Since
+ * the Send WR is always posted last, it will always complete last. Thus
+ * when it completes, it is guaranteed that all previous Write WRs have
+ * also completed.
+ *
+ * Write WRs are constructed and posted. Each Write segment gets its own
+ * svc_rdma_rw_ctxt, allowing the Write completion handler to find and
+ * DMA-unmap the pages under I/O for that Write segment. The Write
+ * completion handler does not release any pages.
+ *
+ * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt.
+ * Ownership of all of the Reply's pages is transferred into that
+ * ctxt, the Send WR is posted, and sendto returns.
+ *
+ * The svc_rdma_op_ctxt is presented when the Send WR completes. The
+ * Send completion handler finally releases the Reply's pages.
+ *
+ * This mechanism also assumes that completions on the transport's Send
+ * Completion Queue do not run in parallel. Otherwise a Write completion
+ * and Send completion running at the same time could release pages that
+ * are still DMA-mapped.
+ *
+ * Error Handling
+ *
+ * - If the Send WR is posted successfully, it will either complete
+ *   successfully, or get flushed. Either way, the Send completion
+ *   handler releases the Reply's pages.
+ * - If the Send WR cannot be posted, the forward path releases
+ *   the Reply's pages.
+ *
+ * This handles the case, without the use of page reference counting,
+ * where two different Write segments send portions of the same page.
+ */
+
 #include <linux/sunrpc/debug.h>
 #include <linux/sunrpc/rpc_rdma.h>
 #include <linux/spinlock.h>
@@ -55,113 +113,141 @@ static u32 xdr_padsize(u32 len)
 	return (len & 3) ? (4 - (len & 3)) : 0;
 }
 
-int svc_rdma_map_xdr(struct svcxprt_rdma *xprt,
-		     struct xdr_buf *xdr,
-		     struct svc_rdma_req_map *vec,
-		     bool write_chunk_present)
+/* Returns length of transport header, in bytes.
+ */
+static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp)
 {
-	int sge_no;
-	u32 sge_bytes;
-	u32 page_bytes;
-	u32 page_off;
-	int page_no;
-
-	if (xdr->len !=
-	    (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
-		pr_err("svcrdma: %s: XDR buffer length error\n", __func__);
-		return -EIO;
-	}
+	unsigned int nsegs;
+	__be32 *p;
 
-	/* Skip the first sge, this is for the RPCRDMA header */
-	sge_no = 1;
+	p = rdma_resp;
+
+	/* RPC-over-RDMA V1 replies never have a Read list. */
+	p += rpcrdma_fixed_maxsz + 1;
 
-	/* Head SGE */
-	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
-	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
-	sge_no++;
-
-	/* pages SGE */
-	page_no = 0;
-	page_bytes = xdr->page_len;
-	page_off = xdr->page_base;
-	while (page_bytes) {
-		vec->sge[sge_no].iov_base =
-			page_address(xdr->pages[page_no]) + page_off;
-		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
-		page_bytes -= sge_bytes;
-		vec->sge[sge_no].iov_len = sge_bytes;
-
-		sge_no++;
-		page_no++;
-		page_off = 0; /* reset for next time through loop */
+	/* Skip Write list. */
+	while (*p++ != xdr_zero) {
+		nsegs = be32_to_cpup(p++);
+		p += nsegs * rpcrdma_segment_maxsz;
 	}
 
-	/* Tail SGE */
-	if (xdr->tail[0].iov_len) {
-		unsigned char *base = xdr->tail[0].iov_base;
-		size_t len = xdr->tail[0].iov_len;
-		u32 xdr_pad = xdr_padsize(xdr->page_len);
+	/* Skip Reply chunk. */
+	if (*p++ != xdr_zero) {
+		nsegs = be32_to_cpup(p++);
+		p += nsegs * rpcrdma_segment_maxsz;
+	}
 
-		if (write_chunk_present && xdr_pad) {
-			base += xdr_pad;
-			len -= xdr_pad;
-		}
+	return (unsigned long)p - (unsigned long)rdma_resp;
+}
 
-		if (len) {
-			vec->sge[sge_no].iov_base = base;
-			vec->sge[sge_no].iov_len = len;
-			sge_no++;
+/* One Write chunk is copied from Call transport header to Reply
+ * transport header. Each segment's length field is updated to
+ * reflect the number of bytes consumed in the segment.
+ *
+ * Returns number of segments in this chunk.
+ */
+static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+					   unsigned int remaining)
+{
+	unsigned int i, nsegs;
+	u32 seg_len;
+
+	/* Write list discriminator */
+	*dst++ = *src++;
+
+	/* number of segments in this chunk */
+	nsegs = be32_to_cpup(src);
+	*dst++ = *src++;
+
+	for (i = nsegs; i; i--) {
+		/* segment's RDMA handle */
+		*dst++ = *src++;
+
+		/* bytes returned in this segment */
+		seg_len = be32_to_cpu(*src);
+		if (remaining >= seg_len) {
+			/* entire segment was consumed */
+			*dst = *src;
+			remaining -= seg_len;
+		} else {
+			/* segment only partly filled */
+			*dst = cpu_to_be32(remaining);
+			remaining = 0;
 		}
-	}
+		dst++; src++;
 
-	dprintk("svcrdma: %s: sge_no %d page_no %d "
-		"page_base %u page_len %u head_len %zu tail_len %zu\n",
-		__func__, sge_no, page_no, xdr->page_base, xdr->page_len,
-		xdr->head[0].iov_len, xdr->tail[0].iov_len);
+		/* segment's RDMA offset */
+		*dst++ = *src++;
+		*dst++ = *src++;
+	}
 
-	vec->count = sge_no;
-	return 0;
+	return nsegs;
 }
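
A short worked example of the length rewriting: if the chunk carries two 4096-byte segments and the Reply consumed 5000 bytes,

	/* segment 0: remaining (5000) >= seg_len (4096)
	 *            -> length copied unchanged, remaining = 904
	 * segment 1: remaining (904) < seg_len (4096)
	 *            -> length rewritten to 904, remaining = 0
	 */
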
 
-static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
-			      struct xdr_buf *xdr,
-			      u32 xdr_off, size_t len, int dir)
+/* The client provided a Write list in the Call message. Fill in
+ * the segments in the first Write chunk in the Reply's transport
+ * header with the number of bytes consumed in each segment.
+ * Remaining chunks are returned unused.
+ *
+ * Assumptions:
+ *  - Client has provided only one Write chunk
+ */
+static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
+					   unsigned int consumed)
 {
-	struct page *page;
-	dma_addr_t dma_addr;
-	if (xdr_off < xdr->head[0].iov_len) {
-		/* This offset is in the head */
-		xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
-		page = virt_to_page(xdr->head[0].iov_base);
-	} else {
-		xdr_off -= xdr->head[0].iov_len;
-		if (xdr_off < xdr->page_len) {
-			/* This offset is in the page list */
-			xdr_off += xdr->page_base;
-			page = xdr->pages[xdr_off >> PAGE_SHIFT];
-			xdr_off &= ~PAGE_MASK;
-		} else {
-			/* This offset is in the tail */
-			xdr_off -= xdr->page_len;
-			xdr_off += (unsigned long)
-				xdr->tail[0].iov_base & ~PAGE_MASK;
-			page = virt_to_page(xdr->tail[0].iov_base);
-		}
+	unsigned int nsegs;
+	__be32 *p, *q;
+
+	/* RPC-over-RDMA V1 replies never have a Read list. */
+	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+	q = wr_ch;
+	while (*q != xdr_zero) {
+		nsegs = xdr_encode_write_chunk(p, q, consumed);
+		q += 2 + nsegs * rpcrdma_segment_maxsz;
+		p += 2 + nsegs * rpcrdma_segment_maxsz;
+		consumed = 0;
 	}
-	dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
-				   min_t(size_t, PAGE_SIZE, len), dir);
-	return dma_addr;
+
+	/* Terminate Write list */
+	*p++ = xdr_zero;
+
+	/* Reply chunk discriminator; may be replaced later */
+	*p = xdr_zero;
+}
+
+/* The client provided a Reply chunk in the Call message. Fill in
+ * the segments in the Reply chunk in the Reply message with the
+ * number of bytes consumed in each segment.
+ *
+ * Assumptions:
+ * - Reply can always fit in the provided Reply chunk
+ */
+static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
+					    unsigned int consumed)
+{
+	__be32 *p;
+
+	/* Find the Reply chunk in the Reply's xprt header.
+	 * RPC-over-RDMA V1 replies never have a Read list.
+	 */
+	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+	/* Skip past Write list */
+	while (*p++ != xdr_zero)
+		p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+
+	xdr_encode_write_chunk(p, rp_ch, consumed);
 }
 
 /* Parse the RPC Call's transport header.
  */
-static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
-				      struct rpcrdma_write_array **write,
-				      struct rpcrdma_write_array **reply)
+static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
+				      __be32 **write, __be32 **reply)
 {
 	__be32 *p;
 
-	p = (__be32 *)&rmsgp->rm_body.rm_chunks[0];
+	p = rdma_argp + rpcrdma_fixed_maxsz;
 
 	/* Read list */
 	while (*p++ != xdr_zero)
@@ -169,7 +255,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
 
 	/* Write list */
 	if (*p != xdr_zero) {
-		*write = (struct rpcrdma_write_array *)p;
+		*write = p;
 		while (*p++ != xdr_zero)
 			p += 1 + be32_to_cpu(*p) * 4;
 	} else {
@@ -179,7 +265,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
 
 	/* Reply chunk */
 	if (*p != xdr_zero)
-		*reply = (struct rpcrdma_write_array *)p;
+		*reply = p;
 	else
 		*reply = NULL;
 }
@@ -189,360 +275,321 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp,
  * Invalidate, and responder chooses one rkey to invalidate.
  *
  * Find a candidate rkey to invalidate when sending a reply.  Picks the
- * first rkey it finds in the chunks lists.
+ * first R_key it finds in the chunk lists.
  *
  * Returns zero if RPC's chunk lists are empty.
  */
-static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp,
-				 struct rpcrdma_write_array *wr_ary,
-				 struct rpcrdma_write_array *rp_ary)
+static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
+				 __be32 *wr_lst, __be32 *rp_ch)
 {
-	struct rpcrdma_read_chunk *rd_ary;
-	struct rpcrdma_segment *arg_ch;
+	__be32 *p;
 
-	rd_ary = (struct rpcrdma_read_chunk *)&rdma_argp->rm_body.rm_chunks[0];
-	if (rd_ary->rc_discrim != xdr_zero)
-		return be32_to_cpu(rd_ary->rc_target.rs_handle);
+	p = rdma_argp + rpcrdma_fixed_maxsz;
+	if (*p != xdr_zero)
+		p += 2;
+	else if (wr_lst && be32_to_cpup(wr_lst + 1))
+		p = wr_lst + 2;
+	else if (rp_ch && be32_to_cpup(rp_ch + 1))
+		p = rp_ch + 2;
+	else
+		return 0;
+	return be32_to_cpup(p);
+}
 
-	if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) {
-		arg_ch = &wr_ary->wc_array[0].wc_target;
-		return be32_to_cpu(arg_ch->rs_handle);
-	}
+/* ib_dma_map_page() is used here because svc_rdma_unmap_dma()
+ * is used during completion to DMA-unmap this memory, and
+ * it uses ib_dma_unmap_page() exclusively.
+ */
+static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
+				struct svc_rdma_op_ctxt *ctxt,
+				unsigned int sge_no,
+				unsigned char *base,
+				unsigned int len)
+{
+	unsigned long offset = (unsigned long)base & ~PAGE_MASK;
+	struct ib_device *dev = rdma->sc_cm_id->device;
+	dma_addr_t dma_addr;
 
-	if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) {
-		arg_ch = &rp_ary->wc_array[0].wc_target;
-		return be32_to_cpu(arg_ch->rs_handle);
-	}
+	dma_addr = ib_dma_map_page(dev, virt_to_page(base),
+				   offset, len, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(dev, dma_addr))
+		return -EIO;
 
+	ctxt->sge[sge_no].addr = dma_addr;
+	ctxt->sge[sge_no].length = len;
+	ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+	svc_rdma_count_mappings(rdma, ctxt);
 	return 0;
 }
 
-/* Assumptions:
- * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
- */
-static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
-		      u32 rmr, u64 to,
-		      u32 xdr_off, int write_len,
-		      struct svc_rdma_req_map *vec)
+static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
+				 struct svc_rdma_op_ctxt *ctxt,
+				 unsigned int sge_no,
+				 struct page *page,
+				 unsigned int offset,
+				 unsigned int len)
 {
-	struct ib_rdma_wr write_wr;
-	struct ib_sge *sge;
-	int xdr_sge_no;
-	int sge_no;
-	int sge_bytes;
-	int sge_off;
-	int bc;
-	struct svc_rdma_op_ctxt *ctxt;
+	struct ib_device *dev = rdma->sc_cm_id->device;
+	dma_addr_t dma_addr;
 
-	if (vec->count > RPCSVC_MAXPAGES) {
-		pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
+	dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE);
+	if (ib_dma_mapping_error(dev, dma_addr))
 		return -EIO;
-	}
 
-	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
-		"write_len=%d, vec->sge=%p, vec->count=%lu\n",
-		rmr, (unsigned long long)to, xdr_off,
-		write_len, vec->sge, vec->count);
+	ctxt->sge[sge_no].addr = dma_addr;
+	ctxt->sge[sge_no].length = len;
+	ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
+	svc_rdma_count_mappings(rdma, ctxt);
+	return 0;
+}
 
-	ctxt = svc_rdma_get_context(xprt);
+/**
+ * svc_rdma_map_reply_hdr - DMA map the transport header buffer
+ * @rdma: controlling transport
+ * @ctxt: op_ctxt for the Send WR
+ * @rdma_resp: buffer containing transport header
+ * @len: length of transport header
+ *
+ * Returns:
+ *	%0 if the header is DMA mapped,
+ *	%-EIO if DMA mapping failed.
+ */
+int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
+			   struct svc_rdma_op_ctxt *ctxt,
+			   __be32 *rdma_resp,
+			   unsigned int len)
+{
 	ctxt->direction = DMA_TO_DEVICE;
-	sge = ctxt->sge;
-
-	/* Find the SGE associated with xdr_off */
-	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
-	     xdr_sge_no++) {
-		if (vec->sge[xdr_sge_no].iov_len > bc)
-			break;
-		bc -= vec->sge[xdr_sge_no].iov_len;
-	}
-
-	sge_off = bc;
-	bc = write_len;
-	sge_no = 0;
-
-	/* Copy the remaining SGE */
-	while (bc != 0) {
-		sge_bytes = min_t(size_t,
-			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
-		sge[sge_no].length = sge_bytes;
-		sge[sge_no].addr =
-			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
-				    sge_bytes, DMA_TO_DEVICE);
-		xdr_off += sge_bytes;
-		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-					 sge[sge_no].addr))
-			goto err;
-		svc_rdma_count_mappings(xprt, ctxt);
-		sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey;
-		ctxt->count++;
-		sge_off = 0;
-		sge_no++;
-		xdr_sge_no++;
-		if (xdr_sge_no > vec->count) {
-			pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
-			goto err;
-		}
-		bc -= sge_bytes;
-		if (sge_no == xprt->sc_max_sge)
-			break;
-	}
-
-	/* Prepare WRITE WR */
-	memset(&write_wr, 0, sizeof write_wr);
-	ctxt->cqe.done = svc_rdma_wc_write;
-	write_wr.wr.wr_cqe = &ctxt->cqe;
-	write_wr.wr.sg_list = &sge[0];
-	write_wr.wr.num_sge = sge_no;
-	write_wr.wr.opcode = IB_WR_RDMA_WRITE;
-	write_wr.wr.send_flags = IB_SEND_SIGNALED;
-	write_wr.rkey = rmr;
-	write_wr.remote_addr = to;
-
-	/* Post It */
-	atomic_inc(&rdma_stat_write);
-	if (svc_rdma_send(xprt, &write_wr.wr))
-		goto err;
-	return write_len - bc;
- err:
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 0);
-	return -EIO;
+	ctxt->pages[0] = virt_to_page(rdma_resp);
+	ctxt->count = 1;
+	return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len);
 }
 
-noinline
-static int send_write_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_write_array *wr_ary,
-			     struct rpcrdma_msg *rdma_resp,
-			     struct svc_rqst *rqstp,
-			     struct svc_rdma_req_map *vec)
+/* Load the xdr_buf into the ctxt's sge array, and DMA map each
+ * element as it is added.
+ *
+ * Returns the number of sge elements loaded on success, or
+ * a negative errno on failure.
+ */
+static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
+				  struct svc_rdma_op_ctxt *ctxt,
+				  struct xdr_buf *xdr, __be32 *wr_lst)
 {
-	u32 xfer_len = rqstp->rq_res.page_len;
-	int write_len;
-	u32 xdr_off;
-	int chunk_off;
-	int chunk_no;
-	int nchunks;
-	struct rpcrdma_write_array *res_ary;
+	unsigned int len, sge_no, remaining, page_off;
+	struct page **ppages;
+	unsigned char *base;
+	u32 xdr_pad;
 	int ret;
 
-	res_ary = (struct rpcrdma_write_array *)
-		&rdma_resp->rm_body.rm_chunks[1];
-
-	/* Write chunks start at the pagelist */
-	nchunks = be32_to_cpu(wr_ary->wc_nchunks);
-	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
-	     xfer_len && chunk_no < nchunks;
-	     chunk_no++) {
-		struct rpcrdma_segment *arg_ch;
-		u64 rs_offset;
-
-		arg_ch = &wr_ary->wc_array[chunk_no].wc_target;
-		write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
-
-		/* Prepare the response chunk given the length actually
-		 * written */
-		xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
-		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
-						arg_ch->rs_handle,
-						arg_ch->rs_offset,
-						write_len);
-		chunk_off = 0;
-		while (write_len) {
-			ret = send_write(xprt, rqstp,
-					 be32_to_cpu(arg_ch->rs_handle),
-					 rs_offset + chunk_off,
-					 xdr_off,
-					 write_len,
-					 vec);
-			if (ret <= 0)
-				goto out_err;
-			chunk_off += ret;
-			xdr_off += ret;
-			xfer_len -= ret;
-			write_len -= ret;
+	sge_no = 1;
+
+	ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++,
+				   xdr->head[0].iov_base,
+				   xdr->head[0].iov_len);
+	if (ret < 0)
+		return ret;
+
+	/* If a Write chunk is present, the xdr_buf's page list
+	 * is not included inline. However the Upper Layer may
+	 * have added XDR padding in the tail buffer, and that
+	 * should not be included inline.
+	 */
+	if (wr_lst) {
+		base = xdr->tail[0].iov_base;
+		len = xdr->tail[0].iov_len;
+		xdr_pad = xdr_padsize(xdr->page_len);
+
+		if (len && xdr_pad) {
+			base += xdr_pad;
+			len -= xdr_pad;
 		}
+
+		goto tail;
+	}
+
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	page_off = xdr->page_base & ~PAGE_MASK;
+	remaining = xdr->page_len;
+	while (remaining) {
+		len = min_t(u32, PAGE_SIZE - page_off, remaining);
+
+		ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++,
+					    *ppages++, page_off, len);
+		if (ret < 0)
+			return ret;
+
+		remaining -= len;
+		page_off = 0;
 	}
-	/* Update the req with the number of chunks actually used */
-	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
 
-	return rqstp->rq_res.page_len;
+	base = xdr->tail[0].iov_base;
+	len = xdr->tail[0].iov_len;
+tail:
+	if (len) {
+		ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len);
+		if (ret < 0)
+			return ret;
+	}
 
-out_err:
-	pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret);
-	return -EIO;
+	return sge_no - 1;
 }
 
-noinline
-static int send_reply_chunks(struct svcxprt_rdma *xprt,
-			     struct rpcrdma_write_array *rp_ary,
-			     struct rpcrdma_msg *rdma_resp,
-			     struct svc_rqst *rqstp,
-			     struct svc_rdma_req_map *vec)
+/* The svc_rqst and all resources it owns are released as soon as
+ * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt
+ * so they are released by the Send completion handler.
+ */
+static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
+				   struct svc_rdma_op_ctxt *ctxt)
 {
-	u32 xfer_len = rqstp->rq_res.len;
-	int write_len;
-	u32 xdr_off;
-	int chunk_no;
-	int chunk_off;
-	int nchunks;
-	struct rpcrdma_segment *ch;
-	struct rpcrdma_write_array *res_ary;
-	int ret;
+	int i, pages = rqstp->rq_next_page - rqstp->rq_respages;
 
-	/* XXX: need to fix when reply lists occur with read-list and or
-	 * write-list */
-	res_ary = (struct rpcrdma_write_array *)
-		&rdma_resp->rm_body.rm_chunks[2];
-
-	/* xdr offset starts at RPC message */
-	nchunks = be32_to_cpu(rp_ary->wc_nchunks);
-	for (xdr_off = 0, chunk_no = 0;
-	     xfer_len && chunk_no < nchunks;
-	     chunk_no++) {
-		u64 rs_offset;
-		ch = &rp_ary->wc_array[chunk_no].wc_target;
-		write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
-
-		/* Prepare the reply chunk given the length actually
-		 * written */
-		xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
-		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
-						ch->rs_handle, ch->rs_offset,
-						write_len);
-		chunk_off = 0;
-		while (write_len) {
-			ret = send_write(xprt, rqstp,
-					 be32_to_cpu(ch->rs_handle),
-					 rs_offset + chunk_off,
-					 xdr_off,
-					 write_len,
-					 vec);
-			if (ret <= 0)
-				goto out_err;
-			chunk_off += ret;
-			xdr_off += ret;
-			xfer_len -= ret;
-			write_len -= ret;
-		}
+	ctxt->count += pages;
+	for (i = 0; i < pages; i++) {
+		ctxt->pages[i + 1] = rqstp->rq_respages[i];
+		rqstp->rq_respages[i] = NULL;
 	}
-	/* Update the req with the number of chunks actually used */
-	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+	rqstp->rq_next_page = rqstp->rq_respages + 1;
+}
 
-	return rqstp->rq_res.len;
+/**
+ * svc_rdma_post_send_wr - Set up and post one Send Work Request
+ * @rdma: controlling transport
+ * @ctxt: op_ctxt for transmitting the Send WR
+ * @num_sge: number of SGEs to send
+ * @inv_rkey: R_key argument to Send With Invalidate, or zero
+ *
+ * Returns:
+ *	%0 if the Send* was posted successfully,
+ *	%-ENOTCONN if the connection was lost or dropped,
+ *	%-EINVAL if there was a problem with the Send we built,
+ *	%-ENOMEM if ib_post_send failed.
+ */
+int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
+			  struct svc_rdma_op_ctxt *ctxt, int num_sge,
+			  u32 inv_rkey)
+{
+	struct ib_send_wr *send_wr = &ctxt->send_wr;
 
-out_err:
-	pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret);
-	return -EIO;
+	dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge);
+
+	send_wr->next = NULL;
+	ctxt->cqe.done = svc_rdma_wc_send;
+	send_wr->wr_cqe = &ctxt->cqe;
+	send_wr->sg_list = ctxt->sge;
+	send_wr->num_sge = num_sge;
+	send_wr->send_flags = IB_SEND_SIGNALED;
+	if (inv_rkey) {
+		send_wr->opcode = IB_WR_SEND_WITH_INV;
+		send_wr->ex.invalidate_rkey = inv_rkey;
+	} else {
+		send_wr->opcode = IB_WR_SEND;
+	}
+
+	return svc_rdma_send(rdma, send_wr);
 }
 
-/* This function prepares the portion of the RPCRDMA message to be
- * sent in the RDMA_SEND. This function is called after data sent via
- * RDMA has already been transmitted. There are three cases:
- * - The RPCRDMA header, RPC header, and payload are all sent in a
- *   single RDMA_SEND. This is the "inline" case.
- * - The RPCRDMA header and some portion of the RPC header and data
- *   are sent via this RDMA_SEND and another portion of the data is
- *   sent via RDMA.
- * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
- *   header and data are all transmitted via RDMA.
- * In all three cases, this function prepares the RPCRDMA header in
- * sge[0], the 'type' parameter indicates the type to place in the
- * RPCRDMA header, and the 'byte_count' field indicates how much of
- * the XDR to include in this RDMA_SEND. NB: The offset of the payload
- * to send is zero in the XDR.
+/* Prepare the portion of the RPC Reply that will be transmitted
+ * via RDMA Send. The RPC-over-RDMA transport header is prepared
+ * in sge[0], and the RPC xdr_buf is prepared in following sges.
+ *
+ * Depending on whether a Write list or Reply chunk is present,
+ * the server may send all, a portion of, or none of the xdr_buf.
+ * In the latter case, only the transport header (sge[0]) is
+ * transmitted.
+ *
+ * RDMA Send is the last step of transmitting an RPC reply. Pages
+ * involved in the earlier RDMA Writes are transferred here out
+ * of the rqstp and into the ctxt's page array. These pages are
+ * DMA unmapped by each Write completion, but the subsequent Send
+ * completion finally releases these pages.
+ *
+ * Assumptions:
+ * - The Reply's transport header will never be larger than a page.
  */
-static int send_reply(struct svcxprt_rdma *rdma,
-		      struct svc_rqst *rqstp,
-		      struct page *page,
-		      struct rpcrdma_msg *rdma_resp,
-		      struct svc_rdma_req_map *vec,
-		      int byte_count,
-		      u32 inv_rkey)
+static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
+				   __be32 *rdma_argp, __be32 *rdma_resp,
+				   struct svc_rqst *rqstp,
+				   __be32 *wr_lst, __be32 *rp_ch)
 {
 	struct svc_rdma_op_ctxt *ctxt;
-	struct ib_send_wr send_wr;
-	u32 xdr_off;
-	int sge_no;
-	int sge_bytes;
-	int page_no;
-	int pages;
-	int ret = -EIO;
-
-	/* Prepare the context */
+	u32 inv_rkey;
+	int ret;
+
+	dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n",
+		(rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"),
+		rqstp->rq_res.head[0].iov_len,
+		rqstp->rq_res.page_len,
+		rqstp->rq_res.tail[0].iov_len);
+
 	ctxt = svc_rdma_get_context(rdma);
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->pages[0] = page;
-	ctxt->count = 1;
 
-	/* Prepare the SGE for the RPCRDMA Header */
-	ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length =
-	    svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp);
-	ctxt->sge[0].addr =
-	    ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
-			    ctxt->sge[0].length, DMA_TO_DEVICE);
-	if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
+	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp,
+				     svc_rdma_reply_hdr_len(rdma_resp));
+	if (ret < 0)
 		goto err;
-	svc_rdma_count_mappings(rdma, ctxt);
-
-	ctxt->direction = DMA_TO_DEVICE;
 
-	/* Map the payload indicated by 'byte_count' */
-	xdr_off = 0;
-	for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
-		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
-		byte_count -= sge_bytes;
-		ctxt->sge[sge_no].addr =
-			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
-				    sge_bytes, DMA_TO_DEVICE);
-		xdr_off += sge_bytes;
-		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
-					 ctxt->sge[sge_no].addr))
+	if (!rp_ch) {
+		ret = svc_rdma_map_reply_msg(rdma, ctxt,
+					     &rqstp->rq_res, wr_lst);
+		if (ret < 0)
 			goto err;
-		svc_rdma_count_mappings(rdma, ctxt);
-		ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey;
-		ctxt->sge[sge_no].length = sge_bytes;
 	}
-	if (byte_count != 0) {
-		pr_err("svcrdma: Could not map %d bytes\n", byte_count);
+
+	svc_rdma_save_io_pages(rqstp, ctxt);
+
+	inv_rkey = 0;
+	if (rdma->sc_snd_w_inv)
+		inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
+	ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey);
+	if (ret)
 		goto err;
-	}
 
-	/* Save all respages in the ctxt and remove them from the
-	 * respages array. They are our pages until the I/O
-	 * completes.
+	return 0;
+
+err:
+	pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
+	svc_rdma_unmap_dma(ctxt);
+	svc_rdma_put_context(ctxt, 1);
+	return ret;
+}
+
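svc_rdma_send_error_msg() below patches the reply header in place and maps exactly 20 octets: the five XDR words of an RDMA_ERROR message. For reference, a sketch of that encoding with the layout spelled out (field names per RPC-over-RDMA Version One; the helper itself is invented for illustration):

	/* Illustrative encoder for the 5-word (20-octet) RDMA_ERROR header:
	 *
	 *   word 0: rm_xid    - preserved from the Call
	 *   word 1: rm_vers   - preserved from the Call
	 *   word 2: rm_credit - credit grant
	 *   word 3: rm_type   = rdma_error
	 *   word 4: err code  = err_chunk
	 */
	static unsigned int example_encode_err_chunk(__be32 *rdma_resp)
	{
		__be32 *p = rdma_resp + 3;	/* skip xid, vers, credits */

		*p++ = rdma_error;
		*p   = err_chunk;
		return 5 * sizeof(__be32);	/* the "20" passed below */
	}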
+/* Given the client-provided Write and Reply chunks, the server was not
+ * able to form a complete reply. Return an RDMA_ERROR message so the
+ * client can retire this RPC transaction. As above, the Send completion
+ * routine releases payload pages that were part of a previous RDMA Write.
+ *
+ * Remote Invalidation is skipped for simplicity.
+ */
+static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
+				   __be32 *rdma_resp, struct svc_rqst *rqstp)
+{
+	struct svc_rdma_op_ctxt *ctxt;
+	__be32 *p;
+	int ret;
+
+	ctxt = svc_rdma_get_context(rdma);
+
+	/* Replace the original transport header with an
+	 * RDMA_ERROR response. XID etc. are preserved.
 	 */
-	pages = rqstp->rq_next_page - rqstp->rq_respages;
-	for (page_no = 0; page_no < pages; page_no++) {
-		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
-		ctxt->count++;
-		rqstp->rq_respages[page_no] = NULL;
-	}
-	rqstp->rq_next_page = rqstp->rq_respages + 1;
+	p = rdma_resp + 3;
+	*p++ = rdma_error;
+	*p   = err_chunk;
 
-	if (sge_no > rdma->sc_max_sge) {
-		pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+	ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20);
+	if (ret < 0)
 		goto err;
-	}
-	memset(&send_wr, 0, sizeof send_wr);
-	ctxt->cqe.done = svc_rdma_wc_send;
-	send_wr.wr_cqe = &ctxt->cqe;
-	send_wr.sg_list = ctxt->sge;
-	send_wr.num_sge = sge_no;
-	if (inv_rkey) {
-		send_wr.opcode = IB_WR_SEND_WITH_INV;
-		send_wr.ex.invalidate_rkey = inv_rkey;
-	} else
-		send_wr.opcode = IB_WR_SEND;
-	send_wr.send_flags =  IB_SEND_SIGNALED;
 
-	ret = svc_rdma_send(rdma, &send_wr);
+	svc_rdma_save_io_pages(rqstp, ctxt);
+
+	ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0);
 	if (ret)
 		goto err;
 
 	return 0;
 
- err:
+err:
+	pr_err("svcrdma: failed to post Send WR (%d)\n", ret);
 	svc_rdma_unmap_dma(ctxt);
 	svc_rdma_put_context(ctxt, 1);
 	return ret;
@@ -552,39 +599,36 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
 {
 }
 
+/**
+ * svc_rdma_sendto - Transmit an RPC reply
+ * @rqstp: processed RPC request, reply XDR already in ::rq_res
+ *
+ * Any resources still associated with @rqstp are released upon return.
+ * If no reply message was possible, the connection is closed.
+ *
+ * Returns:
+ *	%0 if an RPC reply has been successfully posted,
+ *	%-ENOMEM if a resource shortage occurred (connection is lost),
+ *	%-ENOTCONN if posting failed (connection is lost).
+ */
 int svc_rdma_sendto(struct svc_rqst *rqstp)
 {
 	struct svc_xprt *xprt = rqstp->rq_xprt;
 	struct svcxprt_rdma *rdma =
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
-	struct rpcrdma_msg *rdma_argp;
-	struct rpcrdma_msg *rdma_resp;
-	struct rpcrdma_write_array *wr_ary, *rp_ary;
-	int ret;
-	int inline_bytes;
+	__be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch;
+	struct xdr_buf *xdr = &rqstp->rq_res;
 	struct page *res_page;
-	struct svc_rdma_req_map *vec;
-	u32 inv_rkey;
-	__be32 *p;
-
-	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+	int ret;
 
-	/* Get the RDMA request header. The receive logic always
-	 * places this at the start of page 0.
+	/* Find the call's chunk lists to decide how to send the reply.
+	 * Receive places the Call's xprt header at the start of page 0.
 	 */
 	rdma_argp = page_address(rqstp->rq_pages[0]);
-	svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary);
-
-	inv_rkey = 0;
-	if (rdma->sc_snd_w_inv)
-		inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary);
+	svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch);
 
-	/* Build an req vec for the XDR */
-	vec = svc_rdma_get_req_map(rdma);
-	ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL);
-	if (ret)
-		goto err0;
-	inline_bytes = rqstp->rq_res.len;
+	dprintk("svcrdma: preparing response for XID 0x%08x\n",
+		be32_to_cpup(rdma_argp));
 
 	/* Create the RDMA response header. xprt->xpt_mutex,
 	 * acquired in svc_send(), serializes RPC replies. The
@@ -598,115 +642,57 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		goto err0;
 	rdma_resp = page_address(res_page);
 
-	p = &rdma_resp->rm_xid;
-	*p++ = rdma_argp->rm_xid;
-	*p++ = rdma_argp->rm_vers;
+	p = rdma_resp;
+	*p++ = *rdma_argp;
+	*p++ = *(rdma_argp + 1);
 	*p++ = rdma->sc_fc_credits;
-	*p++ = rp_ary ? rdma_nomsg : rdma_msg;
+	*p++ = rp_ch ? rdma_nomsg : rdma_msg;
 
 	/* Start with empty chunks */
 	*p++ = xdr_zero;
 	*p++ = xdr_zero;
 	*p   = xdr_zero;
 
-	/* Send any write-chunk data and build resp write-list */
-	if (wr_ary) {
-		ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec);
+	if (wr_lst) {
+		/* XXX: Presume the client sent only one Write chunk */
+		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
 		if (ret < 0)
-			goto err1;
-		inline_bytes -= ret + xdr_padsize(ret);
+			goto err2;
+		svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret);
 	}
-
-	/* Send any reply-list data and update resp reply-list */
-	if (rp_ary) {
-		ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec);
+	if (rp_ch) {
+		ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr);
 		if (ret < 0)
-			goto err1;
-		inline_bytes -= ret;
+			goto err2;
+		svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret);
 	}
 
-	/* Post a fresh Receive buffer _before_ sending the reply */
 	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
 	if (ret)
 		goto err1;
-
-	ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec,
-			 inline_bytes, inv_rkey);
+	ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp,
+				      wr_lst, rp_ch);
 	if (ret < 0)
 		goto err0;
+	return 0;
 
-	svc_rdma_put_req_map(rdma, vec);
-	dprintk("svcrdma: send_reply returns %d\n", ret);
-	return ret;
+ err2:
+	if (ret != -E2BIG)
+		goto err1;
+
+	ret = svc_rdma_post_recv(rdma, GFP_KERNEL);
+	if (ret)
+		goto err1;
+	ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp);
+	if (ret < 0)
+		goto err0;
+	return 0;
 
  err1:
 	put_page(res_page);
  err0:
-	svc_rdma_put_req_map(rdma, vec);
 	pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n",
 	       ret);
-	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
 	return -ENOTCONN;
 }
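Taking the rewritten function as a whole, the reply transmission sequence it now implements can be summarized as follows (an informal digest, not text from the patch):

	/* 1. Parse the Call's Write list and Reply chunk (wr_lst, rp_ch).
	 * 2. Build the Reply's transport header in a fresh page.
	 * 3. If a Write chunk was provided, RDMA Write the payload and
	 *    encode the actual length into the Reply's Write list.
	 * 4. If a Reply chunk was provided, RDMA Write the whole RPC
	 *    Reply and encode its length into the Reply chunk.
	 * 5. Post a fresh Receive, then Send the transport header plus
	 *    any inline payload (none when a Reply chunk was used).
	 * 6. If a chunk step failed with -E2BIG, Send an RDMA_ERROR
	 *    with err_chunk instead, so the client can terminate the RPC.
	 * 7. On any other failure, close the connection (-ENOTCONN).
	 */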
-
-void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
-			 int status)
-{
-	struct ib_send_wr err_wr;
-	struct page *p;
-	struct svc_rdma_op_ctxt *ctxt;
-	enum rpcrdma_errcode err;
-	__be32 *va;
-	int length;
-	int ret;
-
-	ret = svc_rdma_repost_recv(xprt, GFP_KERNEL);
-	if (ret)
-		return;
-
-	p = alloc_page(GFP_KERNEL);
-	if (!p)
-		return;
-	va = page_address(p);
-
-	/* XDR encode an error reply */
-	err = ERR_CHUNK;
-	if (status == -EPROTONOSUPPORT)
-		err = ERR_VERS;
-	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
-
-	ctxt = svc_rdma_get_context(xprt);
-	ctxt->direction = DMA_TO_DEVICE;
-	ctxt->count = 1;
-	ctxt->pages[0] = p;
-
-	/* Prepare SGE for local address */
-	ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey;
-	ctxt->sge[0].length = length;
-	ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
-					    p, 0, length, DMA_TO_DEVICE);
-	if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
-		dprintk("svcrdma: Error mapping buffer for protocol error\n");
-		svc_rdma_put_context(ctxt, 1);
-		return;
-	}
-	svc_rdma_count_mappings(xprt, ctxt);
-
-	/* Prepare SEND WR */
-	memset(&err_wr, 0, sizeof(err_wr));
-	ctxt->cqe.done = svc_rdma_wc_send;
-	err_wr.wr_cqe = &ctxt->cqe;
-	err_wr.sg_list = ctxt->sge;
-	err_wr.num_sge = 1;
-	err_wr.opcode = IB_WR_SEND;
-	err_wr.send_flags = IB_SEND_SIGNALED;
-
-	/* Post It */
-	ret = svc_rdma_send(xprt, &err_wr);
-	if (ret) {
-		dprintk("svcrdma: Error %d posting send for protocol error\n",
-			ret);
-		svc_rdma_unmap_dma(ctxt);
-		svc_rdma_put_context(ctxt, 1);
-	}
-}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index fc8f14c7bfec..a9d9cb1ba4c6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -272,85 +272,6 @@ static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
 	}
 }
 
-static struct svc_rdma_req_map *alloc_req_map(gfp_t flags)
-{
-	struct svc_rdma_req_map *map;
-
-	map = kmalloc(sizeof(*map), flags);
-	if (map)
-		INIT_LIST_HEAD(&map->free);
-	return map;
-}
-
-static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt)
-{
-	unsigned int i;
-
-	/* One for each receive buffer on this connection. */
-	i = xprt->sc_max_requests;
-
-	while (i--) {
-		struct svc_rdma_req_map *map;
-
-		map = alloc_req_map(GFP_KERNEL);
-		if (!map) {
-			dprintk("svcrdma: No memory for request map\n");
-			return false;
-		}
-		list_add(&map->free, &xprt->sc_maps);
-	}
-	return true;
-}
-
-struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt)
-{
-	struct svc_rdma_req_map *map = NULL;
-
-	spin_lock(&xprt->sc_map_lock);
-	if (list_empty(&xprt->sc_maps))
-		goto out_empty;
-
-	map = list_first_entry(&xprt->sc_maps,
-			       struct svc_rdma_req_map, free);
-	list_del_init(&map->free);
-	spin_unlock(&xprt->sc_map_lock);
-
-out:
-	map->count = 0;
-	return map;
-
-out_empty:
-	spin_unlock(&xprt->sc_map_lock);
-
-	/* Pre-allocation amount was incorrect */
-	map = alloc_req_map(GFP_NOIO);
-	if (map)
-		goto out;
-
-	WARN_ONCE(1, "svcrdma: empty request map list?\n");
-	return NULL;
-}
-
-void svc_rdma_put_req_map(struct svcxprt_rdma *xprt,
-			  struct svc_rdma_req_map *map)
-{
-	spin_lock(&xprt->sc_map_lock);
-	list_add(&map->free, &xprt->sc_maps);
-	spin_unlock(&xprt->sc_map_lock);
-}
-
-static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt)
-{
-	while (!list_empty(&xprt->sc_maps)) {
-		struct svc_rdma_req_map *map;
-
-		map = list_first_entry(&xprt->sc_maps,
-				       struct svc_rdma_req_map, free);
-		list_del(&map->free);
-		kfree(map);
-	}
-}
-
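The free-list pattern deleted here does not vanish from svcrdma: the new svc_rdma_rw.c caches its R/W contexts on sc_rw_ctxts (initialized further down in this patch) in the same shape. A condensed sketch of the pattern, with illustrative names, for comparison:

	/* Generic spinlock-protected free-list cache (sketch only).
	 * Returns NULL when the cache is empty so the caller can fall
	 * back to a fresh allocation, as the removed code above did.
	 */
	static struct svc_rdma_req_map *cache_get(struct list_head *cache,
						  spinlock_t *lock)
	{
		struct svc_rdma_req_map *map = NULL;

		spin_lock(lock);
		if (!list_empty(cache)) {
			map = list_first_entry(cache,
					       struct svc_rdma_req_map, free);
			list_del_init(&map->free);
		}
		spin_unlock(lock);
		return map;
	}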
 /* QP event handler */
 static void qp_event_handler(struct ib_event *event, void *context)
 {
@@ -474,24 +395,6 @@ void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 }
 
 /**
- * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC
- * @cq:        completion queue
- * @wc:        completed WR
- *
- */
-void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc)
-{
-	struct ib_cqe *cqe = wc->wr_cqe;
-	struct svc_rdma_op_ctxt *ctxt;
-
-	svc_rdma_send_wc_common_put(cq, wc, "write");
-
-	ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
-	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_context(ctxt, 0);
-}
-
-/**
  * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC
  * @cq:        completion queue
  * @wc:        completed WR
@@ -561,14 +464,14 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
-	INIT_LIST_HEAD(&cma_xprt->sc_maps);
+	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 
 	spin_lock_init(&cma_xprt->sc_lock);
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
 	spin_lock_init(&cma_xprt->sc_ctxt_lock);
-	spin_lock_init(&cma_xprt->sc_map_lock);
+	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 
 	/*
 	 * Note that this implies that the underlying transport support
@@ -999,6 +902,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		newxprt, newxprt->sc_cm_id);
 
 	dev = newxprt->sc_cm_id->device;
+	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
 
 	/* Qualify the transport resource defaults with the
 	 * capabilities of this particular device */
@@ -1014,13 +918,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 					    svcrdma_max_bc_requests);
 	newxprt->sc_rq_depth = newxprt->sc_max_requests +
 			       newxprt->sc_max_bc_requests;
-	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
+	newxprt->sc_sq_depth = newxprt->sc_rq_depth;
 	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 
 	if (!svc_rdma_prealloc_ctxts(newxprt))
 		goto errout;
-	if (!svc_rdma_prealloc_maps(newxprt))
-		goto errout;
 
 	/*
 	 * Limit ORD based on client limit, local device limit, and
@@ -1050,6 +952,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	memset(&qp_attr, 0, sizeof qp_attr);
 	qp_attr.event_handler = qp_event_handler;
 	qp_attr.qp_context = &newxprt->sc_xprt;
+	qp_attr.port_num = newxprt->sc_cm_id->port_num;
+	qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests;
 	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
 	qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth;
 	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
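The new qp_attr.cap.max_rdma_ctxs field is what lets the RDMA core size the QP for the rdma_rw-based reply path: each in-flight request may consume one R/W context for its Write or Reply chunk. A hedged sketch of the consumer side; rdma_rw_ctx_init() and rdma_rw_ctx_post() are the in-kernel rdma_rw API (<rdma/rw.h>), while the wrapper and its parameters are assumptions for illustration:

	/* Illustrative only: post one RDMA Write via the rdma_rw API.
	 * The ctx must outlive the Write, so real callers (including
	 * svc_rdma_rw.c) embed it in a per-I/O structure.
	 */
	static int example_rdma_write(struct rdma_rw_ctx *ctx, struct ib_qp *qp,
				      u8 port_num, struct scatterlist *sgl,
				      u32 sg_cnt, u64 remote_addr, u32 rkey,
				      struct ib_cqe *cqe)
	{
		int ret;

		ret = rdma_rw_ctx_init(ctx, qp, port_num, sgl, sg_cnt, 0,
				       remote_addr, rkey, DMA_TO_DEVICE);
		if (ret < 0)
			return ret;
		/* completion handler must call rdma_rw_ctx_destroy() */
		return rdma_rw_ctx_post(ctx, qp, port_num, cqe, NULL);
	}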
@@ -1248,8 +1152,8 @@ static void __svc_rdma_free(struct work_struct *work)
 	}
 
 	rdma_dealloc_frmr_q(rdma);
+	svc_rdma_destroy_rw_ctxts(rdma);
 	svc_rdma_destroy_ctxts(rdma);
-	svc_rdma_destroy_maps(rdma);
 
 	/* Destroy the QP if present (not a listener) */
 	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))