summary refs log tree commit diff
path: root/net/sunrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/Kconfig4
-rw-r--r--net/sunrpc/Makefile2
-rw-r--r--net/sunrpc/backchannel_rqst.c7
-rw-r--r--net/sunrpc/bc_svc.c3
-rw-r--r--net/sunrpc/clnt.c15
-rw-r--r--net/sunrpc/sched.c38
-rw-r--r--net/sunrpc/svc.c6
-rw-r--r--net/sunrpc/svcsock.c14
-rw-r--r--net/sunrpc/xdr.c2
-rw-r--r--net/sunrpc/xprt.c258
-rw-r--r--net/sunrpc/xprtrdma/transport.c6
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h4
-rw-r--r--net/sunrpc/xprtsock.c57
13 files changed, 277 insertions, 139 deletions
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig
index b2198e65d8bb..ffd243d09188 100644
--- a/net/sunrpc/Kconfig
+++ b/net/sunrpc/Kconfig
@@ -4,6 +4,10 @@ config SUNRPC
 config SUNRPC_GSS
 	tristate
 
+config SUNRPC_BACKCHANNEL
+	bool
+	depends on SUNRPC
+
 config SUNRPC_XPRT_RDMA
 	tristate
 	depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS && EXPERIMENTAL
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index 9d2fca5ad14a..8209a0411bca 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -13,6 +13,6 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
 	    addr.o rpcb_clnt.o timer.o xdr.o \
 	    sunrpc_syms.o cache.o rpc_pipe.o \
 	    svc_xprt.o
-sunrpc-$(CONFIG_NFS_V4_1) += backchannel_rqst.o bc_svc.o
+sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o bc_svc.o
 sunrpc-$(CONFIG_PROC_FS) += stats.o
 sunrpc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index cf06af3b63c6..91eaa26e4c42 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -29,8 +29,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-#if defined(CONFIG_NFS_V4_1)
-
 /*
  * Helper routines that track the number of preallocation elements
  * on the transport.
@@ -174,7 +172,7 @@ out_free:
 	dprintk("RPC:       setup backchannel transport failed\n");
 	return -1;
 }
-EXPORT_SYMBOL(xprt_setup_backchannel);
+EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
 
 /*
  * Destroys the backchannel preallocated structures.
@@ -204,7 +202,7 @@ void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
 	dprintk("RPC:        backchannel list empty= %s\n",
 		list_empty(&xprt->bc_pa_list) ? "true" : "false");
 }
-EXPORT_SYMBOL(xprt_destroy_backchannel);
+EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
 
 /*
  * One or more rpc_rqst structure have been preallocated during the
@@ -279,4 +277,3 @@ void xprt_free_bc_request(struct rpc_rqst *req)
 	spin_unlock_bh(&xprt->bc_pa_lock);
 }
 
-#endif /* CONFIG_NFS_V4_1 */
diff --git a/net/sunrpc/bc_svc.c b/net/sunrpc/bc_svc.c
index 1dd1a6890007..0b2eb388cbda 100644
--- a/net/sunrpc/bc_svc.c
+++ b/net/sunrpc/bc_svc.c
@@ -27,8 +27,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  * reply over an existing open connection previously established by the client.
  */
 
-#if defined(CONFIG_NFS_V4_1)
-
 #include <linux/module.h>
 
 #include <linux/sunrpc/xprt.h>
@@ -63,4 +61,3 @@ int bc_send(struct rpc_rqst *req)
 	return ret;
 }
 
-#endif /* CONFIG_NFS_V4_1 */
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index c50818f0473b..c5347d29cfb7 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -64,9 +64,9 @@ static void	call_decode(struct rpc_task *task);
 static void	call_bind(struct rpc_task *task);
 static void	call_bind_status(struct rpc_task *task);
 static void	call_transmit(struct rpc_task *task);
