summary refs log tree commit diff
path: root/net/sunrpc
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2009-09-22 07:54:33 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2009-09-22 07:54:33 -0700
commita87e84b5cdfacf11af4e8a85c4bca9793658536f (patch)
treef8e3cb2d339d8ed0e987d55f725e501730cdc81d /net/sunrpc
parent342ff1a1b558ebbdb8cbd55ab6a63eca8b2473ca (diff)
parent3c394ddaa7ea4205f933fd9b481166b2669368a9 (diff)
downloadlinux-a87e84b5cdfacf11af4e8a85c4bca9793658536f.tar.gz
Merge branch 'for-2.6.32' of git://linux-nfs.org/~bfields/linux
* 'for-2.6.32' of git://linux-nfs.org/~bfields/linux: (68 commits)
  nfsd4: nfsv4 clients should cross mountpoints
  nfsd: revise 4.1 status documentation
  sunrpc/cache: avoid variable over-loading in cache_defer_req
  sunrpc/cache: use list_del_init for the list_head entries in cache_deferred_req
  nfsd: return success for non-NFS4 nfs4_state_start
  nfsd41: Refactor create_client()
  nfsd41: modify nfsd4.1 backchannel to use new xprt class
  nfsd41: Backchannel: Implement cb_recall over NFSv4.1
  nfsd41: Backchannel: cb_sequence callback
  nfsd41: Backchannel: Setup sequence information
  nfsd41: Backchannel: Server backchannel RPC wait queue
  nfsd41: Backchannel: Add sequence arguments to callback RPC arguments
  nfsd41: Backchannel: callback infrastructure
  nfsd4: use common rpc_cred for all callbacks
  nfsd4: allow nfs4 state startup to fail
  SUNRPC: Defer the auth_gss upcall when the RPC call is asynchronous
  nfsd4: fix null dereference creating nfsv4 callback client
  nfsd4: fix whitespace in NFSPROC4_CLNT_CB_NULL definition
  nfsd41: sunrpc: add new xprt class for nfsv4.1 backchannel
  sunrpc/cache: simplify cache_fresh_locked and cache_fresh_unlocked.
  ...
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/auth.c20
-rw-r--r--net/sunrpc/auth_generic.c4
-rw-r--r--net/sunrpc/auth_gss/svcauth_gss.c6
-rw-r--r--net/sunrpc/cache.c109
-rw-r--r--net/sunrpc/clnt.c1
-rw-r--r--net/sunrpc/sched.c7
-rw-r--r--net/sunrpc/sunrpc.h14
-rw-r--r--net/sunrpc/svc_xprt.c25
-rw-r--r--net/sunrpc/svcauth_unix.c1
-rw-r--r--net/sunrpc/svcsock.c335
-rw-r--r--net/sunrpc/xprt.c15
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c4
-rw-r--r--net/sunrpc/xprtsock.c242
13 files changed, 603 insertions, 180 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 0c431c277af5..54a4e042f104 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -385,7 +385,7 @@ rpcauth_init_cred(struct rpc_cred *cred, const struct auth_cred *acred,
 EXPORT_SYMBOL_GPL(rpcauth_init_cred);
 
 void
-rpcauth_generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred)
+rpcauth_generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred, int lookupflags)
 {
 	task->tk_msg.rpc_cred = get_rpccred(cred);
 	dprintk("RPC: %5u holding %s cred %p\n", task->tk_pid,
@@ -394,7 +394,7 @@ rpcauth_generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred)
 EXPORT_SYMBOL_GPL(rpcauth_generic_bind_cred);
 
 static void
-rpcauth_bind_root_cred(struct rpc_task *task)
+rpcauth_bind_root_cred(struct rpc_task *task, int lookupflags)
 {
 	struct rpc_auth *auth = task->tk_client->cl_auth;
 	struct auth_cred acred = {
@@ -405,7 +405,7 @@ rpcauth_bind_root_cred(struct rpc_task *task)
 
 	dprintk("RPC: %5u looking up %s cred\n",
 		task->tk_pid, task->tk_client->cl_auth->au_ops->au_name);
-	ret = auth->au_ops->lookup_cred(auth, &acred, 0);
+	ret = auth->au_ops->lookup_cred(auth, &acred, lookupflags);
 	if (!IS_ERR(ret))
 		task->tk_msg.rpc_cred = ret;
 	else
@@ -413,14 +413,14 @@ rpcauth_bind_root_cred(struct rpc_task *task)
 }
 
 static void
-rpcauth_bind_new_cred(struct rpc_task *task)
+rpcauth_bind_new_cred(struct rpc_task *task, int lookupflags)
 {
 	struct rpc_auth *auth = task->tk_client->cl_auth;
 	struct rpc_cred *ret;
 
 	dprintk("RPC: %5u looking up %s cred\n",
 		task->tk_pid, auth->au_ops->au_name);
-	ret = rpcauth_lookupcred(auth, 0);
+	ret = rpcauth_lookupcred(auth, lookupflags);
 	if (!IS_ERR(ret))
 		task->tk_msg.rpc_cred = ret;
 	else
@@ -430,12 +430,16 @@ rpcauth_bind_new_cred(struct rpc_task *task)
 void
 rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
 {
+	int lookupflags = 0;
+
+	if (flags & RPC_TASK_ASYNC)
+		lookupflags |= RPCAUTH_LOOKUP_NEW;
 	if (cred != NULL)
-		cred->cr_ops->crbind(task, cred);
+		cred->cr_ops->crbind(task, cred, lookupflags);
 	else if (flags & RPC_TASK_ROOTCREDS)
-		rpcauth_bind_root_cred(task);
+		rpcauth_bind_root_cred(task, lookupflags);
 	else
-		rpcauth_bind_new_cred(task);
+		rpcauth_bind_new_cred(task, lookupflags);
 }
 
 void
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index 4028502f0528..bf88bf8e9365 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -55,13 +55,13 @@ struct rpc_cred *rpc_lookup_machine_cred(void)
 EXPORT_SYMBOL_GPL(rpc_lookup_machine_cred);
 
 static void
-generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred)
+generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred, int lookupflags)
 {
 	struct rpc_auth *auth = task->tk_client->cl_auth;
 	struct auth_cred *acred = &container_of(cred, struct generic_cred, gc_base)->acred;
 	struct rpc_cred *ret;
 
-	ret = auth->au_ops->lookup_cred(auth, acred, 0);
+	ret = auth->au_ops->lookup_cred(auth, acred, lookupflags);
 	if (!IS_ERR(ret))
 		task->tk_msg.rpc_cred = ret;
 	else
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 2e6a148d277c..f6c51e562a02 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -1374,8 +1374,10 @@ svcauth_gss_release(struct svc_rqst *rqstp)
 		if (stat)
 			goto out_err;
 		break;
-	default:
-		goto out_err;
+	/*
+	 * For any other gc_svc value, svcauth_gss_accept() already set
+	 * the auth_error appropriately; just fall through:
+	 */
 	}
 
 out:
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 45cdaff9b361..d6eee291a0e2 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -103,23 +103,21 @@ struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
 EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
 
 
-static void queue_loose(struct cache_detail *detail, struct cache_head *ch);
+static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch);
 
