summary refs log tree commit diff
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2015-11-09 18:11:22 -0800
committerLinus Torvalds <torvalds@linux-foundation.org>2015-11-09 18:11:22 -0800
commite6604ecb70d4b1dbc0372c6518b51c25c4b135a1 (patch)
tree2d12c51b84c3ba8472e59ddbe37da034e2c5251f /net/sunrpc/xprtrdma
parent9d74288ca79249af4b906215788b37d52263b58b (diff)
parent941c3ff3102ccce440034d59cf9e4e9cc10b720d (diff)
downloadlinux-e6604ecb70d4b1dbc0372c6518b51c25c4b135a1.tar.gz
Merge tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  New features:
   - RDMA client backchannel from Chuck
   - Support for NFSv4.2 file CLONE using the btrfs ioctl

  Bugfixes + cleanups:
   - Move socket data receive out of the bottom halves and into a
     workqueue
   - Refactor NFSv4 error handling so synchronous and asynchronous RPC
     handles errors identically.
   - Fix a panic when blocks or object layouts reads return a bad data
     length
   - Fix nfsroot so it can handle a 1024 byte long path.
   - Fix bad usage of page offset in bl_read_pagelist
   - Various NFSv4 callback cleanups+fixes
   - Fix GETATTR bitmap verification
   - Support hexadecimal number for sunrpc debug sysctl files"

* tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (53 commits)
  Sunrpc: Supports hexadecimal number for sysctl files of sunrpc debug
  nfs: Fix GETATTR bitmap verification
  nfs: Remove unused xdr page offsets in getacl/setacl arguments
  fs/nfs: remove unnecessary new_valid_dev check
  SUNRPC: fix variable type
  NFS: Enable client side NFSv4.1 backchannel to use other transports
  pNFS/flexfiles: Add support for FF_FLAGS_NO_IO_THRU_MDS
  pNFS/flexfiles: When mirrored, retry failed reads by switching mirrors
  SUNRPC: Remove the TCP-only restriction in bc_svc_process()
  svcrdma: Add backward direction service for RPC/RDMA transport
  xprtrdma: Handle incoming backward direction RPC calls
  xprtrdma: Add support for sending backward direction RPC replies
  xprtrdma: Pre-allocate Work Requests for backchannel
  xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers
  SUNRPC: Abstract backchannel operations
  xprtrdma: Saving IRQs no longer needed for rb_lock
  xprtrdma: Remove reply tasklet
  xprtrdma: Use workqueue to process RPC/RDMA replies
  xprtrdma: Replace send and receive arrays
  xprtrdma: Refactor reply handler error handling
  ...
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--net/sunrpc/xprtrdma/Makefile1
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c394
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c7
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c148
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma.c6
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c58
-rw-r--r--net/sunrpc/xprtrdma/transport.c18
-rw-r--r--net/sunrpc/xprtrdma/verbs.c479
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h54
9 files changed, 867 insertions, 298 deletions
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de240bd..33f99d3004f2 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	svc_rdma.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
 	module.o
+rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 000000000000..2dcb44f69e53
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,394 @@
+/*
+ * Copyright (c) 2015 Oracle.  All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA.
+ */
+
+#include <linux/module.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/svc.h>
+#include <linux/sunrpc/svc_xprt.h>
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY	RPCDBG_TRANS
+#endif
+
+#define RPCRDMA_BACKCHANNEL_DEBUG
+
+static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
+				 struct rpc_rqst *rqst)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+	spin_lock(&buf->rb_reqslock);
+	list_del(&req->rl_all);
+	spin_unlock(&buf->rb_reqslock);
+
+	rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+
+	kfree(rqst);
+}
+
+static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
+				 struct rpc_rqst *rqst)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_regbuf *rb;
+	struct rpcrdma_req *req;
+	struct xdr_buf *buf;
+	size_t size;
+
+	req = rpcrdma_create_req(r_xprt);
+	if (!req)
+		return -ENOMEM;
+	req->rl_backchannel = true;
+
+	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	if (IS_ERR(rb))
+		goto out_fail;
+	req->rl_rdmabuf = rb;
+
+	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	if (IS_ERR(rb))
+		goto out_fail;
+	rb->rg_owner = req;
+	req->rl_sendbuf = rb;
+	/* so that rpcr_to_rdmar works when receiving a request */
+	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
+
+	buf = &rqst->rq_snd_buf;
+	buf->head[0].iov_base = rqst->rq_buffer;
+	buf->head[0].iov_len = 0;
+	buf->tail[0].iov_base = NULL;
+	buf->tail[0].iov_len = 0;
+	buf->page_len = 0;
+	buf->len = 0;
+	buf->buflen = size;
+
+	return 0;
+
+out_fail:
+	rpcrdma_bc_free_rqst(r_xprt, rqst);
+	return -ENOMEM;
+}
+
+/* Allocate and add receive buffers to the rpcrdma_buffer's
+ * existing list of rep's. These are released when the
+ * transport is destroyed.
+ */
+static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
+				 unsigned int count)
+{
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc = 0;
+
+	while (count--) {
+		rep = rpcrdma_create_rep(r_xprt);
+		if (IS_ERR(rep)) {
+			pr_err("RPC:       %s: reply buffer alloc failed\n",
+			       __func__);
+			rc = PTR_ERR(rep);
+			break;
+		}
+
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	}
+
+	return rc;
+}
+
+/**
+ * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of concurrent incoming requests to expect
+ *
+ * Returns 0 on success; otherwise a negative errno
+ */
+int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
+	struct rpc_rqst *rqst;
+	unsigned int i;
+	int rc;
+
+	/* The backchannel reply path returns each rpc_rqst to the
+	 * bc_pa_list _after_ the reply is sent. If the server is
+	 * faster than the client, it can send another backward
+	 * direction request before the rpc_rqst is returned to the
+	 * list. The client rejects the request in this case.
+	 *
+	 * Twice as many rpc_rqsts are prepared to ensure there is
+	 * always an rpc_rqst available as soon as a reply is sent.
+	 */
+	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
+		goto out_err;
+
+	for (i = 0; i < (reqs << 1); i++) {
+		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
+		if (!rqst) {
+			pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
+			       __func__);
+			goto out_free;
+		}
+
+		rqst->rq_xprt = &r_xprt->rx_xprt;
+		INIT_LIST_HEAD(&rqst->rq_list);
+		INIT_LIST_HEAD(&rqst->rq_bc_list);
+
+		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
+			goto out_free;
+
+		spin_lock_bh(&xprt->bc_pa_lock);
+		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+		spin_unlock_bh(&xprt->bc_pa_lock);
+	}
+
+	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
+	if (rc)
+		goto out_free;
+
+	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
+	if (rc)
+		goto out_free;
+
+	buffer->rb_bc_srv_max_requests = reqs;
+	request_module("svcrdma");
+
+	return 0;
+
+out_free:
+	xprt_rdma_bc_destroy(xprt, reqs);
+
+out_err:
+	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
+	return -ENOMEM;
+}
+
+/**
+ * xprt_rdma_bc_up - Create transport endpoint for backchannel service
+ * @serv: server endpoint
+ * @net: network namespace
+ *
+ * The "xprt" is an implied argument: it supplies the name of the
+ * backchannel transport class.
+ *
+ * Returns zero on success, negative errno on failure
+ */
+int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
+{
+	int ret;
+
+	ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
+	if (ret < 0)
+		return ret;
+	return 0;
+}
+
+/**
+ * rpcrdma_bc_marshal_reply - Send backwards direction reply
+ * @rqst: buffer containing RPC reply data
+ *
+ * Returns zero on success.
+ */
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct rpcrdma_msg *headerp;
+	size_t rpclen;
+
+	headerp = rdmab_to_msg(req->rl_rdmabuf);
+	headerp->rm_xid = rqst->rq_xid;
+	headerp->rm_vers = rpcrdma_version;
+	headerp->rm_credit =
+			cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
+	headerp->rm_type = rdma_msg;
+	headerp->rm_body.rm_chunks[0] = xdr_zero;
+	headerp->rm_body.rm_chunks[1] = xdr_zero;
+	headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+	rpclen = rqst->rq_svec[0].iov_len;
+
+	pr_info("RPC:       %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
+		__func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
+	pr_info("RPC:       %s: RPC/RDMA: %*ph\n",
+		__func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
+	pr_info("RPC:       %s:      RPC: %*ph\n",
+		__func__, (int)rpclen, rqst->rq_svec[0].iov_base);
+
+	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
+	req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
+	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
+
+	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
+	req->rl_send_iov[1].length = rpclen;
+	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
+
+	req->rl_niovs = 2;
+	return 0;
+}
+
+/**
+ * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of incoming requests to destroy; ignored
+ */
+void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpc_rqst *rqst, *tmp;
+
+	spin_lock_bh(&xprt->bc_pa_lock);
+	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
+		list_del(&rqst->rq_bc_pa_list);
+		spin_unlock_bh(&xprt->bc_pa_lock);
+
+		rpcrdma_bc_free_rqst(r_xprt, rqst);
+
+		spin_lock_bh(&xprt->bc_pa_lock);
+	}
+	spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * xprt_rdma_bc_free_rqst - Release a backchannel rqst
+ * @rqst: request to release
+ */
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+
+	smp_mb__before_atomic();
+	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
+	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+	smp_mb__after_atomic();
+
+	spin_lock_bh(&xprt->bc_pa_lock);
+	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+	spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * rpcrdma_bc_receive_call - Handle a backward direction call
+ * @xprt: transport receiving the call
+ * @rep: receive buffer containing the call
+ *
+ * Called in the RPC reply handler, which runs in a tasklet.
+ * Be quick about it.
+ *
+ * Operational assumptions:
+ *    o Backchannel credits are ignored, just as the NFS server
+ *      forechannel currently does
+ *    o The ULP manages a replay cache (eg, NFSv4.1 sessions).
+ *      No replay detection is done at the transport level
+ */
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
+			     struct rpcrdma_rep *rep)
+{
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+	struct rpcrdma_msg *headerp;
+	struct svc_serv *bc_serv;
+	struct rpcrdma_req *req;
+	struct rpc_rqst *rqst;
+	struct xdr_buf *buf;
+	size_t size;
+	__be32 *p;
+
+	headerp = rdmab_to_msg(rep->rr_rdmabuf);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+	pr_info("RPC:       %s: callback XID %08x, length=%u\n",
+		__func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
+	pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
+#endif
+
+	/* Sanity check:
+	 * Need at least enough bytes for RPC/RDMA header, as code
+	 * here references the header fields by array offset. Also,
+	 * backward calls are always inline, so ensure there
+	 * are some bytes beyond the RPC/RDMA header.
+	 */
+	if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
+		goto out_short;
+	p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
+	size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
+
+	/* Grab a free bc rqst */
+	spin_lock(&xprt->bc_pa_lock);
+	if (list_empty(&xprt->bc_pa_list)) {
+		spin_unlock(&xprt->bc_pa_lock);
+		goto out_overflow;
+	}
+	rqst = list_first_entry(&xprt->bc_pa_list,
+				struct rpc_rqst, rq_bc_pa_list);
+	list_del(&rqst->rq_bc_pa_list);
+	spin_unlock(&xprt->bc_pa_lock);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+	pr_info("RPC:       %s: using rqst %p\n", __func__, rqst);
+#endif
+
+	/* Prepare rqst */
+	rqst->rq_reply_bytes_recvd = 0;
+	rqst->rq_bytes_sent = 0;
+	rqst->rq_xid = headerp->rm_xid;
+	set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+
+	buf = &rqst->rq_rcv_buf;
+	memset(buf, 0, sizeof(*buf));
+	buf->head[0].iov_base = p;
+	buf->head[0].iov_len = size;
+	buf->len = size;
+
+	/* The receive buffer has to be hooked to the rpcrdma_req
+	 * so that it can be reposted after the server is done
+	 * parsing it but just before sending the backward
+	 * direction reply.
+	 */
+	req = rpcr_to_rdmar(rqst);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+	pr_info("RPC:       %s: attaching rep %p to req %p\n",
+		__func__, rep, req);
+#endif
+	req->rl_reply = rep;
+
+	/* Defeat the retransmit detection logic in send_request */
+	req->rl_connect_cookie = 0;
+
+	/* Queue rqst for ULP's callback service */
+	bc_serv = xprt->bc_serv;
+	spin_lock(&bc_serv->sv_cb_lock);
+	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
+	spin_unlock(&bc_serv->sv_cb_lock);
+
+	wake_up(&bc_serv->sv_cb_waitq);
+
+	r_xprt->rx_stats.bcall_count++;
+	return;
+
+out_overflow:
+	pr_warn("RPC/RDMA backchannel overflow\n");
+	xprt_disconnect_done(xprt);
+	/* This receive buffer gets reposted automatically
+	 * when the connection is re-established.
+	 */
+	return;
+
+out_short:
+	pr_warn("RPC/RDMA short backward direction call\n");
+
+	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+		xprt_disconnect_done(xprt);
+	else
+		pr_warn("RPC:       %s: reposting rep %p\n",
+			__func__, rep);
+}
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index a1434447b0d6..88cf9e7269c2 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -256,8 +256,11 @@ frwr_sendcompletion(struct ib_wc *wc)
 
 	/* WARNING: Only wr_id and status are reliable at this point */
 	r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-	pr_warn("RPC:       %s: frmr %p flushed, status %s (%d)\n",
-		__func__, r, ib_wc_status_msg(wc->status), wc->status);
+	if (wc->status == IB_WC_WR_FLUSH_ERR)
+		dprintk("RPC:       %s: frmr %p flushed\n", __func__, r);
+	else
+		pr_warn("RPC:       %s: frmr %p error, status %s (%d)\n",
+			__func__, r, ib_wc_status_msg(wc->status), wc->status);
 	r->r.frmr.fr_state = FRMR_IS_STALE;
 }
 
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index bc8bd6577467..c10d9699441c 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
+		return rpcrdma_bc_marshal_reply(rqst);
+#endif
+
 	/*
 	 * rpclen gets amount of data in first buffer, which is the
 	 * pre-registered buffer.
@@ -711,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
 	spin_unlock_bh(&xprt->transport_lock);
 }
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+{
+	__be32 *p = (__be32 *)headerp;
+
+	if (headerp->rm_type != rdma_msg)
+		return false;
+	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+		return false;
+	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+		return false;
+	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+		return false;
+
+	/* sanity */
+	if (p[7] != headerp->rm_xid)
+		return false;
+	/* call direction */
+	if (p[8] != cpu_to_be32(RPC_CALL))
+		return false;
+
+	return true;
+}
+#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
+
 /*
  * This function is called when an async event is posted to
  * the connection which changes the connection state. All it
@@ -723,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
 	schedule_delayed_work(&ep->rep_connect_worker, 0);
 }
 
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
  */