-#if defined(CONFIG_NFS_V4_1)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 static void	call_bc_transmit(struct rpc_task *task);
-#endif /* CONFIG_NFS_V4_1 */
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 static void	call_status(struct rpc_task *task);
 static void	call_transmit_status(struct rpc_task *task);
 static void	call_refresh(struct rpc_task *task);
@@ -715,7 +715,7 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 }
 EXPORT_SYMBOL_GPL(rpc_call_async);
 
-#if defined(CONFIG_NFS_V4_1)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /**
  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
  * rpc_execute against it
@@ -758,7 +758,7 @@ out:
 	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
 	return task;
 }
-#endif /* CONFIG_NFS_V4_1 */
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
 void
 rpc_call_start(struct rpc_task *task)
@@ -1361,7 +1361,7 @@ call_transmit_status(struct rpc_task *task)
 	}
 }
 
-#if defined(CONFIG_NFS_V4_1)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /*
  * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
  * addition, disconnect on connectivity errors.
@@ -1425,7 +1425,7 @@ call_bc_transmit(struct rpc_task *task)
 	}
 	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
 }
-#endif /* CONFIG_NFS_V4_1 */
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
 /*
  * 6.	Sort out the RPC call status
@@ -1550,8 +1550,7 @@ call_decode(struct rpc_task *task)
 	kxdrdproc_t	decode = task->tk_msg.rpc_proc->p_decode;
 	__be32		*p;
 
-	dprintk("RPC: %5u call_decode (status %d)\n",
-			task->tk_pid, task->tk_status);
+	dprint_status(task);
 
 	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
 		if (clnt->cl_chatty)
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 4814e246a874..d12ffa545811 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -97,14 +97,16 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
 /*
  * Add new request to a priority queue.
  */
-static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
+		struct rpc_task *task,
+		unsigned char queue_priority)
 {
 	struct list_head *q;
 	struct rpc_task *t;
 
 	INIT_LIST_HEAD(&task->u.tk_wait.links);
-	q = &queue->tasks[task->tk_priority];
-	if (unlikely(task->tk_priority > queue->maxpriority))
+	q = &queue->tasks[queue_priority];
+	if (unlikely(queue_priority > queue->maxpriority))
 		q = &queue->tasks[queue->maxpriority];
 	list_for_each_entry(t, q, u.tk_wait.list) {
 		if (t->tk_owner == task->tk_owner) {
@@ -123,12 +125,14 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r
  * improve overall performance.
  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
-static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
+		struct rpc_task *task,
+		unsigned char queue_priority)
 {
 	BUG_ON (RPC_IS_QUEUED(task));
 
 	if (RPC_IS_PRIORITY(queue))
-		__rpc_add_wait_queue_priority(queue, task);
+		__rpc_add_wait_queue_priority(queue, task, queue_priority);
 	else if (RPC_IS_SWAPPER(task))
 		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
 	else
@@ -311,13 +315,15 @@ static void rpc_make_runnable(struct rpc_task *task)
  * NB: An RPC task will only receive interrupt-driven events as long
  * as it's on a wait queue.
  */
-static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
-			rpc_action action)
+static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
+		struct rpc_task *task,
+		rpc_action action,
+		unsigned char queue_priority)
 {
 	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
 			task->tk_pid, rpc_qname(q), jiffies);
 
-	__rpc_add_wait_queue(q, task);
+	__rpc_add_wait_queue(q, task, queue_priority);
 
 	BUG_ON(task->tk_callback != NULL);
 	task->tk_callback = action;
@@ -334,11 +340,25 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 	 * Protect the queue operations.
 	 */
 	spin_lock_bh(&q->lock);
-	__rpc_sleep_on(q, task, action);
+	__rpc_sleep_on_priority(q, task, action, task->tk_priority);
 	spin_unlock_bh(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on);
 
+void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
+		rpc_action action, int priority)
+{
+	/* We shouldn't ever put an inactive task to sleep */
+	BUG_ON(!RPC_IS_ACTIVATED(task));
+
+	/*
+	 * Protect the queue operations.
+	 */
+	spin_lock_bh(&q->lock);
+	__rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
+	spin_unlock_bh(&q->lock);
+}
+
 /**
  * __rpc_do_wake_up_task - wake up a single rpc_task
  * @queue: wait queue
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 2b90292e9505..6a69a1131fb7 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1252,7 +1252,7 @@ svc_process(struct svc_rqst *rqstp)
 	}
 }
 
-#if defined(CONFIG_NFS_V4_1)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /*
  * Process a backchannel RPC request that arrived over an existing
  * outbound connection
@@ -1300,8 +1300,8 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
 		return 0;
 	}
 }
-EXPORT_SYMBOL(bc_svc_process);
-#endif /* CONFIG_NFS_V4_1 */
+EXPORT_SYMBOL_GPL(bc_svc_process);
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
 /*
  * Return (transport-specific) limit on the rpc payload.
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index f2cb5b881dea..767d494de7a2 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -68,12 +68,12 @@ static void		svc_sock_free(struct svc_xprt *);
 static struct svc_xprt *svc_create_socket(struct svc_serv *, int,
 					  struct net *, struct sockaddr *,
 					  int, int);
-#if defined(CONFIG_NFS_V4_1)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 static struct svc_xprt *svc_bc_create_socket(struct svc_serv *, int,
 					     struct net *, struct sockaddr *,
 					     int, int);
 static void svc_bc_sock_free(struct svc_xprt *xprt);
-#endif /* CONFIG_NFS_V4_1 */
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 static struct lock_class_key svc_key[2];
@@ -1243,7 +1243,7 @@ static struct svc_xprt *svc_tcp_create(struct svc_serv *serv,
 	return svc_create_socket(serv, IPPROTO_TCP, net, sa, salen, flags);
 }
 