-static int cache_fresh_locked(struct cache_head *head, time_t expiry)
+static void cache_fresh_locked(struct cache_head *head, time_t expiry)
 {
 	head->expiry_time = expiry;
 	head->last_refresh = get_seconds();
-	return !test_and_set_bit(CACHE_VALID, &head->flags);
+	set_bit(CACHE_VALID, &head->flags);
 }
 
 static void cache_fresh_unlocked(struct cache_head *head,
-			struct cache_detail *detail, int new)
+				 struct cache_detail *detail)
 {
-	if (new)
-		cache_revisit_request(head);
 	if (test_and_clear_bit(CACHE_PENDING, &head->flags)) {
 		cache_revisit_request(head);
-		queue_loose(detail, head);
+		cache_dequeue(detail, head);
 	}
 }
 
@@ -132,7 +130,6 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
 	 */
 	struct cache_head **head;
 	struct cache_head *tmp;
-	int is_new;
 
 	if (!test_bit(CACHE_VALID, &old->flags)) {
 		write_lock(&detail->hash_lock);
@@ -141,9 +138,9 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
 				set_bit(CACHE_NEGATIVE, &old->flags);
 			else
 				detail->update(old, new);
-			is_new = cache_fresh_locked(old, new->expiry_time);
+			cache_fresh_locked(old, new->expiry_time);
 			write_unlock(&detail->hash_lock);
-			cache_fresh_unlocked(old, detail, is_new);
+			cache_fresh_unlocked(old, detail);
 			return old;
 		}
 		write_unlock(&detail->hash_lock);
@@ -167,11 +164,11 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
 	*head = tmp;
 	detail->entries++;
 	cache_get(tmp);
-	is_new = cache_fresh_locked(tmp, new->expiry_time);
+	cache_fresh_locked(tmp, new->expiry_time);
 	cache_fresh_locked(old, 0);
 	write_unlock(&detail->hash_lock);
-	cache_fresh_unlocked(tmp, detail, is_new);
-	cache_fresh_unlocked(old, detail, 0);
+	cache_fresh_unlocked(tmp, detail);
+	cache_fresh_unlocked(old, detail);
 	cache_put(old, detail);
 	return tmp;
 }
@@ -184,6 +181,22 @@ static int cache_make_upcall(struct cache_detail *cd, struct cache_head *h)
 	return cd->cache_upcall(cd, h);
 }
 
+static inline int cache_is_valid(struct cache_detail *detail, struct cache_head *h)
+{
+	if (!test_bit(CACHE_VALID, &h->flags) ||
+	    h->expiry_time < get_seconds())
+		return -EAGAIN;
+	else if (detail->flush_time > h->last_refresh)
+		return -EAGAIN;
+	else {
+		/* entry is valid */
+		if (test_bit(CACHE_NEGATIVE, &h->flags))
+			return -ENOENT;
+		else
+			return 0;
+	}
+}
+
 /*
  * This is the generic cache management routine for all
  * the authentication caches.
@@ -192,8 +205,10 @@ static int cache_make_upcall(struct cache_detail *cd, struct cache_head *h)
  *
  *
  * Returns 0 if the cache_head can be used, or cache_puts it and returns
- * -EAGAIN if upcall is pending,
- * -ETIMEDOUT if upcall failed and should be retried,
+ * -EAGAIN if upcall is pending and request has been queued
+ * -ETIMEDOUT if upcall failed or request could not be queue or
+ *           upcall completed but item is still invalid (implying that
+ *           the cache item has been replaced with a newer one).
  * -ENOENT if cache entry was negative
  */
 int cache_check(struct cache_detail *detail,
@@ -203,17 +218,7 @@ int cache_check(struct cache_detail *detail,
 	long refresh_age, age;
 
 	/* First decide return status as best we can */
-	if (!test_bit(CACHE_VALID, &h->flags) ||
-	    h->expiry_time < get_seconds())
-		rv = -EAGAIN;
-	else if (detail->flush_time > h->last_refresh)
-		rv = -EAGAIN;
-	else {
-		/* entry is valid */
-		if (test_bit(CACHE_NEGATIVE, &h->flags))
-			rv = -ENOENT;
-		else rv = 0;
-	}
+	rv = cache_is_valid(detail, h);
 
 	/* now see if we want to start an upcall */
 	refresh_age = (h->expiry_time - h->last_refresh);
@@ -229,10 +234,11 @@ int cache_check(struct cache_detail *detail,
 			switch (cache_make_upcall(detail, h)) {
 			case -EINVAL:
 				clear_bit(CACHE_PENDING, &h->flags);
+				cache_revisit_request(h);
 				if (rv == -EAGAIN) {
 					set_bit(CACHE_NEGATIVE, &h->flags);
-					cache_fresh_unlocked(h, detail,
-					     cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY));
+					cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY);
+					cache_fresh_unlocked(h, detail);
 					rv = -ENOENT;
 				}
 				break;
@@ -245,10 +251,14 @@ int cache_check(struct cache_detail *detail,
 		}
 	}
 
-	if (rv == -EAGAIN)
-		if (cache_defer_req(rqstp, h) != 0)
-			rv = -ETIMEDOUT;
-
+	if (rv == -EAGAIN) {
+		if (cache_defer_req(rqstp, h) < 0) {
+			/* Request is not deferred */
+			rv = cache_is_valid(detail, h);
+			if (rv == -EAGAIN)
+				rv = -ETIMEDOUT;
+		}
+	}
 	if (rv)
 		cache_put(h, detail);
 	return rv;
@@ -396,7 +406,7 @@ static int cache_clean(void)
 				)
 				continue;
 			if (test_and_clear_bit(CACHE_PENDING, &ch->flags))