@@ -741,52 +777,32 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	unsigned long cwnd;
 	u32 credits;
 
-	/* Check status. If bad, signal disconnect and return rep to pool */
-	if (rep->rr_len == ~0U) {
-		rpcrdma_recv_buffer_put(rep);
-		if (r_xprt->rx_ep.rep_connected == 1) {
-			r_xprt->rx_ep.rep_connected = -EIO;
-			rpcrdma_conn_func(&r_xprt->rx_ep);
-		}
-		return;
-	}
-	if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
-		dprintk("RPC:       %s: short/invalid reply\n", __func__);
-		goto repost;
-	}
+	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
+
+	if (rep->rr_len == RPCRDMA_BAD_LEN)
+		goto out_badstatus;
+	if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+		goto out_shortreply;
+
 	headerp = rdmab_to_msg(rep->rr_rdmabuf);
-	if (headerp->rm_vers != rpcrdma_version) {
-		dprintk("RPC:       %s: invalid version %d\n",
-			__func__, be32_to_cpu(headerp->rm_vers));
-		goto repost;
-	}
+	if (headerp->rm_vers != rpcrdma_version)
+		goto out_badversion;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	if (rpcrdma_is_bcall(headerp))
+		goto out_bcall;
+#endif
 
-	/* Get XID and try for a match. */
-	spin_lock(&xprt->transport_lock);
+	/* Match incoming rpcrdma_rep to an rpcrdma_req to
+	 * get context for handling any incoming chunks.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
 	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (rqst == NULL) {
-		spin_unlock(&xprt->transport_lock);
-		dprintk("RPC:       %s: reply 0x%p failed "
-			"to match any request xid 0x%08x len %d\n",
-			__func__, rep, be32_to_cpu(headerp->rm_xid),
-			rep->rr_len);
-repost:
-		r_xprt->rx_stats.bad_reply_count++;
-		if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
-			rpcrdma_recv_buffer_put(rep);
+	if (!rqst)
+		goto out_nomatch;
 
-		return;
-	}
-
-	/* get request object */
 	req = rpcr_to_rdmar(rqst);