-#if defined(CONFIG_NFS_V4_1)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 static struct svc_xprt *svc_bc_create_socket(struct svc_serv *, int,
 					     struct net *, struct sockaddr *,
 					     int, int);
@@ -1284,7 +1284,7 @@ static void svc_cleanup_bc_xprt_sock(void)
 {
 	svc_unreg_xprt_class(&svc_tcp_bc_class);
 }
-#else /* CONFIG_NFS_V4_1 */
+#else /* CONFIG_SUNRPC_BACKCHANNEL */
 static void svc_init_bc_xprt_sock(void)
 {
 }
@@ -1292,7 +1292,7 @@ static void svc_init_bc_xprt_sock(void)
 static void svc_cleanup_bc_xprt_sock(void)
 {
 }
-#endif /* CONFIG_NFS_V4_1 */
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
 static struct svc_xprt_ops svc_tcp_ops = {
 	.xpo_create = svc_tcp_create,
@@ -1623,7 +1623,7 @@ static void svc_sock_free(struct svc_xprt *xprt)
 	kfree(svsk);
 }
 
-#if defined(CONFIG_NFS_V4_1)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /*
  * Create a back channel svc_xprt which shares the fore channel socket.
  */
@@ -1662,4 +1662,4 @@ static void svc_bc_sock_free(struct svc_xprt *xprt)
 	if (xprt)
 		kfree(container_of(xprt, struct svc_sock, sk_xprt));
 }
-#endif /* CONFIG_NFS_V4_1 */
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index f008c14ad34c..277ebd4bf095 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -126,7 +126,7 @@ xdr_terminate_string(struct xdr_buf *buf, const u32 len)
 	kaddr[buf->page_base + len] = '\0';
 	kunmap_atomic(kaddr, KM_USER0);
 }
-EXPORT_SYMBOL(xdr_terminate_string);
+EXPORT_SYMBOL_GPL(xdr_terminate_string);
 
 void
 xdr_encode_pages(struct xdr_buf *xdr, struct page **pages, unsigned int base,
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ce5eb68a9664..f4385e45a5fc 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -62,6 +62,7 @@
 /*
  * Local functions
  */
+static void	 xprt_init(struct rpc_xprt *xprt, struct net *net);
 static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 static void	xprt_connect_status(struct rpc_task *task);
 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
@@ -186,15 +187,16 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);
 /**
  * xprt_reserve_xprt - serialize write access to transports
  * @task: task that is requesting access to the transport
+ * @xprt: pointer to the target transport
  *
  * This prevents mixing the payload of separate requests, and prevents
  * transport connects from colliding with writes.  No congestion control
  * is provided.
  */