-				queue_loose(current_detail, ch);
+				cache_dequeue(current_detail, ch);
 
 			if (atomic_read(&ch->ref.refcount) == 1)
 				break;
@@ -412,8 +422,10 @@ static int cache_clean(void)
 		if (!ch)
 			current_index ++;
 		spin_unlock(&cache_list_lock);
-		if (ch)
+		if (ch) {
+			cache_revisit_request(ch);
 			cache_put(ch, d);
+		}
 	} else
 		spin_unlock(&cache_list_lock);
 
@@ -488,7 +500,7 @@ static int cache_defer_cnt;
 
 static int cache_defer_req(struct cache_req *req, struct cache_head *item)
 {
-	struct cache_deferred_req *dreq;
+	struct cache_deferred_req *dreq, *discard;
 	int hash = DFR_HASH(item);
 
 	if (cache_defer_cnt >= DFR_MAX) {
@@ -496,11 +508,11 @@ static int cache_defer_req(struct cache_req *req, struct cache_head *item)
 		 * or continue and drop the oldest below
 		 */
 		if (net_random()&1)
-			return -ETIMEDOUT;
+			return -ENOMEM;
 	}
 	dreq = req->defer(req);
 	if (dreq == NULL)
-		return -ETIMEDOUT;
+		return -ENOMEM;
 
 	dreq->item = item;
 
@@ -513,23 +525,24 @@ static int cache_defer_req(struct cache_req *req, struct cache_head *item)
 	list_add(&dreq->hash, &cache_defer_hash[hash]);
 
 	/* it is in, now maybe clean up */
-	dreq = NULL;
+	discard = NULL;
 	if (++cache_defer_cnt > DFR_MAX) {
-		dreq = list_entry(cache_defer_list.prev,
-				  struct cache_deferred_req, recent);
-		list_del(&dreq->recent);
-		list_del(&dreq->hash);
+		discard = list_entry(cache_defer_list.prev,
+				     struct cache_deferred_req, recent);
+		list_del_init(&discard->recent);
+		list_del_init(&discard->hash);
 		cache_defer_cnt--;
 	}
 	spin_unlock(&cache_defer_lock);
 
-	if (dreq) {
+	if (discard)
 		/* there was one too many */
-		dreq->revisit(dreq, 1);
-	}
+		discard->revisit(discard, 1);
+
 	if (!test_bit(CACHE_PENDING, &item->flags)) {
 		/* must have just been validated... */
 		cache_revisit_request(item);
+		return -EAGAIN;
 	}
 	return 0;
 }
@@ -551,7 +564,7 @@ static void cache_revisit_request(struct cache_head *item)
 			dreq = list_entry(lp, struct cache_deferred_req, hash);
 			lp = lp->next;
 			if (dreq->item == item) {
-				list_del(&dreq->hash);
+				list_del_init(&dreq->hash);
 				list_move(&dreq->recent, &pending);
 				cache_defer_cnt--;
 			}
@@ -577,7 +590,7 @@ void cache_clean_deferred(void *owner)
 
 	list_for_each_entry_safe(dreq, tmp, &cache_defer_list, recent) {
 		if (dreq->owner == owner) {
-			list_del(&dreq->hash);
+			list_del_init(&dreq->hash);
 			list_move(&dreq->recent, &pending);
 			cache_defer_cnt--;
 		}
@@ -887,7 +900,7 @@ static int cache_release(struct inode *inode, struct file *filp,
 
 
 
-static void queue_loose(struct cache_detail *detail, struct cache_head *ch)
+static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
 {
 	struct cache_queue *cq;
 	spin_lock(&queue_lock);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index fac0ca93f06b..a417d5ab5dd7 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -288,6 +288,7 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 		.srcaddr = args->saddress,
 		.dstaddr = args->address,
 		.addrlen = args->addrsize,
+		.bc_xprt = args->bc_xprt,
 	};
 	char servername[48];
 
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 8f459abe97cf..cef74ba0666c 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -21,6 +21,8 @@
 
 #include <linux/sunrpc/clnt.h>
 
+#include "sunrpc.h"
+
 #ifdef RPC_DEBUG
 #define RPCDBG_FACILITY		RPCDBG_SCHED
 #define RPC_TASK_MAGIC_ID	0xf00baa
@@ -711,11 +713,6 @@ static void rpc_async_schedule(struct work_struct *work)
 	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
 }
 
-struct rpc_buffer {
-	size_t	len;
-	char	data[];
-};
-
 /**
  * rpc_malloc - allocate an RPC buffer
  * @task: RPC task that will use this buffer
diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
index 5d9dd742264b..90c292e2738b 100644
--- a/net/sunrpc/sunrpc.h
+++ b/net/sunrpc/sunrpc.h
@@ -27,11 +27,25 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #ifndef _NET_SUNRPC_SUNRPC_H
 #define _NET_SUNRPC_SUNRPC_H
 
+#include <linux/net.h>
+
+/*
+ * Header for dynamically allocated rpc buffers.
+ */
+struct rpc_buffer {
+	size_t	len;
+	char	data[];
+};
+
 static inline int rpc_reply_expected(struct rpc_task *task)
 {
 	return (task->tk_msg.rpc_proc != NULL) &&
 		(task->tk_msg.rpc_proc->p_decode != NULL);
 }
 
+int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
+		    struct page *headpage, unsigned long headoffset,
+		    struct page *tailpage, unsigned long tailoffset);
+
 #endif /* _NET_SUNRPC_SUNRPC_H */
 
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 27d44332f017..df124f78ee48 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -160,6 +160,7 @@ void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
 	mutex_init(&xprt->xpt_mutex);
 	spin_lock_init(&xprt->xpt_lock);
 	set_bit(XPT_BUSY, &xprt->xpt_flags);