-	if (req->rl_reply) {
-		spin_unlock(&xprt->transport_lock);
-		dprintk("RPC:       %s: duplicate reply 0x%p to RPC "
-			"request 0x%p: xid 0x%08x\n", __func__, rep, req,
-			be32_to_cpu(headerp->rm_xid));
-		goto repost;
-	}
+	if (req->rl_reply)
+		goto out_duplicate;
 
 	dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
 		"                   RPC request 0x%p xid 0x%08x\n",
@@ -883,8 +899,50 @@ badheader:
 	if (xprt->cwnd > cwnd)
 		xprt_release_rqst_cong(rqst->rq_task);
 
+	xprt_complete_rqst(rqst->rq_task, status);
+	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 			__func__, xprt, rqst, status);
-	xprt_complete_rqst(rqst->rq_task, status);
-	spin_unlock(&xprt->transport_lock);
+	return;
+
+out_badstatus:
+	rpcrdma_recv_buffer_put(rep);
+	if (r_xprt->rx_ep.rep_connected == 1) {
+		r_xprt->rx_ep.rep_connected = -EIO;
+		rpcrdma_conn_func(&r_xprt->rx_ep);
+	}
+	return;
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+out_bcall:
+	rpcrdma_bc_receive_call(r_xprt, rep);
+	return;
+#endif
+
+out_shortreply:
+	dprintk("RPC:       %s: short/invalid reply\n", __func__);
+	goto repost;
+
+out_badversion:
+	dprintk("RPC:       %s: invalid version %d\n",
+		__func__, be32_to_cpu(headerp->rm_vers));
+	goto repost;
+
+out_nomatch:
+	spin_unlock_bh(&xprt->transport_lock);
+	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
+		__func__, be32_to_cpu(headerp->rm_xid),
+		rep->rr_len);
+	goto repost;
+
+out_duplicate:
+	spin_unlock_bh(&xprt->transport_lock);
+	dprintk("RPC:       %s: "
+		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
+		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
+
+repost:
+	r_xprt->rx_stats.bad_reply_count++;
+	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+		rpcrdma_recv_buffer_put(rep);
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 2cd252f023a5..1b7051bdbdc8 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
 		unregister_sysctl_table(svcrdma_table_header);
 		svcrdma_table_header = NULL;
 	}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	svc_unreg_xprt_class(&svc_rdma_bc_class);
+#endif
 	svc_unreg_xprt_class(&svc_rdma_class);
 	kmem_cache_destroy(svc_rdma_map_cachep);
 	kmem_cache_destroy(svc_rdma_ctxt_cachep);
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
 
 	/* Register RDMA with the SVC transport switch */
 	svc_reg_xprt_class(&svc_rdma_class);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	svc_reg_xprt_class(&svc_rdma_bc_class);
+#endif
 	return 0;
  err1:
 	kmem_cache_destroy(svc_rdma_map_cachep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index a266e870d870..b348b4adef29 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -56,6 +56,7 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
 static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
 					struct net *net,
 					struct sockaddr *sa, int salen,
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
 	.xcl_ident = XPRT_TRANSPORT_RDMA,
 };
 
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
+					   struct sockaddr *, int, int);
+static void svc_rdma_bc_detach(struct svc_xprt *);
+static void svc_rdma_bc_free(struct svc_xprt *);
+
+static struct svc_xprt_ops svc_rdma_bc_ops = {
+	.xpo_create = svc_rdma_bc_create,
+	.xpo_detach = svc_rdma_bc_detach,
+	.xpo_free = svc_rdma_bc_free,
+	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+	.xpo_secure_port = svc_rdma_secure_port,
+};
+
+struct svc_xprt_class svc_rdma_bc_class = {
+	.xcl_name = "rdma-bc",
+	.xcl_owner = THIS_MODULE,
+	.xcl_ops = &svc_rdma_bc_ops,
+	.xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
+};
+
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
+					   struct net *net,
+					   struct sockaddr *sa, int salen,
+					   int flags)
+{
+	struct svcxprt_rdma *cma_xprt;
+	struct svc_xprt *xprt;
+
+	cma_xprt = rdma_create_xprt(serv, 0);
+	if (!cma_xprt)
+		return ERR_PTR(-ENOMEM);
+	xprt = &cma_xprt->sc_xprt;
+
+	svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
+	serv->sv_bc_xprt = xprt;
+
+	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+	return xprt;
+}
+
+static void svc_rdma_bc_detach(struct svc_xprt *xprt)
+{
+	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+}
+
+static void svc_rdma_bc_free(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+	dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+	if (xprt)
+		kfree(rdma);
+}
+#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
+
 struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
 {
 	struct svc_rdma_op_ctxt *ctxt;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 41e452bc580c..8c545f7d7525 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 static int
 xprt_rdma_enable_swap(struct rpc_xprt *xprt)
 {
-	return -EINVAL;
+	return 0;
 }
 
 static void
@@ -705,7 +705,13 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
 	.print_stats		= xprt_rdma_print_stats,
 	.enable_swap		= xprt_rdma_enable_swap,
 	.disable_swap		= xprt_rdma_disable_swap,
-	.inject_disconnect	= xprt_rdma_inject_disconnect
+	.inject_disconnect	= xprt_rdma_inject_disconnect,
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	.bc_setup		= xprt_rdma_bc_setup,
+	.bc_up			= xprt_rdma_bc_up,
+	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
+	.bc_destroy		= xprt_rdma_bc_destroy,
+#endif
 };
 
 static struct xprt_class xprt_rdma = {
@@ -732,6 +738,7 @@ void xprt_rdma_cleanup(void)
 		dprintk("RPC:       %s: xprt_unregister returned %i\n",
 			__func__, rc);
 
+	rpcrdma_destroy_wq();
 	frwr_destroy_recovery_wq();
 }
 
@@ -743,8 +750,15 @@ int xprt_rdma_init(void)
 	if (rc)
 		return rc;
 