-int xprt_reserve_xprt(struct rpc_task *task)
+int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
-	struct rpc_xprt	*xprt = req->rq_xprt;
+	int priority;
 
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 		if (task == xprt->snd_task)
@@ -202,8 +204,10 @@ int xprt_reserve_xprt(struct rpc_task *task)
 		goto out_sleep;
 	}
 	xprt->snd_task = task;
-	req->rq_bytes_sent = 0;
-	req->rq_ntrans++;
+	if (req != NULL) {
+		req->rq_bytes_sent = 0;
+		req->rq_ntrans++;
+	}
 
 	return 1;
 
@@ -212,10 +216,13 @@ out_sleep:
 			task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	if (req->rq_ntrans)
-		rpc_sleep_on(&xprt->resend, task, NULL);
+	if (req == NULL)
+		priority = RPC_PRIORITY_LOW;
+	else if (!req->rq_ntrans)
+		priority = RPC_PRIORITY_NORMAL;
 	else
-		rpc_sleep_on(&xprt->sending, task, NULL);
+		priority = RPC_PRIORITY_HIGH;
+	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
@@ -239,22 +246,24 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
  * integrated into the decision of whether a request is allowed to be
  * woken up and given access to the transport.
  */
-int xprt_reserve_xprt_cong(struct rpc_task *task)
+int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	struct rpc_xprt	*xprt = task->tk_xprt;
 	struct rpc_rqst *req = task->tk_rqstp;
+	int priority;
 
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
 		if (task == xprt->snd_task)
 			return 1;
 		goto out_sleep;
 	}
+	if (req == NULL) {
+		xprt->snd_task = task;
+		return 1;
+	}
 	if (__xprt_get_cong(xprt, task)) {
 		xprt->snd_task = task;
-		if (req) {
-			req->rq_bytes_sent = 0;
-			req->rq_ntrans++;
-		}
+		req->rq_bytes_sent = 0;
+		req->rq_ntrans++;
 		return 1;
 	}
 	xprt_clear_locked(xprt);
@@ -262,10 +271,13 @@ out_sleep:
 	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
-	if (req && req->rq_ntrans)
-		rpc_sleep_on(&xprt->resend, task, NULL);
+	if (req == NULL)
+		priority = RPC_PRIORITY_LOW;
+	else if (!req->rq_ntrans)
+		priority = RPC_PRIORITY_NORMAL;
 	else
-		rpc_sleep_on(&xprt->sending, task, NULL);
+		priority = RPC_PRIORITY_HIGH;
+	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
 	return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -275,7 +287,7 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
 	int retval;
 
 	spin_lock_bh(&xprt->transport_lock);
-	retval = xprt->ops->reserve_xprt(task);
+	retval = xprt->ops->reserve_xprt(xprt, task);
 	spin_unlock_bh(&xprt->transport_lock);
 	return retval;
 }
@@ -288,12 +300,9 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
 
-	task = rpc_wake_up_next(&xprt->resend);
-	if (!task) {
-		task = rpc_wake_up_next(&xprt->sending);
-		if (!task)
-			goto out_unlock;
-	}
+	task = rpc_wake_up_next(&xprt->sending);
+	if (task == NULL)
+		goto out_unlock;
 
 	req = task->tk_rqstp;
 	xprt->snd_task = task;
@@ -310,24 +319,25 @@ out_unlock:
 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
 	struct rpc_task *task;
+	struct rpc_rqst *req;
 
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
 	if (RPCXPRT_CONGESTED(xprt))
 		goto out_unlock;