+	rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
 }
 EXPORT_SYMBOL_GPL(svc_xprt_init);
 
@@ -710,10 +711,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
 	spin_unlock_bh(&pool->sp_lock);
 
 	len = 0;
-	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
-		dprintk("svc_recv: found XPT_CLOSE\n");
-		svc_delete_xprt(xprt);
-	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
+	if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
 		struct svc_xprt *newxpt;
 		newxpt = xprt->xpt_ops->xpo_accept(xprt);
 		if (newxpt) {
@@ -739,7 +737,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
 			svc_xprt_received(newxpt);
 		}
 		svc_xprt_received(xprt);
-	} else {
+	} else if (!test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
 		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
 			rqstp, pool->sp_id, xprt,
 			atomic_read(&xprt->xpt_ref.refcount));
@@ -752,6 +750,11 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
 		dprintk("svc: got len=%d\n", len);
 	}
 
+	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
+		dprintk("svc_recv: found XPT_CLOSE\n");
+		svc_delete_xprt(xprt);
+	}
+
 	/* No data, incomplete (TCP) read, or accept() */
 	if (len == 0 || len == -EAGAIN) {
 		rqstp->rq_res.len = 0;
@@ -808,6 +811,7 @@ int svc_send(struct svc_rqst *rqstp)
 	else
 		len = xprt->xpt_ops->xpo_sendto(rqstp);
 	mutex_unlock(&xprt->xpt_mutex);
+	rpc_wake_up(&xprt->xpt_bc_pending);
 	svc_xprt_release(rqstp);
 
 	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
@@ -1166,11 +1170,6 @@ static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
 
 	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);
 
-	lock_kernel();
-	/* bump up the pseudo refcount while traversing */
-	svc_get(serv);
-	unlock_kernel();
-
 	if (!pidx)
 		return SEQ_START_TOKEN;
 	return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
@@ -1198,12 +1197,6 @@ static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
 
 static void svc_pool_stats_stop(struct seq_file *m, void *p)
 {
-	struct svc_serv *serv = m->private;
-
-	lock_kernel();
-	/* this function really, really should have been called svc_put() */
-	svc_destroy(serv);
-	unlock_kernel();
 }
 
 static int svc_pool_stats_show(struct seq_file *m, void *p)
diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c
index 6caffa34ac01..117f68a8aa40 100644
--- a/net/sunrpc/svcauth_unix.c
+++ b/net/sunrpc/svcauth_unix.c
@@ -668,6 +668,7 @@ static int unix_gid_find(uid_t uid, struct group_info **gip,
 	case 0:
 		*gip = ug->gi;
 		get_group_info(*gip);
+		cache_put(&ug->h, &unix_gid_cache);
 		return 0;
 	default:
 		return -EAGAIN;
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 23128ee191ae..ccc5e83cae5d 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -49,6 +49,7 @@
 #include <linux/sunrpc/msg_prot.h>
 #include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/stats.h>
+#include <linux/sunrpc/xprt.h>
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
@@ -153,49 +154,27 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
 }
 
 /*
- * Generic sendto routine
+ * send routine intended to be shared by the fore- and back-channel
  */
-static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
+int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
+		    struct page *headpage, unsigned long headoffset,
+		    struct page *tailpage, unsigned long tailoffset)
 {
-	struct svc_sock	*svsk =
-		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
-	struct socket	*sock = svsk->sk_sock;
-	int		slen;
-	union {
-		struct cmsghdr	hdr;
-		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
-	} buffer;
-	struct cmsghdr *cmh = &buffer.hdr;
-	int		len = 0;
 	int		result;
 	int		size;
 	struct page	**ppage = xdr->pages;
 	size_t		base = xdr->page_base;
 	unsigned int	pglen = xdr->page_len;
 	unsigned int	flags = MSG_MORE;
-	RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
+	int		slen;
+	int		len = 0;
 
 	slen = xdr->len;
 
-	if (rqstp->rq_prot == IPPROTO_UDP) {
-		struct msghdr msg = {
-			.msg_name	= &rqstp->rq_addr,
-			.msg_namelen	= rqstp->rq_addrlen,
-			.msg_control	= cmh,
-			.msg_controllen	= sizeof(buffer),
-			.msg_flags	= MSG_MORE,
-		};
-
-		svc_set_cmsg_data(rqstp, cmh);
-
-		if (sock_sendmsg(sock, &msg, 0) < 0)
-			goto out;
-	}
-
 	/* send head */
 	if (slen == xdr->head[0].iov_len)
 		flags = 0;
-	len = kernel_sendpage(sock, rqstp->rq_respages[0], 0,
+	len = kernel_sendpage(sock, headpage, headoffset,
 				  xdr->head[0].iov_len, flags);
 	if (len != xdr->head[0].iov_len)
 		goto out;
@@ -219,16 +198,58 @@ static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
 		base = 0;
 		ppage++;
 	}
+
 	/* send tail */
 	if (xdr->tail[0].iov_len) {
-		result = kernel_sendpage(sock, rqstp->rq_respages[0],
-					     ((unsigned long)xdr->tail[0].iov_base)
-						& (PAGE_SIZE-1),
-					     xdr->tail[0].iov_len, 0);
-
+		result = kernel_sendpage(sock, tailpage, tailoffset,
+				   xdr->tail[0].iov_len, 0);
 		if (result > 0)
 			len += result;
 	}
+
+out:
+	return len;
+}
+
+
+/*
+ * Generic sendto routine
+ */
+static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
+{
+	struct svc_sock	*svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+	struct socket	*sock = svsk->sk_sock;
+	union {
+		struct cmsghdr	hdr;
+		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
+	} buffer;
+	struct cmsghdr *cmh = &buffer.hdr;
+	int		len = 0;
+	unsigned long tailoff;
+	unsigned long headoff;
+	RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
+
+	if (rqstp->rq_prot == IPPROTO_UDP) {
+		struct msghdr msg = {
+			.msg_name	= &rqstp->rq_addr,
+			.msg_namelen	= rqstp->rq_addrlen,
+			.msg_control	= cmh,
+			.msg_controllen	= sizeof(buffer),
+			.msg_flags	= MSG_MORE,
+		};
+
+		svc_set_cmsg_data(rqstp, cmh);
+
+		if (sock_sendmsg(sock, &msg, 0) < 0)
+			goto out;
+	}
+
+	tailoff = ((unsigned long)xdr->tail[0].iov_base) & (PAGE_SIZE-1);
+	headoff = 0;
+	len = svc_send_common(sock, xdr, rqstp->rq_respages[0], headoff,
+			       rqstp->rq_respages[0], tailoff);
+
 out:
 	dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %s)\n",
 		svsk, xdr->head[0].iov_base, xdr->head[0].iov_len,