+	rc = rpcrdma_alloc_wq();
+	if (rc) {
+		frwr_destroy_recovery_wq();
+		return rc;
+	}
+
 	rc = xprt_register_transport(&xprt_rdma);
 	if (rc) {
+		rpcrdma_destroy_wq();
 		frwr_destroy_recovery_wq();
 		return rc;
 	}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f63369bd01c5..eadd1655145a 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -68,47 +68,33 @@
  * internal functions
  */
 
-/*
- * handle replies in tasklet context, using a single, global list
- * rdma tasklet function -- just turn around and call the func
- * for all replies on the list
- */
-
-static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
-static LIST_HEAD(rpcrdma_tasklets_g);
+static struct workqueue_struct *rpcrdma_receive_wq;
 
-static void
-rpcrdma_run_tasklet(unsigned long data)
+int
+rpcrdma_alloc_wq(void)
 {
-	struct rpcrdma_rep *rep;
-	unsigned long flags;
-
-	data = data;
-	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-	while (!list_empty(&rpcrdma_tasklets_g)) {
-		rep = list_entry(rpcrdma_tasklets_g.next,
-				 struct rpcrdma_rep, rr_list);
-		list_del(&rep->rr_list);
-		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+	struct workqueue_struct *recv_wq;
 
-		rpcrdma_reply_handler(rep);
+	recv_wq = alloc_workqueue("xprtrdma_receive",
+				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+				  0);
+	if (!recv_wq)
+		return -ENOMEM;
 
-		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-	}
-	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+	rpcrdma_receive_wq = recv_wq;
+	return 0;
 }
 
-static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
-
-static void
-rpcrdma_schedule_tasklet(struct list_head *sched_list)
+void
+rpcrdma_destroy_wq(void)
 {
-	unsigned long flags;
+	struct workqueue_struct *wq;
 
-	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
-	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-	tasklet_schedule(&rpcrdma_tasklet_g);
+	if (rpcrdma_receive_wq) {
+		wq = rpcrdma_receive_wq;
+		rpcrdma_receive_wq = NULL;
+		destroy_workqueue(wq);
+	}
 }
 
 static void
@@ -158,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 	}
 }
 
-static int
-rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The common case is a single send completion is waiting. By
+ * passing two WC entries to ib_poll_cq, a return code of 1
+ * means there is exactly one WC waiting and no more. We don't
+ * have to invoke ib_poll_cq again to know that the CQ has been
+ * properly drained.
+ */
+static void
+rpcrdma_sendcq_poll(struct ib_cq *cq)
 {
-	struct ib_wc *wcs;
-	int budget, count, rc;
+	struct ib_wc *pos, wcs[2];
+	int count, rc;
 
-	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
 	do {
-		wcs = ep->rep_send_wcs;
+		pos = wcs;
 
-		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-		if (rc <= 0)
-			return rc;
+		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+		if (rc < 0)
+			break;
 
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_sendcq_process_wc(wcs++);
-	} while (rc == RPCRDMA_POLLSIZE && --budget);
-	return 0;
+			rpcrdma_sendcq_process_wc(pos++);
+	} while (rc == ARRAY_SIZE(wcs));
+	return;
 }
 
-/*
- * Handle send, fast_reg_mr, and local_inv completions.
- *
- * Send events are typically suppressed and thus do not result
- * in an upcall. Occasionally one is signaled, however. This
- * prevents the provider's completion queue from wrapping and
- * losing a completion.
+/* Handle provider send completion upcalls.
  */
 static void
 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 {
-	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-	int rc;
-
-	rc = rpcrdma_sendcq_poll(cq, ep);
-	if (rc) {
-		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-			__func__, rc);
-		return;
-	}
+	do {
+		rpcrdma_sendcq_poll(cq);
+	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
+}
 
-	rc = ib_req_notify_cq(cq,
-			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-	if (rc == 0)
-		return;
-	if (rc < 0) {
-		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-			__func__, rc);
-		return;
-	}
+static void
+rpcrdma_receive_worker(struct work_struct *work)
+{
+	struct rpcrdma_rep *rep =
+			container_of(work, struct rpcrdma_rep, rr_work);
 
-	rpcrdma_sendcq_poll(cq, ep);
+	rpcrdma_reply_handler(rep);
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
 {
 	struct rpcrdma_rep *rep =
 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -237,91 +214,60 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
 	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
-	list_add_tail(&rep->rr_list, sched_list);
+	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
+
 out_fail:
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("RPC:       %s: rep %p: %s\n",
 		       __func__, rep, ib_wc_status_msg(wc->status));
-	rep->rr_len = ~0U;
+	rep->rr_len = RPCRDMA_BAD_LEN;
 	goto out_schedule;
 }
 
-static int
-rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The wc array is on stack: automatic memory is always CPU-local.
+ *
+ * struct ib_wc is 64 bytes, making the poll array potentially
+ * large. But this is at the bottom of the call chain. Further
+ * substantial work is done in another thread.
+ */
+static void
+rpcrdma_recvcq_poll(struct ib_cq *cq)
 {
-	struct list_head sched_list;
-	struct ib_wc *wcs;
-	int budget, count, rc;
+	struct ib_wc *pos, wcs[4];
+	int count, rc;
 
-	INIT_LIST_HEAD(&sched_list);
-	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
 	do {
-		wcs = ep->rep_recv_wcs;
+		pos = wcs;
 
-		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-		if (rc <= 0)
-			goto out_schedule;
+		rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+		if (rc < 0)
+			break;
 
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
-	} while (rc == RPCRDMA_POLLSIZE && --budget);
-	rc = 0;
-
-out_schedule:
-	rpcrdma_schedule_tasklet(&sched_list);
-	return rc;
+			rpcrdma_recvcq_process_wc(pos++);
+	} while (rc == ARRAY_SIZE(wcs));
 }
 