-	task = rpc_wake_up_next(&xprt->resend);
-	if (!task) {
-		task = rpc_wake_up_next(&xprt->sending);
-		if (!task)
-			goto out_unlock;
+	task = rpc_wake_up_next(&xprt->sending);
+	if (task == NULL)
+		goto out_unlock;
+
+	req = task->tk_rqstp;
+	if (req == NULL) {
+		xprt->snd_task = task;
+		return;
 	}
 	if (__xprt_get_cong(xprt, task)) {
-		struct rpc_rqst *req = task->tk_rqstp;
 		xprt->snd_task = task;
-		if (req) {
-			req->rq_bytes_sent = 0;
-			req->rq_ntrans++;
-		}
+		req->rq_bytes_sent = 0;
+		req->rq_ntrans++;
 		return;
 	}
 out_unlock:
@@ -852,7 +862,7 @@ int xprt_prepare_transmit(struct rpc_task *task)
 		err = req->rq_reply_bytes_recvd;
 		goto out_unlock;
 	}
-	if (!xprt->ops->reserve_xprt(task))
+	if (!xprt->ops->reserve_xprt(xprt, task))
 		err = -EAGAIN;
 out_unlock:
 	spin_unlock_bh(&xprt->transport_lock);
@@ -928,28 +938,66 @@ void xprt_transmit(struct rpc_task *task)
 	spin_unlock_bh(&xprt->transport_lock);
 }
 
+static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
+{
+	struct rpc_rqst *req = ERR_PTR(-EAGAIN);
+
+	if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
+		goto out;
+	req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
+	if (req != NULL)
+		goto out;
+	atomic_dec(&xprt->num_reqs);
+	req = ERR_PTR(-ENOMEM);
+out:
+	return req;
+}
+
+static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+	if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
+		kfree(req);
+		return true;
+	}
+	return false;
+}
+
 static void xprt_alloc_slot(struct rpc_task *task)
 {
 	struct rpc_xprt	*xprt = task->tk_xprt;
+	struct rpc_rqst *req;
 
-	task->tk_status = 0;
-	if (task->tk_rqstp)
-		return;
 	if (!list_empty(&xprt->free)) {
-		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
-		list_del_init(&req->rq_list);
-		task->tk_rqstp = req;
-		xprt_request_init(task, xprt);
-		return;
+		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
+		list_del(&req->rq_list);
+		goto out_init_req;
+	}
+	req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);
+	if (!IS_ERR(req))
+		goto out_init_req;
+	switch (PTR_ERR(req)) {
+	case -ENOMEM:
+		rpc_delay(task, HZ >> 2);
+		dprintk("RPC:       dynamic allocation of request slot "
+				"failed! Retrying\n");
+		break;
+	case -EAGAIN:
+		rpc_sleep_on(&xprt->backlog, task, NULL);
+		dprintk("RPC:       waiting for request slot\n");
 	}
-	dprintk("RPC:       waiting for request slot\n");
 	task->tk_status = -EAGAIN;
-	task->tk_timeout = 0;
-	rpc_sleep_on(&xprt->backlog, task, NULL);
+	return;
+out_init_req:
+	task->tk_status = 0;
+	task->tk_rqstp = req;
+	xprt_request_init(task, xprt);
 }
 
 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
+	if (xprt_dynamic_free_slot(xprt, req))
+		return;
+
 	memset(req, 0, sizeof(*req));	/* mark unused */
 
 	spin_lock(&xprt->reserve_lock);
@@ -958,25 +1006,49 @@ static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 	spin_unlock(&xprt->reserve_lock);
 }
 