@@ -432,29 +453,49 @@ static void svc_tcp_write_space(struct sock *sk)
 }
 
 /*
+ * See net/ipv6/ip_sockglue.c : ip_cmsg_recv_pktinfo
+ */
+static int svc_udp_get_dest_address4(struct svc_rqst *rqstp,
+				     struct cmsghdr *cmh)
+{
+	struct in_pktinfo *pki = CMSG_DATA(cmh);
+	if (cmh->cmsg_type != IP_PKTINFO)
+		return 0;
+	rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr;
+	return 1;
+}
+
+/*
+ * See net/ipv6/datagram.c : datagram_recv_ctl
+ */
+static int svc_udp_get_dest_address6(struct svc_rqst *rqstp,
+				     struct cmsghdr *cmh)
+{
+	struct in6_pktinfo *pki = CMSG_DATA(cmh);
+	if (cmh->cmsg_type != IPV6_PKTINFO)
+		return 0;
+	ipv6_addr_copy(&rqstp->rq_daddr.addr6, &pki->ipi6_addr);
+	return 1;
+}
+
+/*
  * Copy the UDP datagram's destination address to the rqstp structure.
  * The 'destination' address in this case is the address to which the
  * peer sent the datagram, i.e. our local address. For multihomed
  * hosts, this can change from msg to msg. Note that only the IP
  * address changes, the port number should remain the same.
  */
-static void svc_udp_get_dest_address(struct svc_rqst *rqstp,
-				     struct cmsghdr *cmh)
+static int svc_udp_get_dest_address(struct svc_rqst *rqstp,
+				    struct cmsghdr *cmh)
 {
-	struct svc_sock *svsk =
-		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
-	switch (svsk->sk_sk->sk_family) {
-	case AF_INET: {
-		struct in_pktinfo *pki = CMSG_DATA(cmh);
-		rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr;
-		break;
-		}
-	case AF_INET6: {
-		struct in6_pktinfo *pki = CMSG_DATA(cmh);
-		ipv6_addr_copy(&rqstp->rq_daddr.addr6, &pki->ipi6_addr);
-		break;
-		}
+	switch (cmh->cmsg_level) {
+	case SOL_IP:
+		return svc_udp_get_dest_address4(rqstp, cmh);
+	case SOL_IPV6:
+		return svc_udp_get_dest_address6(rqstp, cmh);
 	}
+
+	return 0;
 }
 
 /*
@@ -531,16 +572,15 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
 
 	rqstp->rq_prot = IPPROTO_UDP;
 
-	if (cmh->cmsg_level != IPPROTO_IP ||
-	    cmh->cmsg_type != IP_PKTINFO) {
+	if (!svc_udp_get_dest_address(rqstp, cmh)) {
 		if (net_ratelimit())
-			printk("rpcsvc: received unknown control message:"
-			       "%d/%d\n",
-			       cmh->cmsg_level, cmh->cmsg_type);
+			printk(KERN_WARNING
+				"svc: received unknown control message %d/%d; "
+				"dropping RPC reply datagram\n",
+					cmh->cmsg_level, cmh->cmsg_type);
 		skb_free_datagram(svsk->sk_sk, skb);
 		return 0;
 	}
-	svc_udp_get_dest_address(rqstp, cmh);
 
 	if (skb_is_nonlinear(skb)) {
 		/* we have to copy */
@@ -651,8 +691,7 @@ static struct svc_xprt_class svc_udp_class = {
 
 static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
 {
-	int one = 1;
-	mm_segment_t oldfs;
+	int err, level, optname, one = 1;
 
 	svc_xprt_init(&svc_udp_class, &svsk->sk_xprt, serv);
 	clear_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags);
@@ -671,12 +710,22 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
 	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 	set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
 
-	oldfs = get_fs();
-	set_fs(KERNEL_DS);
 	/* make sure we get destination address info */
-	svsk->sk_sock->ops->setsockopt(svsk->sk_sock, IPPROTO_IP, IP_PKTINFO,
-				       (char __user *)&one, sizeof(one));
-	set_fs(oldfs);
+	switch (svsk->sk_sk->sk_family) {
+	case AF_INET:
+		level = SOL_IP;
+		optname = IP_PKTINFO;
+		break;
+	case AF_INET6:
+		level = SOL_IPV6;
+		optname = IPV6_RECVPKTINFO;
+		break;
+	default:
+		BUG();
+	}
+	err = kernel_setsockopt(svsk->sk_sock, level, optname,
+					(char *)&one, sizeof(one));
+	dprintk("svc: kernel_setsockopt returned %d\n", err);
 }
 
 /*
@@ -826,21 +875,15 @@ failed:
 }
 
 /*
- * Receive data from a TCP socket.
+ * Receive data.
+ * If we haven't gotten the record length yet, get the next four bytes.
+ * Otherwise try to gobble up as much as possible up to the complete
+ * record length.
  */
-static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
+static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
 {
-	struct svc_sock	*svsk =
-		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
 	struct svc_serv	*serv = svsk->sk_xprt.xpt_server;
-	int		len;
-	struct kvec *vec;
-	int pnum, vlen;
-
-	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
-		svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
-		test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags),
-		test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
+	int len;
 
 	if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
 		/* sndbuf needs to have room for one request
@@ -861,10 +904,6 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 
 	clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 
-	/* Receive data. If we haven't got the record length yet, get
-	 * the next four bytes. Otherwise try to gobble up as much as
-	 * possible up to the complete record length.
-	 */
 	if (svsk->sk_tcplen < sizeof(rpc_fraghdr)) {
 		int		want = sizeof(rpc_fraghdr) - svsk->sk_tcplen;
 		struct kvec	iov;
@@ -879,7 +918,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 			dprintk("svc: short recvfrom while reading record "
 				"length (%d of %d)\n", len, want);
 			svc_xprt_received(&svsk->sk_xprt);
-			return -EAGAIN; /* record header not complete */
+			goto err_again; /* record header not complete */
 		}
 
 		svsk->sk_reclen = ntohl(svsk->sk_reclen);
@@ -894,6 +933,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 					"per record not supported\n");
 			goto err_delete;
 		}