-/*
- * Handle receive completions.
- *
- * It is reentrant but processes single events in order to maintain
- * ordering of receives to keep server credits.
- *
- * It is the responsibility of the scheduled tasklet to return
- * recv buffers to the pool. NOTE: this affects synchronization of
- * connection shutdown. That is, the structures required for
- * the completion of the reply handler must remain intact until
- * all memory has been reclaimed.
+/* Handle provider receive completion upcalls.
  */
 static void
 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
 {
-	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-	int rc;
-
-	rc = rpcrdma_recvcq_poll(cq, ep);
-	if (rc) {
-		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-			__func__, rc);
-		return;
-	}
-
-	rc = ib_req_notify_cq(cq,
-			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-	if (rc == 0)
-		return;
-	if (rc < 0) {
-		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-			__func__, rc);
-		return;
-	}
-
-	rpcrdma_recvcq_poll(cq, ep);
+	do {
+		rpcrdma_recvcq_poll(cq);
+	} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+				  IB_CQ_REPORT_MISSED_EVENTS) > 0);
 }
 
 static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
 	struct ib_wc wc;
-	LIST_HEAD(sched_list);
 
 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_recvcq_process_wc(&wc, &sched_list);
-	if (!list_empty(&sched_list))
-		rpcrdma_schedule_tasklet(&sched_list);
+		rpcrdma_recvcq_process_wc(&wc);
 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
 		rpcrdma_sendcq_process_wc(&wc);
 }
@@ -623,6 +569,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	struct ib_device_attr *devattr = &ia->ri_devattr;
 	struct ib_cq *sendcq, *recvcq;
 	struct ib_cq_init_attr cq_attr = {};
+	unsigned int max_qp_wr;
 	int rc, err;
 
 	if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -631,18 +578,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		return -ENOMEM;
 	}
 
+	if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
+		dprintk("RPC:       %s: insufficient wqe's available\n",
+			__func__);
+		return -ENOMEM;
+	}
+	max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
+
 	/* check provider's send/recv wr limits */
-	if (cdata->max_requests > devattr->max_qp_wr)
-		cdata->max_requests = devattr->max_qp_wr;
+	if (cdata->max_requests > max_qp_wr)
+		cdata->max_requests = max_qp_wr;
 
 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 	ep->rep_attr.qp_context = ep;
 	ep->rep_attr.srq = NULL;
 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
 	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 	if (rc)
 		return rc;
 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
 	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
 	ep->rep_attr.cap.max_recv_sge = 1;
 	ep->rep_attr.cap.max_inline_data = 0;
@@ -670,7 +626,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
 	cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
 	sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
-			      rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
 	if (IS_ERR(sendcq)) {
 		rc = PTR_ERR(sendcq);
 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
@@ -687,7 +643,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 
 	cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
 	recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
-			      rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+			      rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
 	if (IS_ERR(recvcq)) {
 		rc = PTR_ERR(recvcq);
 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -886,7 +842,21 @@ retry:
 		}
 		rc = ep->rep_connected;
 	} else {
+		struct rpcrdma_xprt *r_xprt;
+		unsigned int extras;
+
 		dprintk("RPC:       %s: connected\n", __func__);
+
+		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+		if (extras) {
+			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+			if (rc)
+				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
+					__func__, rc);
+				rc = 0;
+		}
 	}
 
 out:
@@ -923,20 +893,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	}
 }
 
-static struct rpcrdma_req *
+struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 	struct rpcrdma_req *req;
 
 	req = kzalloc(sizeof(*req), GFP_KERNEL);
 	if (req == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	INIT_LIST_HEAD(&req->rl_free);
+	spin_lock(&buffer->rb_reqslock);
+	list_add(&req->rl_all, &buffer->rb_allreqs);
+	spin_unlock(&buffer->rb_reqslock);
 	req->rl_buffer = &r_xprt->rx_buf;
 	return req;
 }
 
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -958,6 +933,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 
 	rep->rr_device = ia->ri_device;
 	rep->rr_rxprt = r_xprt;
+	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 	return rep;
 
 out_free:
@@ -971,44 +947,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-	char *p;
-	size_t len;
 	int i, rc;
 
-	buf->rb_max_requests = cdata->max_requests;
+	buf->rb_max_requests = r_xprt->rx_data.max_requests;
+	buf->rb_bc_srv_max_requests = 0;
 	spin_lock_init(&buf->rb_lock);
 
-	/* Need to allocate:
-	 *   1.  arrays for send and recv pointers
-	 *   2.  arrays of struct rpcrdma_req to fill in pointers
-	 *   3.  array of struct rpcrdma_rep for replies
-	 * Send/recv buffers in req/rep need to be registered
-	 */
-	len = buf->rb_max_requests *
-		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-
-	p = kzalloc(len, GFP_KERNEL);
-	if (p == NULL) {
-		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
-			__func__, len);
-		rc = -ENOMEM;
-		goto out;
-	}
-	buf->rb_pool = p;	/* for freeing it later */
-
-	buf->rb_send_bufs = (struct rpcrdma_req **) p;
-	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
-	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
-	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
-
 	rc = ia->ri_ops->ro_init(r_xprt);
 	if (rc)
 		goto out;
 
+	INIT_LIST_HEAD(&buf->rb_send_bufs);
+	INIT_LIST_HEAD(&buf->rb_allreqs);
+	spin_lock_init(&buf->rb_reqslock);
 	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_req *req;
-		struct rpcrdma_rep *rep;
 
 		req = rpcrdma_create_req(r_xprt);
 		if (IS_ERR(req)) {
@@ -1017,7 +970,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 			rc = PTR_ERR(req);
 			goto out;
 		}