-struct rpc_xprt *xprt_alloc(struct net *net, int size, int max_req)
+static void xprt_free_all_slots(struct rpc_xprt *xprt)
+{
+	struct rpc_rqst *req;
+	while (!list_empty(&xprt->free)) {
+		req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
+		list_del(&req->rq_list);
+		kfree(req);
+	}
+}
+
+struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
+		unsigned int num_prealloc,
+		unsigned int max_alloc)
 {
 	struct rpc_xprt *xprt;
+	struct rpc_rqst *req;
+	int i;
 
 	xprt = kzalloc(size, GFP_KERNEL);
 	if (xprt == NULL)
 		goto out;
-	atomic_set(&xprt->count, 1);
 
-	xprt->max_reqs = max_req;
-	xprt->slot = kcalloc(max_req, sizeof(struct rpc_rqst), GFP_KERNEL);
-	if (xprt->slot == NULL)
+	xprt_init(xprt, net);
+
+	for (i = 0; i < num_prealloc; i++) {
+		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
+		if (!req)
+			break;
+		list_add(&req->rq_list, &xprt->free);
+	}
+	if (i < num_prealloc)
 		goto out_free;
+	if (max_alloc > num_prealloc)
+		xprt->max_reqs = max_alloc;
+	else
+		xprt->max_reqs = num_prealloc;
+	xprt->min_reqs = num_prealloc;
+	atomic_set(&xprt->num_reqs, num_prealloc);
 
-	xprt->xprt_net = get_net(net);
 	return xprt;
 
 out_free:
-	kfree(xprt);
+	xprt_free(xprt);
 out:
 	return NULL;
 }
@@ -985,7 +1057,7 @@ EXPORT_SYMBOL_GPL(xprt_alloc);
 void xprt_free(struct rpc_xprt *xprt)
 {
 	put_net(xprt->xprt_net);
-	kfree(xprt->slot);
+	xprt_free_all_slots(xprt);
 	kfree(xprt);
 }
 EXPORT_SYMBOL_GPL(xprt_free);
@@ -1001,10 +1073,24 @@ void xprt_reserve(struct rpc_task *task)
 {
 	struct rpc_xprt	*xprt = task->tk_xprt;
 
-	task->tk_status = -EIO;
+	task->tk_status = 0;
+	if (task->tk_rqstp != NULL)
+		return;
+
+	/* Note: grabbing the xprt_lock_write() here is not strictly needed,
+	 * but ensures that we throttle new slot allocation if the transport
+	 * is congested (e.g. if reconnecting or if we're out of socket
+	 * write buffer space).
+	 */
+	task->tk_timeout = 0;
+	task->tk_status = -EAGAIN;
+	if (!xprt_lock_write(xprt, task))
+		return;
+
 	spin_lock(&xprt->reserve_lock);
 	xprt_alloc_slot(task);
 	spin_unlock(&xprt->reserve_lock);
+	xprt_release_write(xprt, task);
 }
 
 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
@@ -1021,6 +1107,7 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
 
+	INIT_LIST_HEAD(&req->rq_list);
 	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
 	req->rq_task	= task;
 	req->rq_xprt    = xprt;
@@ -1073,6 +1160,34 @@ void xprt_release(struct rpc_task *task)
 		xprt_free_bc_request(req);
 }
 
+static void xprt_init(struct rpc_xprt *xprt, struct net *net)
+{
+	atomic_set(&xprt->count, 1);
+
+	spin_lock_init(&xprt->transport_lock);
+	spin_lock_init(&xprt->reserve_lock);
+
+	INIT_LIST_HEAD(&xprt->free);
+	INIT_LIST_HEAD(&xprt->recv);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	spin_lock_init(&xprt->bc_pa_lock);
+	INIT_LIST_HEAD(&xprt->bc_pa_list);
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+	xprt->last_used = jiffies;
+	xprt->cwnd = RPC_INITCWND;
+	xprt->bind_index = 0;
+
+	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
+	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
+	rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
+	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
+
+	xprt_init_xid(xprt);
+
+	xprt->xprt_net = get_net(net);
+}
+
 /**
  * xprt_create_transport - create an RPC transport
  * @args: rpc transport creation arguments
@@ -1081,7 +1196,6 @@ void xprt_release(struct rpc_task *task)
 struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
 {
 	struct rpc_xprt	*xprt;
-	struct rpc_rqst	*req;
 	struct xprt_class *t;
 
 	spin_lock(&xprt_list_lock);
@@ -1100,46 +1214,17 @@ found:
 	if (IS_ERR(xprt)) {
 		dprintk("RPC:       xprt_create_transport: failed, %ld\n",
 				-PTR_ERR(xprt));
-		return xprt;
+		goto out;
 	}
-	if (test_and_set_bit(XPRT_INITIALIZED, &xprt->state))
-		/* ->setup returned a pre-initialized xprt: */
-		return xprt;
-
-	spin_lock_init(&xprt->transport_lock);
-	spin_lock_init(&xprt->reserve_lock);
-
-	INIT_LIST_HEAD(&xprt->free);
-	INIT_LIST_HEAD(&xprt->recv);
-#if defined(CONFIG_NFS_V4_1)
-	spin_lock_init(&xprt->bc_pa_lock);
-	INIT_LIST_HEAD(&xprt->bc_pa_list);
-#endif /* CONFIG_NFS_V4_1 */
-
 	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
 	if (xprt_has_timer(xprt))
 		setup_timer(&xprt->timer, xprt_init_autodisconnect,
 			    (unsigned long)xprt);
 	else
 		init_timer(&xprt->timer);