+
 		svsk->sk_reclen &= RPC_FRAGMENT_SIZE_MASK;
 		dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen);
 		if (svsk->sk_reclen > serv->sv_max_mesg) {
@@ -914,17 +954,121 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 		dprintk("svc: incomplete TCP record (%d of %d)\n",
 			len, svsk->sk_reclen);
 		svc_xprt_received(&svsk->sk_xprt);
-		return -EAGAIN;	/* record not complete */
+		goto err_again;	/* record not complete */
 	}
 	len = svsk->sk_reclen;
 	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 
+	return len;
+ error:
+	if (len == -EAGAIN) {
+		dprintk("RPC: TCP recv_record got EAGAIN\n");
+		svc_xprt_received(&svsk->sk_xprt);
+	}
+	return len;
+ err_delete:
+	set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
+ err_again:
+	return -EAGAIN;
+}
+
+static int svc_process_calldir(struct svc_sock *svsk, struct svc_rqst *rqstp,
+			       struct rpc_rqst **reqpp, struct kvec *vec)
+{
+	struct rpc_rqst *req = NULL;
+	u32 *p;
+	u32 xid;
+	u32 calldir;
+	int len;
+
+	len = svc_recvfrom(rqstp, vec, 1, 8);
+	if (len < 0)
+		goto error;
+
+	p = (u32 *)rqstp->rq_arg.head[0].iov_base;
+	xid = *p++;
+	calldir = *p;
+
+	if (calldir == 0) {
+		/* REQUEST is the most common case */
+		vec[0] = rqstp->rq_arg.head[0];
+	} else {
+		/* REPLY */
+		if (svsk->sk_bc_xprt)
+			req = xprt_lookup_rqst(svsk->sk_bc_xprt, xid);
+
+		if (!req) {
+			printk(KERN_NOTICE
+				"%s: Got unrecognized reply: "
+				"calldir 0x%x sk_bc_xprt %p xid %08x\n",
+				__func__, ntohl(calldir),
+				svsk->sk_bc_xprt, xid);
+			vec[0] = rqstp->rq_arg.head[0];
+			goto out;
+		}
+
+		memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
+		       sizeof(struct xdr_buf));
+		/* copy the xid and call direction */
+		memcpy(req->rq_private_buf.head[0].iov_base,
+		       rqstp->rq_arg.head[0].iov_base, 8);
+		vec[0] = req->rq_private_buf.head[0];
+	}
+ out:
+	vec[0].iov_base += 8;
+	vec[0].iov_len -= 8;
+	len = svsk->sk_reclen - 8;
+ error:
+	*reqpp = req;
+	return len;
+}
+
+/*
+ * Receive data from a TCP socket.
+ */
+static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
+{
+	struct svc_sock	*svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+	struct svc_serv	*serv = svsk->sk_xprt.xpt_server;
+	int		len;
+	struct kvec *vec;
+	int pnum, vlen;
+	struct rpc_rqst *req = NULL;
+
+	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
+		svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
+		test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags),
+		test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
+
+	len = svc_tcp_recv_record(svsk, rqstp);
+	if (len < 0)
+		goto error;
+
 	vec = rqstp->rq_vec;
 	vec[0] = rqstp->rq_arg.head[0];
 	vlen = PAGE_SIZE;
+
+	/*
+	 * We have enough data for the whole tcp record. Let's try and read the
+	 * first 8 bytes to get the xid and the call direction. We can use this
+	 * to figure out if this is a call or a reply to a callback. If
+	 * sk_reclen is < 8 (xid and calldir), then this is a malformed packet.
+	 * In that case, don't bother with the calldir and just read the data.
+	 * It will be rejected in svc_process.
+	 */
+	if (len >= 8) {
+		len = svc_process_calldir(svsk, rqstp, &req, vec);
+		if (len < 0)
+			goto err_again;
+		vlen -= 8;
+	}
+
 	pnum = 1;
 	while (vlen < len) {
-		vec[pnum].iov_base = page_address(rqstp->rq_pages[pnum]);
+		vec[pnum].iov_base = (req) ?
+			page_address(req->rq_private_buf.pages[pnum - 1]) :
+			page_address(rqstp->rq_pages[pnum]);
 		vec[pnum].iov_len = PAGE_SIZE;
 		pnum++;
 		vlen += PAGE_SIZE;
@@ -934,8 +1078,18 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 	/* Now receive data */
 	len = svc_recvfrom(rqstp, vec, pnum, len);
 	if (len < 0)
-		goto error;
+		goto err_again;
 
+	/*
+	 * Account for the 8 bytes we read earlier
+	 */
+	len += 8;
+
+	if (req) {
+		xprt_complete_rqst(req->rq_task, len);
+		len = 0;
+		goto out;
+	}
 	dprintk("svc: TCP complete record (%d bytes)\n", len);
 	rqstp->rq_arg.len = len;
 	rqstp->rq_arg.page_base = 0;
@@ -949,6 +1103,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 	rqstp->rq_xprt_ctxt   = NULL;
 	rqstp->rq_prot	      = IPPROTO_TCP;
 
+out:
 	/* Reset TCP read info */
 	svsk->sk_reclen = 0;
 	svsk->sk_tcplen = 0;
@@ -960,21 +1115,19 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 
 	return len;
 
- err_delete:
-	set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
-	return -EAGAIN;
-
- error:
+err_again:
 	if (len == -EAGAIN) {
 		dprintk("RPC: TCP recvfrom got EAGAIN\n");
 		svc_xprt_received(&svsk->sk_xprt);
-	} else {
+		return len;
+	}
+error:
+	if (len != -EAGAIN) {
 		printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
 		       svsk->sk_xprt.xpt_server->sv_name, -len);
-		goto err_delete;
+		set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
 	}