-		buf->rb_send_bufs[i] = req;
+		req->rl_backchannel = false;
+		list_add(&req->rl_free, &buf->rb_send_bufs);
+	}
+
+	INIT_LIST_HEAD(&buf->rb_recv_bufs);
+	for (i = 0; i < buf->rb_max_requests + 2; i++) {
+		struct rpcrdma_rep *rep;
 
 		rep = rpcrdma_create_rep(r_xprt);
 		if (IS_ERR(rep)) {
@@ -1026,7 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 			rc = PTR_ERR(rep);
 			goto out;
 		}
-		buf->rb_recv_bufs[i] = rep;
+		list_add(&rep->rr_list, &buf->rb_recv_bufs);
 	}
 
 	return 0;
@@ -1035,22 +994,38 @@ out:
 	return rc;
 }
 
+static struct rpcrdma_req *
+rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
+{
+	struct rpcrdma_req *req;
+
+	req = list_first_entry(&buf->rb_send_bufs,
+			       struct rpcrdma_req, rl_free);
+	list_del(&req->rl_free);
+	return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
+{
+	struct rpcrdma_rep *rep;
+
+	rep = list_first_entry(&buf->rb_recv_bufs,
+			       struct rpcrdma_rep, rr_list);
+	list_del(&rep->rr_list);
+	return rep;
+}
+
 static void
 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 {
-	if (!rep)
-		return;
-
 	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
 	kfree(rep);
 }
 
-static void
+void
 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
-	if (!req)
-		return;
-
 	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
 	kfree(req);
@@ -1060,25 +1035,29 @@ void
 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-	int i;
 
-	/* clean up in reverse order from create
-	 *   1.  recv mr memory (mr free, then kfree)
-	 *   2.  send mr memory (mr free, then kfree)
-	 *   3.  MWs
-	 */
-	dprintk("RPC:       %s: entering\n", __func__);
+	while (!list_empty(&buf->rb_recv_bufs)) {
+		struct rpcrdma_rep *rep;
 
-	for (i = 0; i < buf->rb_max_requests; i++) {
-		if (buf->rb_recv_bufs)
-			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
-		if (buf->rb_send_bufs)
-			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
+		rep = rpcrdma_buffer_get_rep_locked(buf);
+		rpcrdma_destroy_rep(ia, rep);
 	}
 
-	ia->ri_ops->ro_destroy(buf);
+	spin_lock(&buf->rb_reqslock);
+	while (!list_empty(&buf->rb_allreqs)) {
+		struct rpcrdma_req *req;
+
+		req = list_first_entry(&buf->rb_allreqs,
+				       struct rpcrdma_req, rl_all);
+		list_del(&req->rl_all);
+
+		spin_unlock(&buf->rb_reqslock);
+		rpcrdma_destroy_req(ia, req);
+		spin_lock(&buf->rb_reqslock);
+	}
+	spin_unlock(&buf->rb_reqslock);
 
-	kfree(buf->rb_pool);
+	ia->ri_ops->ro_destroy(buf);
 }
 
 struct rpcrdma_mw *
@@ -1110,53 +1089,34 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
 	spin_unlock(&buf->rb_mwlock);
 }
 
-static void
-rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
-	buf->rb_send_bufs[--buf->rb_send_index] = req;
-	req->rl_niovs = 0;
-	if (req->rl_reply) {
-		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
-		req->rl_reply = NULL;
-	}
-}
-
 /*
  * Get a set of request/reply buffers.
  *
- * Reply buffer (if needed) is attached to send buffer upon return.
- * Rule:
- *    rb_send_index and rb_recv_index MUST always be pointing to the
- *    *next* available buffer (non-NULL). They are incremented after
- *    removing buffers, and decremented *before* returning them.
+ * Reply buffer (if available) is attached to send buffer upon return.
  */
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 {
 	struct rpcrdma_req *req;
-	unsigned long flags;
-
-	spin_lock_irqsave(&buffers->rb_lock, flags);
 
-	if (buffers->rb_send_index == buffers->rb_max_requests) {
-		spin_unlock_irqrestore(&buffers->rb_lock, flags);
-		dprintk("RPC:       %s: out of request buffers\n", __func__);
-		return ((struct rpcrdma_req *)NULL);
-	}
-
-	req = buffers->rb_send_bufs[buffers->rb_send_index];
-	if (buffers->rb_send_index < buffers->rb_recv_index) {
-		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
-			__func__,
-			buffers->rb_recv_index - buffers->rb_send_index);
-		req->rl_reply = NULL;
-	} else {
-		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-	}
-	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+	spin_lock(&buffers->rb_lock);
+	if (list_empty(&buffers->rb_send_bufs))
+		goto out_reqbuf;
+	req = rpcrdma_buffer_get_req_locked(buffers);
+	if (list_empty(&buffers->rb_recv_bufs))
+		goto out_repbuf;
+	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+	spin_unlock(&buffers->rb_lock);
+	return req;
 
-	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+out_reqbuf:
+	spin_unlock(&buffers->rb_lock);
+	pr_warn("RPC:       %s: out of request buffers\n", __func__);
+	return NULL;
+out_repbuf:
+	spin_unlock(&buffers->rb_lock);
+	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
+	req->rl_reply = NULL;
 	return req;
 }
 