-	xprt->last_used = jiffies;
-	xprt->cwnd = RPC_INITCWND;
-	xprt->bind_index = 0;
-
-	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
-	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
-	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
-	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
-	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
-
-	/* initialize free list */
-	for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
-		list_add(&req->rq_list, &xprt->free);
-
-	xprt_init_xid(xprt);
-
 	dprintk("RPC:       created transport %p with %u slots\n", xprt,
 			xprt->max_reqs);
+out:
 	return xprt;
 }
 
@@ -1157,7 +1242,6 @@ static void xprt_destroy(struct rpc_xprt *xprt)
 	rpc_destroy_wait_queue(&xprt->binding);
 	rpc_destroy_wait_queue(&xprt->pending);
 	rpc_destroy_wait_queue(&xprt->sending);
-	rpc_destroy_wait_queue(&xprt->resend);
 	rpc_destroy_wait_queue(&xprt->backlog);
 	cancel_work_sync(&xprt->task_cleanup);
 	/*
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 0867070bb5ca..b446e100286f 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -283,6 +283,7 @@ xprt_setup_rdma(struct xprt_create *args)
 	}
 
 	xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
+			xprt_rdma_slot_table_entries,
 			xprt_rdma_slot_table_entries);
 	if (xprt == NULL) {
 		dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
@@ -452,9 +453,8 @@ xprt_rdma_connect(struct rpc_task *task)
 }
 
 static int
-xprt_rdma_reserve_xprt(struct rpc_task *task)
+xprt_rdma_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-	struct rpc_xprt *xprt = task->tk_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
 
@@ -466,7 +466,7 @@ xprt_rdma_reserve_xprt(struct rpc_task *task)
 		BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
 	}
 	xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
-	return xprt_reserve_xprt_cong(task);
+	return xprt_reserve_xprt_cong(xprt, task);
 }
 
 /*
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index cae761a8536c..08c5d5a128fc 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -42,7 +42,7 @@
 
 #include <linux/wait.h> 		/* wait_queue_head_t, etc */
 #include <linux/spinlock.h> 		/* spinlock_t, etc */
-#include <asm/atomic.h>			/* atomic_t, etc */
+#include <linux/atomic.h>			/* atomic_t, etc */
 
 #include <rdma/rdma_cm.h>		/* RDMA connection api */
 #include <rdma/ib_verbs.h>		/* RDMA verbs api */