-
-	return len;
+	return -EAGAIN;
 }
 
 /*
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index f412a852bc73..fd46d42afa89 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -832,6 +832,11 @@ static void xprt_timer(struct rpc_task *task)
 	spin_unlock_bh(&xprt->transport_lock);
 }
 
+static inline int xprt_has_timer(struct rpc_xprt *xprt)
+{
+	return xprt->idle_timeout != 0;
+}
+
 /**
  * xprt_prepare_transmit - reserve the transport before sending a request
  * @task: RPC task about to send a request
@@ -1013,7 +1018,7 @@ void xprt_release(struct rpc_task *task)
 	if (!list_empty(&req->rq_list))
 		list_del(&req->rq_list);
 	xprt->last_used = jiffies;
-	if (list_empty(&xprt->recv))
+	if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
 		mod_timer(&xprt->timer,
 				xprt->last_used + xprt->idle_timeout);
 	spin_unlock_bh(&xprt->transport_lock);
@@ -1082,8 +1087,11 @@ found:
 #endif /* CONFIG_NFS_V4_1 */
 
 	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
-	setup_timer(&xprt->timer, xprt_init_autodisconnect,
-			(unsigned long)xprt);
+	if (xprt_has_timer(xprt))
+		setup_timer(&xprt->timer, xprt_init_autodisconnect,
+			    (unsigned long)xprt);
+	else
+		init_timer(&xprt->timer);
 	xprt->last_used = jiffies;
 	xprt->cwnd = RPC_INITCWND;
 	xprt->bind_index = 0;
@@ -1102,7 +1110,6 @@ found:
 
 	dprintk("RPC:       created transport %p with %u slots\n", xprt,
 			xprt->max_reqs);
-
 	return xprt;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 5151f9f6c573..0cf5e8c27a10 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -730,12 +730,12 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
 		goto err;
 
 	mr = ib_alloc_fast_reg_mr(xprt->sc_pd, RPCSVC_MAXPAGES);
-	if (!mr)
+	if (IS_ERR(mr))
 		goto err_free_frmr;
 
 	pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device,
 					 RPCSVC_MAXPAGES);
-	if (!pl)
+	if (IS_ERR(pl))
 		goto err_free_mr;
 
 	frmr->mr = mr;
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 62438f3a914d..bee415465754 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -32,6 +32,7 @@
 #include <linux/tcp.h>
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/xprtsock.h>
 #include <linux/file.h>
 #ifdef CONFIG_NFS_V4_1
@@ -43,6 +44,7 @@
 #include <net/udp.h>
 #include <net/tcp.h>
 
+#include "sunrpc.h"
 /*
  * xprtsock tunables
  */
@@ -2098,6 +2100,134 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 			xprt->stat.bklog_u);
 }
 
+/*
+ * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
+ * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
+ * to use the server side send routines.
+ */
+void *bc_malloc(struct rpc_task *task, size_t size)
+{
+	struct page *page;
+	struct rpc_buffer *buf;
+
+	BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
+	page = alloc_page(GFP_KERNEL);
+
+	if (!page)
+		return NULL;
+
+	buf = page_address(page);
+	buf->len = PAGE_SIZE;
+
+	return buf->data;
+}
+
+/*
+ * Free the space allocated in the bc_alloc routine
+ */
+void bc_free(void *buffer)
+{
+	struct rpc_buffer *buf;
+
+	if (!buffer)
+		return;
+
+	buf = container_of(buffer, struct rpc_buffer, data);
+	free_page((unsigned long)buf);
+}
+
+/*
+ * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
+ * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
+ */
+static int bc_sendto(struct rpc_rqst *req)
+{
+	int len;
+	struct xdr_buf *xbufp = &req->rq_snd_buf;
+	struct rpc_xprt *xprt = req->rq_xprt;
+	struct sock_xprt *transport =
+				container_of(xprt, struct sock_xprt, xprt);
+	struct socket *sock = transport->sock;
+	unsigned long headoff;
+	unsigned long tailoff;
+
+	/*
+	 * Set up the rpc header and record marker stuff
+	 */
+	xs_encode_tcp_record_marker(xbufp);
+
+	tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
+	headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
+	len = svc_send_common(sock, xbufp,
+			      virt_to_page(xbufp->head[0].iov_base), headoff,
+			      xbufp->tail[0].iov_base, tailoff);
+
+	if (len != xbufp->len) {
+		printk(KERN_NOTICE "Error sending entire callback!\n");
+		len = -EAGAIN;
+	}
+
+	return len;
+}
+
+/*
+ * The send routine. Borrows from svc_send
+ */
+static int bc_send_request(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct svc_xprt	*xprt;
+	struct svc_sock         *svsk;
+	u32                     len;
+
+	dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
+	/*
+	 * Get the server socket associated with this callback xprt
+	 */
+	xprt = req->rq_xprt->bc_xprt;
+	svsk = container_of(xprt, struct svc_sock, sk_xprt);
+
+	/*
+	 * Grab the mutex to serialize data as the connection is shared
+	 * with the fore channel
+	 */
+	if (!mutex_trylock(&xprt->xpt_mutex)) {
+		rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
+		if (!mutex_trylock(&xprt->xpt_mutex))
+			return -EAGAIN;
+		rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
+	}
+	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
+		len = -ENOTCONN;
+	else
+		len = bc_sendto(req);
+	mutex_unlock(&xprt->xpt_mutex);
+
+	if (len > 0)
+		len = 0;
+
+	return len;
+}
+
+/*
+ * The close routine. Since this is client initiated, we do nothing
+ */
+
+static void bc_close(struct rpc_xprt *xprt)
+{
+	return;
+}
+
+/*
+ * The xprt destroy routine. Again, because this connection is client
+ * initiated, we do nothing
+ */
+
+static void bc_destroy(struct rpc_xprt *xprt)
+{
+	return;
+}
+
 static struct rpc_xprt_ops xs_udp_ops = {
 	.set_buffer_size	= xs_udp_set_buffer_size,
 	.reserve_xprt		= xprt_reserve_xprt_cong,
@@ -2134,6 +2264,22 @@ static struct rpc_xprt_ops xs_tcp_ops = {
 	.print_stats		= xs_tcp_print_stats,
 };
 
+/*
+ * The rpc_xprt_ops for the server backchannel
+ */
+
+static struct rpc_xprt_ops bc_tcp_ops = {
+	.reserve_xprt		= xprt_reserve_xprt,
+	.release_xprt		= xprt_release_xprt,
+	.buf_alloc		= bc_malloc,
+	.buf_free		= bc_free,
+	.send_request		= bc_send_request,
+	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
+	.close			= bc_close,
+	.destroy		= bc_destroy,
+	.print_stats		= xs_tcp_print_stats,
+};
+
 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
 				      unsigned int slot_table_size)
 {
@@ -2322,11 +2468,93 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
 	return ERR_PTR(-EINVAL);
 }
 