@@ -1168,30 +1128,31 @@ void
 rpcrdma_buffer_put(struct rpcrdma_req *req)
 {
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
-	unsigned long flags;
+	struct rpcrdma_rep *rep = req->rl_reply;
 
-	spin_lock_irqsave(&buffers->rb_lock, flags);
-	rpcrdma_buffer_put_sendbuf(req, buffers);
-	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	req->rl_niovs = 0;
+	req->rl_reply = NULL;
+
+	spin_lock(&buffers->rb_lock);
+	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+	if (rep)
+		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+	spin_unlock(&buffers->rb_lock);
 }
 
 /*
  * Recover reply buffers from pool.
- * This happens when recovering from error conditions.
- * Post-increment counter/array index.
+ * This happens when recovering from disconnect.
  */
 void
 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
 {
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
-	unsigned long flags;
 
-	spin_lock_irqsave(&buffers->rb_lock, flags);
-	if (buffers->rb_recv_index < buffers->rb_max_requests) {
-		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-	}
-	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	spin_lock(&buffers->rb_lock);
+	if (!list_empty(&buffers->rb_recv_bufs))
+		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+	spin_unlock(&buffers->rb_lock);
 }
 
 /*
@@ -1202,11 +1163,10 @@ void
 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 {
 	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-	unsigned long flags;
 
-	spin_lock_irqsave(&buffers->rb_lock, flags);
-	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
-	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	spin_lock(&buffers->rb_lock);
+	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+	spin_unlock(&buffers->rb_lock);
 }
 
 /*
@@ -1363,6 +1323,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
 	return rc;
 }
 
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @min_reqs: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc;
+
+	while (count--) {
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		if (list_empty(&buffers->rb_recv_bufs))
+			goto out_reqbuf;
+		rep = rpcrdma_buffer_get_rep_locked(buffers);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+
+		rc = rpcrdma_ep_post_recv(ia, ep, rep);
+		if (rc)
+			goto out_rc;
+	}
+
+	return 0;
+
+out_reqbuf:
+	spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	pr_warn("%s: no extra receive buffers\n", __func__);
+	return -ENOMEM;
+
+out_rc:
+	rpcrdma_recv_buffer_put(rep);
+	return rc;
+}
+
 /* How many chunk list items fit within our inline buffers?
  */
 unsigned int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c82abf44e39d..ac7f8d4f632a 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
  * RDMA Endpoint -- one per transport instance
  */
 
-#define RPCRDMA_WC_BUDGET	(128)
-#define RPCRDMA_POLLSIZE	(16)
-
 struct rpcrdma_ep {
 	atomic_t		rep_cqcount;
 	int			rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
 	struct rdma_conn_param	rep_remote_cma;
 	struct sockaddr_storage	rep_remote_addr;
 	struct delayed_work	rep_connect_worker;
-	struct ib_wc		rep_send_wcs[RPCRDMA_POLLSIZE];
-	struct ib_wc		rep_recv_wcs[RPCRDMA_POLLSIZE];
 };
 
 /*
@@ -106,6 +101,16 @@ struct rpcrdma_ep {
  */
 #define RPCRDMA_IGNORE_COMPLETION	(0ULL)
 
+/* Pre-allocate extra Work Requests for handling backward receives
+ * and sends. This is a fixed value because the Work Queues are
+ * allocated when the forward channel is set up.
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+#define RPCRDMA_BACKWARD_WRS		(8)
+#else
+#define RPCRDMA_BACKWARD_WRS		(0)
+#endif
+
 /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
  *
  * The below structure appears at the front of a large region of kmalloc'd
@@ -169,10 +174,13 @@ struct rpcrdma_rep {
 	unsigned int		rr_len;
 	struct ib_device	*rr_device;
 	struct rpcrdma_xprt	*rr_rxprt;
+	struct work_struct	rr_work;
 	struct list_head	rr_list;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 };
 
+#define RPCRDMA_BAD_LEN		(~0U)
+
 /*
  * struct rpcrdma_mw - external memory region metadata
  *
@@ -256,6 +264,7 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */
 #define RPCRDMA_MAX_IOVS	(2)
 
 struct rpcrdma_req {
+	struct list_head	rl_free;
 	unsigned int		rl_niovs;
 	unsigned int		rl_nchunks;
 	unsigned int		rl_connect_cookie;
@@ -265,6 +274,9 @@ struct rpcrdma_req {
 	struct rpcrdma_regbuf	*rl_rdmabuf;
 	struct rpcrdma_regbuf	*rl_sendbuf;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
+
+	struct list_head	rl_all;
+	bool			rl_backchannel;
 };
 
 static inline struct rpcrdma_req *
@@ -289,12 +301,14 @@ struct rpcrdma_buffer {
 	struct list_head	rb_all;
 	char			*rb_pool;
 
-	spinlock_t		rb_lock;	/* protect buf arrays */
+	spinlock_t		rb_lock;	/* protect buf lists */
+	struct list_head	rb_send_bufs;
+	struct list_head	rb_recv_bufs;
 	u32			rb_max_requests;
-	int			rb_send_index;
-	int			rb_recv_index;
-	struct rpcrdma_req	**rb_send_bufs;
-	struct rpcrdma_rep	**rb_recv_bufs;
+
+	u32			rb_bc_srv_max_requests;
+	spinlock_t		rb_reqslock;	/* protect rb_allreqs */
+	struct list_head	rb_allreqs;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
@@ -340,6 +354,7 @@ struct rpcrdma_stats {
 	unsigned long		failed_marshal_count;
 	unsigned long		bad_reply_count;
 	unsigned long		nomsg_call_count;
+	unsigned long		bcall_count;
 };
 
 /*
@@ -415,6 +430,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
+struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
+void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
@@ -431,10 +449,14 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 			 struct rpcrdma_regbuf *);
 
 unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
 int frwr_alloc_recovery_wq(void);
 void frwr_destroy_recovery_wq(void);
 
+int rpcrdma_alloc_wq(void);
+void rpcrdma_destroy_wq(void);
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
@@ -495,6 +517,18 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
 int xprt_rdma_init(void);
 void xprt_rdma_cleanup(void);
 
+/* Backchannel calls - xprtrdma/backchannel.c
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
+int xprt_rdma_bc_up(struct svc_serv *, struct net *);
+int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
+void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
+#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
+
 /* Temporary NFS request map cache. Created in svc_rdma.c  */
 extern struct kmem_cache *svc_rdma_map_cachep;
 /* WR context cache. Created in svc_rdma.c  */