@@ -109,7 +109,7 @@ struct rpcrdma_ep {
  */
 
 /* temporary static scatter/gather max */
-#define RPCRDMA_MAX_DATA_SEGS	(8)	/* max scatter/gather */
+#define RPCRDMA_MAX_DATA_SEGS	(64)	/* max scatter/gather */
 #define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
 #define MAX_RPCRDMAHDR	(\
 	/* max supported RPC/RDMA header */ \
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 72abb7358933..d7f97ef26590 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -37,7 +37,7 @@
 #include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/xprtsock.h>
 #include <linux/file.h>
-#ifdef CONFIG_NFS_V4_1
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
 #include <linux/sunrpc/bc_xprt.h>
 #endif
 
@@ -54,7 +54,8 @@ static void xs_close(struct rpc_xprt *xprt);
  * xprtsock tunables
  */
 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
-unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
+unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
+unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
 
 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
@@ -75,6 +76,7 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
 
 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
+static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
 
@@ -104,6 +106,15 @@ static ctl_table xs_tunables_table[] = {
 		.extra2		= &max_slot_table_size
 	},
 	{
+		.procname	= "tcp_max_slot_table_entries",
+		.data		= &xprt_max_tcp_slot_table_entries,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec_minmax,
+		.extra1		= &min_slot_table_size,
+		.extra2		= &max_tcp_slot_table_limit
+	},
+	{
 		.procname	= "min_resvport",
 		.data		= &xprt_min_resvport,
 		.maxlen		= sizeof(unsigned int),
@@ -755,6 +766,8 @@ static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 	if (task == NULL)
 		goto out_release;
 	req = task->tk_rqstp;
+	if (req == NULL)
+		goto out_release;
 	if (req->rq_bytes_sent == 0)
 		goto out_release;
 	if (req->rq_bytes_sent == req->rq_snd_buf.len)
@@ -1236,7 +1249,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
 	return 0;
 }
 
-#if defined(CONFIG_NFS_V4_1)
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /*
  * Obtains an rpc_rqst previously allocated and invokes the common
  * tcp read code to read the data.  The result is placed in the callback
@@ -1299,7 +1312,7 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
 {
 	return xs_tcp_read_reply(xprt, desc);
 }
-#endif /* CONFIG_NFS_V4_1 */
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
 /*
  * Read data off the transport.  This can be either an RPC_CALL or an
@@ -2489,7 +2502,8 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap)
 }
 
 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
-				      unsigned int slot_table_size)
+				      unsigned int slot_table_size,
+				      unsigned int max_slot_table_size)
 {
 	struct rpc_xprt *xprt;
 	struct sock_xprt *new;
@@ -2499,7 +2513,8 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
 		return ERR_PTR(-EBADF);
 	}
 
-	xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size);
+	xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
+			max_slot_table_size);
 	if (xprt == NULL) {
 		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
 				"rpc_xprt\n");
@@ -2541,7 +2556,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
 	struct rpc_xprt *xprt;
 	struct rpc_xprt *ret;
 
-	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+			xprt_max_tcp_slot_table_entries);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2605,7 +2621,8 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
 	struct sock_xprt *transport;
 	struct rpc_xprt *ret;
 
-	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
+	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
+			xprt_udp_slot_table_entries);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2681,7 +2698,8 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
 	struct sock_xprt *transport;
 	struct rpc_xprt *ret;
 
-	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+			xprt_max_tcp_slot_table_entries);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2760,7 +2778,8 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
 		 */
 		 return args->bc_xprt->xpt_bc_xprt;
 	}
-	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+			xprt_tcp_slot_table_entries);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2947,8 +2966,26 @@ static struct kernel_param_ops param_ops_slot_table_size = {
 #define param_check_slot_table_size(name, p) \
 	__param_check(name, p, unsigned int);
 
+static int param_set_max_slot_table_size(const char *val,
+				     const struct kernel_param *kp)
+{
+	return param_set_uint_minmax(val, kp,
+			RPC_MIN_SLOT_TABLE,
+			RPC_MAX_SLOT_TABLE_LIMIT);
+}
+
+static struct kernel_param_ops param_ops_max_slot_table_size = {
+	.set = param_set_max_slot_table_size,
+	.get = param_get_uint,
+};
+
+#define param_check_max_slot_table_size(name, p) \
+	__param_check(name, p, unsigned int);
+
 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
 		   slot_table_size, 0644);
+module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
+		   max_slot_table_size, 0644);
 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
 		   slot_table_size, 0644);