+/**
+ * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
+ * @args: rpc transport creation arguments
+ *
+ */
+static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
+{
+	struct sockaddr *addr = args->dstaddr;
+	struct rpc_xprt *xprt;
+	struct sock_xprt *transport;
+	struct svc_sock *bc_sock;
+
+	if (!args->bc_xprt)
+		ERR_PTR(-EINVAL);
+
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	if (IS_ERR(xprt))
+		return xprt;
+	transport = container_of(xprt, struct sock_xprt, xprt);
+
+	xprt->prot = IPPROTO_TCP;
+	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
+	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
+	xprt->timeout = &xs_tcp_default_timeout;
+
+	/* backchannel */
+	xprt_set_bound(xprt);
+	xprt->bind_timeout = 0;
+	xprt->connect_timeout = 0;
+	xprt->reestablish_timeout = 0;
+	xprt->idle_timeout = 0;
+
+	/*
+	 * The backchannel uses the same socket connection as the
+	 * forechannel
+	 */
+	xprt->bc_xprt = args->bc_xprt;
+	bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
+	bc_sock->sk_bc_xprt = xprt;
+	transport->sock = bc_sock->sk_sock;
+	transport->inet = bc_sock->sk_sk;
+
+	xprt->ops = &bc_tcp_ops;
+
+	switch (addr->sa_family) {
+	case AF_INET:
+		xs_format_peer_addresses(xprt, "tcp",
+					 RPCBIND_NETID_TCP);
+		break;
+	case AF_INET6:
+		xs_format_peer_addresses(xprt, "tcp",
+				   RPCBIND_NETID_TCP6);
+		break;
+	default:
+		kfree(xprt);
+		return ERR_PTR(-EAFNOSUPPORT);
+	}
+
+	if (xprt_bound(xprt))
+		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
+				xprt->address_strings[RPC_DISPLAY_ADDR],
+				xprt->address_strings[RPC_DISPLAY_PORT],
+				xprt->address_strings[RPC_DISPLAY_PROTO]);
+	else
+		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
+				xprt->address_strings[RPC_DISPLAY_ADDR],
+				xprt->address_strings[RPC_DISPLAY_PROTO]);
+
+	/*
+	 * Since we don't want connections for the backchannel, we set
+	 * the xprt status to connected
+	 */
+	xprt_set_connected(xprt);
+
+
+	if (try_module_get(THIS_MODULE))
+		return xprt;
+	kfree(xprt->slot);
+	kfree(xprt);
+	return ERR_PTR(-EINVAL);
+}
+
 static struct xprt_class	xs_udp_transport = {
 	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
 	.name		= "udp",
 	.owner		= THIS_MODULE,
-	.ident		= IPPROTO_UDP,
+	.ident		= XPRT_TRANSPORT_UDP,
 	.setup		= xs_setup_udp,
 };
 
@@ -2334,10 +2562,18 @@ static struct xprt_class	xs_tcp_transport = {
 	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
 	.name		= "tcp",
 	.owner		= THIS_MODULE,
-	.ident		= IPPROTO_TCP,
+	.ident		= XPRT_TRANSPORT_TCP,
 	.setup		= xs_setup_tcp,
 };
 
+static struct xprt_class	xs_bc_tcp_transport = {
+	.list		= LIST_HEAD_INIT(xs_bc_tcp_transport.list),
+	.name		= "tcp NFSv4.1 backchannel",
+	.owner		= THIS_MODULE,
+	.ident		= XPRT_TRANSPORT_BC_TCP,
+	.setup		= xs_setup_bc_tcp,
+};
+
 /**
  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
  *
@@ -2351,6 +2587,7 @@ int init_socket_xprt(void)
 
 	xprt_register_transport(&xs_udp_transport);
 	xprt_register_transport(&xs_tcp_transport);
+	xprt_register_transport(&xs_bc_tcp_transport);
 
 	return 0;
 }
@@ -2370,6 +2607,7 @@ void cleanup_socket_xprt(void)
 
 	xprt_unregister_transport(&xs_udp_transport);
 	xprt_unregister_transport(&xs_tcp_transport);
+	xprt_unregister_transport(&xs_bc_tcp_transport);
 }
 
 static int param_set_uint_minmax(const char *val, struct kernel_param *kp,