author     Linus Torvalds <torvalds@linux-foundation.org>  2008-02-02 14:31:28 +1100
committer  Linus Torvalds <torvalds@linux-foundation.org>  2008-02-02 14:31:28 +1100
commit     63e9b66e29357dd12e8b1d3ebf7036e7591f81e3 (patch)
tree       5aa6a70a8f4bbf306e2825a1e2fa2660c2c1c187 /net
parent     687fcdf741e4a268c2c7bac8b3734de761bb9719 (diff)
parent     ea339d46b93c7b16e067a29aad1812f7a389815a (diff)
Merge branch 'for-linus' of git://linux-nfs.org/~bfields/linux
* 'for-linus' of git://linux-nfs.org/~bfields/linux: (100 commits)
  SUNRPC: RPC program information is stored in unsigned integers
  SUNRPC: Move exported symbol definitions after function declaration part 2
  NLM: tear down RPC clients in nlm_shutdown_hosts
  SUNRPC: spin svc_rqst initialization to its own function
  nfsd: more careful input validation in nfsctl write methods
  lockd: minor log message fix
  knfsd: don't bother mapping putrootfh enoent to eperm
  rdma: makefile
  rdma: ONCRPC RDMA protocol marshalling
  rdma: SVCRDMA sendto
  rdma: SVCRDMA recvfrom
  rdma: SVCRDMA Core Transport Services
  rdma: SVCRDMA Transport Module
  rdma: SVCRMDA Header File
  svc: Add svc_xprt_names service to replace svc_sock_names
  knfsd: Support adding transports by writing portlist file
  svc: Add svc API that queries for a transport instance
  svc: Add /proc/sys/sunrpc/transport files
  svc: Add transport hdr size for defer/revisit
  svc: Move the xprt independent code to the svc_xprt.c file
  ...
Diffstat (limited to 'net')
-rw-r--r--  net/sunrpc/Makefile                            3
-rw-r--r--  net/sunrpc/auth_gss/svcauth_gss.c             93
-rw-r--r--  net/sunrpc/cache.c                           152
-rw-r--r--  net/sunrpc/stats.c                             7
-rw-r--r--  net/sunrpc/sunrpc_syms.c                      52
-rw-r--r--  net/sunrpc/svc.c                              90
-rw-r--r--  net/sunrpc/svc_xprt.c                       1055
-rw-r--r--  net/sunrpc/svcauth.c                           6
-rw-r--r--  net/sunrpc/svcauth_unix.c                     59
-rw-r--r--  net/sunrpc/svcsock.c                        1311
-rw-r--r--  net/sunrpc/sysctl.c                           31
-rw-r--r--  net/sunrpc/xdr.c                               8
-rw-r--r--  net/sunrpc/xprtrdma/Makefile                   5
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma.c               266
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_marshal.c       412
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c      586
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_sendto.c        520
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c    1080
18 files changed, 4530 insertions, 1206 deletions
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index 5c69a725e530..92e1dbe50947 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -11,6 +11,7 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
 	    auth.o auth_null.o auth_unix.o \
 	    svc.o svcsock.o svcauth.o svcauth_unix.o \
 	    rpcb_clnt.o timer.o xdr.o \
-	    sunrpc_syms.o cache.o rpc_pipe.o
+	    sunrpc_syms.o cache.o rpc_pipe.o \
+	    svc_xprt.o
 sunrpc-$(CONFIG_PROC_FS) += stats.o
 sunrpc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 73940df6c460..481f984e9a22 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -224,38 +224,34 @@ static int rsi_parse(struct cache_detail *cd,
 
 	/* major/minor */
 	len = qword_get(&mesg, buf, mlen);
-	if (len < 0)
+	if (len <= 0)
 		goto out;
-	if (len == 0) {
+	rsii.major_status = simple_strtoul(buf, &ep, 10);
+	if (*ep)
+		goto out;
+	len = qword_get(&mesg, buf, mlen);
+	if (len <= 0)
+		goto out;
+	rsii.minor_status = simple_strtoul(buf, &ep, 10);
+	if (*ep)
 		goto out;
-	} else {
-		rsii.major_status = simple_strtoul(buf, &ep, 10);
-		if (*ep)
-			goto out;
-		len = qword_get(&mesg, buf, mlen);
-		if (len <= 0)
-			goto out;
-		rsii.minor_status = simple_strtoul(buf, &ep, 10);
-		if (*ep)
-			goto out;
 
-		/* out_handle */
-		len = qword_get(&mesg, buf, mlen);
-		if (len < 0)
-			goto out;
-		status = -ENOMEM;
-		if (dup_to_netobj(&rsii.out_handle, buf, len))
-			goto out;
+	/* out_handle */
+	len = qword_get(&mesg, buf, mlen);
+	if (len < 0)
+		goto out;
+	status = -ENOMEM;
+	if (dup_to_netobj(&rsii.out_handle, buf, len))
+		goto out;
 
-		/* out_token */
-		len = qword_get(&mesg, buf, mlen);
-		status = -EINVAL;
-		if (len < 0)
-			goto out;
-		status = -ENOMEM;
-		if (dup_to_netobj(&rsii.out_token, buf, len))
-			goto out;
-	}
+	/* out_token */
+	len = qword_get(&mesg, buf, mlen);
+	status = -EINVAL;
+	if (len < 0)
+		goto out;
+	status = -ENOMEM;
+	if (dup_to_netobj(&rsii.out_token, buf, len))
+		goto out;
 	rsii.h.expiry_time = expiry;
 	rsip = rsi_update(&rsii, rsip);
 	status = 0;
@@ -975,6 +971,7 @@ static int svcauth_gss_handle_init(struct svc_rqst *rqstp,
 	struct kvec *resv = &rqstp->rq_res.head[0];
 	struct xdr_netobj tmpobj;
 	struct rsi *rsip, rsikey;
+	int ret;
 
 	/* Read the verifier; should be NULL: */
 	*authp = rpc_autherr_badverf;
@@ -1014,23 +1011,27 @@ static int svcauth_gss_handle_init(struct svc_rqst *rqstp,
 		/* No upcall result: */
 		return SVC_DROP;
 	case 0:
+		ret = SVC_DROP;
 		/* Got an answer to the upcall; use it: */
 		if (gss_write_init_verf(rqstp, rsip))
-			return SVC_DROP;
+			goto out;
 		if (resv->iov_len + 4 > PAGE_SIZE)
-			return SVC_DROP;
+			goto out;
 		svc_putnl(resv, RPC_SUCCESS);
 		if (svc_safe_putnetobj(resv, &rsip->out_handle))
-			return SVC_DROP;
+			goto out;
 		if (resv->iov_len + 3 * 4 > PAGE_SIZE)
-			return SVC_DROP;
+			goto out;
 		svc_putnl(resv, rsip->major_status);
 		svc_putnl(resv, rsip->minor_status);
 		svc_putnl(resv, GSS_SEQ_WIN);
 		if (svc_safe_putnetobj(resv, &rsip->out_token))
-			return SVC_DROP;
+			goto out;
 	}
-	return SVC_COMPLETE;
+	ret = SVC_COMPLETE;
+out:
+	cache_put(&rsip->h, &rsi_cache);
+	return ret;
 }
 
 /*
@@ -1125,6 +1126,7 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)
 	case RPC_GSS_PROC_DESTROY:
 		if (gss_write_verf(rqstp, rsci->mechctx, gc->gc_seq))
 			goto auth_err;
+		rsci->h.expiry_time = get_seconds();
 		set_bit(CACHE_NEGATIVE, &rsci->h.flags);
 		if (resv->iov_len + 4 > PAGE_SIZE)
 			goto drop;
@@ -1386,19 +1388,26 @@ int
 gss_svc_init(void)
 {
 	int rv = svc_auth_register(RPC_AUTH_GSS, &svcauthops_gss);
-	if (rv == 0) {
-		cache_register(&rsc_cache);
-		cache_register(&rsi_cache);
-	}
+	if (rv)
+		return rv;
+	rv = cache_register(&rsc_cache);
+	if (rv)
+		goto out1;
+	rv = cache_register(&rsi_cache);
+	if (rv)
+		goto out2;
+	return 0;
+out2:
+	cache_unregister(&rsc_cache);
+out1:
+	svc_auth_unregister(RPC_AUTH_GSS);
 	return rv;
 }
 
 void
 gss_svc_shutdown(void)
 {
-	if (cache_unregister(&rsc_cache))
-		printk(KERN_ERR "auth_rpcgss: failed to unregister rsc cache\n");
-	if (cache_unregister(&rsi_cache))
-		printk(KERN_ERR "auth_rpcgss: failed to unregister rsi cache\n");
+	cache_unregister(&rsc_cache);
+	cache_unregister(&rsi_cache);
 	svc_auth_unregister(RPC_AUTH_GSS);
 }
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 73f053d0cc7a..636c8e04e0be 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -245,6 +245,7 @@ int cache_check(struct cache_detail *detail,
 		cache_put(h, detail);
 	return rv;
 }
+EXPORT_SYMBOL(cache_check);
 
 /*
  * caches need to be periodically cleaned.
@@ -290,44 +291,78 @@ static const struct file_operations cache_flush_operations;
 static void do_cache_clean(struct work_struct *work);
 static DECLARE_DELAYED_WORK(cache_cleaner, do_cache_clean);
 
-void cache_register(struct cache_detail *cd)
+static void remove_cache_proc_entries(struct cache_detail *cd)
 {
-	cd->proc_ent = proc_mkdir(cd->name, proc_net_rpc);
-	if (cd->proc_ent) {
-		struct proc_dir_entry *p;
-		cd->proc_ent->owner = cd->owner;
-		cd->channel_ent = cd->content_ent = NULL;
+	if (cd->proc_ent == NULL)
+		return;
+	if (cd->flush_ent)
+		remove_proc_entry("flush", cd->proc_ent);
+	if (cd->channel_ent)
+		remove_proc_entry("channel", cd->proc_ent);
+	if (cd->content_ent)
+		remove_proc_entry("content", cd->proc_ent);
+	cd->proc_ent = NULL;
+	remove_proc_entry(cd->name, proc_net_rpc);
+}
 
-		p = create_proc_entry("flush", S_IFREG|S_IRUSR|S_IWUSR,
-				      cd->proc_ent);
-		cd->flush_ent =  p;
-		if (p) {
-			p->proc_fops = &cache_flush_operations;
-			p->owner = cd->owner;
-			p->data = cd;
-		}
+#ifdef CONFIG_PROC_FS
+static int create_cache_proc_entries(struct cache_detail *cd)
+{
+	struct proc_dir_entry *p;
 
-		if (cd->cache_request || cd->cache_parse) {
-			p = create_proc_entry("channel", S_IFREG|S_IRUSR|S_IWUSR,
-					      cd->proc_ent);
-			cd->channel_ent = p;
-			if (p) {
-				p->proc_fops = &cache_file_operations;
-				p->owner = cd->owner;
-				p->data = cd;
-			}
-		}
-		if (cd->cache_show) {
-			p = create_proc_entry("content", S_IFREG|S_IRUSR|S_IWUSR,
-					      cd->proc_ent);
-			cd->content_ent = p;
-			if (p) {
-				p->proc_fops = &content_file_operations;
-				p->owner = cd->owner;
-				p->data = cd;
-			}
-		}
+	cd->proc_ent = proc_mkdir(cd->name, proc_net_rpc);
+	if (cd->proc_ent == NULL)
+		goto out_nomem;
+	cd->proc_ent->owner = cd->owner;
+	cd->channel_ent = cd->content_ent = NULL;
+
+	p = create_proc_entry("flush", S_IFREG|S_IRUSR|S_IWUSR, cd->proc_ent);
+	cd->flush_ent = p;
+	if (p == NULL)
+		goto out_nomem;
+	p->proc_fops = &cache_flush_operations;
+	p->owner = cd->owner;
+	p->data = cd;
+
+	if (cd->cache_request || cd->cache_parse) {
+		p = create_proc_entry("channel", S_IFREG|S_IRUSR|S_IWUSR,
+				      cd->proc_ent);
+		cd->channel_ent = p;
+		if (p == NULL)
+			goto out_nomem;
+		p->proc_fops = &cache_file_operations;
+		p->owner = cd->owner;
+		p->data = cd;
 	}
+	if (cd->cache_show) {
+		p = create_proc_entry("content", S_IFREG|S_IRUSR|S_IWUSR,
+				      cd->proc_ent);
+		cd->content_ent = p;
+		if (p == NULL)
+			goto out_nomem;
+		p->proc_fops = &content_file_operations;
+		p->owner = cd->owner;
+		p->data = cd;
+	}
+	return 0;
+out_nomem:
+	remove_cache_proc_entries(cd);
+	return -ENOMEM;
+}
+#else /* CONFIG_PROC_FS */
+static int create_cache_proc_entries(struct cache_detail *cd)
+{
+	return 0;
+}
+#endif
+
+int cache_register(struct cache_detail *cd)
+{
+	int ret;
+
+	ret = create_cache_proc_entries(cd);
+	if (ret)
+		return ret;
 	rwlock_init(&cd->hash_lock);
 	INIT_LIST_HEAD(&cd->queue);
 	spin_lock(&cache_list_lock);
@@ -341,9 +376,11 @@ void cache_register(struct cache_detail *cd)
 
 	/* start the cleaning process */
 	schedule_delayed_work(&cache_cleaner, 0);
+	return 0;
 }
+EXPORT_SYMBOL(cache_register);
 
-int cache_unregister(struct cache_detail *cd)
+void cache_unregister(struct cache_detail *cd)
 {
 	cache_purge(cd);
 	spin_lock(&cache_list_lock);
@@ -351,30 +388,23 @@ int cache_unregister(struct cache_detail *cd)
 	if (cd->entries || atomic_read(&cd->inuse)) {
 		write_unlock(&cd->hash_lock);
 		spin_unlock(&cache_list_lock);
-		return -EBUSY;
+		goto out;
 	}
 	if (current_detail == cd)
 		current_detail = NULL;
 	list_del_init(&cd->others);
 	write_unlock(&cd->hash_lock);
 	spin_unlock(&cache_list_lock);
-	if (cd->proc_ent) {
-		if (cd->flush_ent)
-			remove_proc_entry("flush", cd->proc_ent);
-		if (cd->channel_ent)
-			remove_proc_entry("channel", cd->proc_ent);
-		if (cd->content_ent)
-			remove_proc_entry("content", cd->proc_ent);
-
-		cd->proc_ent = NULL;
-		remove_proc_entry(cd->name, proc_net_rpc);
-	}
+	remove_cache_proc_entries(cd);
 	if (list_empty(&cache_list)) {
 		/* module must be being unloaded so its safe to kill the worker */
 		cancel_delayed_work_sync(&cache_cleaner);
 	}
-	return 0;
+	return;
+out:
+	printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name);
 }
+EXPORT_SYMBOL(cache_unregister);
 
 /* clean cache tries to find something to clean
  * and cleans it.
@@ -489,6 +519,7 @@ void cache_flush(void)
 	while (cache_clean() != -1)
 		cond_resched();
 }
+EXPORT_SYMBOL(cache_flush);
 
 void cache_purge(struct cache_detail *detail)
 {
@@ -497,7 +528,7 @@ void cache_purge(struct cache_detail *detail)
 	cache_flush();
 	detail->flush_time = 1;
 }
-
+EXPORT_SYMBOL(cache_purge);
 
 
 /*
@@ -634,13 +665,13 @@ void cache_clean_deferred(void *owner)
 /*
  * communicate with user-space
  *
- * We have a magic /proc file - /proc/sunrpc/cache
- * On read, you get a full request, or block
- * On write, an update request is processed
- * Poll works if anything to read, and always allows write
+ * We have a magic /proc file - /proc/sunrpc/<cachename>/channel.
+ * On read, you get a full request, or block.
+ * On write, an update request is processed.
+ * Poll works if anything to read, and always allows write.
  *
  * Implemented by linked list of requests.  Each open file has
- * a ->private that also exists in this list.  New request are added
+ * a ->private that also exists in this list.  New requests are added
  * to the end and may wakeup and preceding readers.
  * New readers are added to the head.  If, on read, an item is found with
  * CACHE_UPCALLING clear, we free it from the list.
@@ -963,6 +994,7 @@ void qword_add(char **bpp, int *lp, char *str)
 	*bpp = bp;
 	*lp = len;
 }
+EXPORT_SYMBOL(qword_add);
 
 void qword_addhex(char **bpp, int *lp, char *buf, int blen)
 {
@@ -991,6 +1023,7 @@ void qword_addhex(char **bpp, int *lp, char *buf, int blen)
 	*bpp = bp;
 	*lp = len;
 }
+EXPORT_SYMBOL(qword_addhex);
 
 static void warn_no_listener(struct cache_detail *detail)
 {
@@ -1113,6 +1146,7 @@ int qword_get(char **bpp, char *dest, int bufsize)
 	*dest = '\0';
 	return len;
 }
+EXPORT_SYMBOL(qword_get);
 
 
 /*
@@ -1244,18 +1278,18 @@ static ssize_t read_flush(struct file *file, char __user *buf,
 	struct cache_detail *cd = PDE(file->f_path.dentry->d_inode)->data;
 	char tbuf[20];
 	unsigned long p = *ppos;
-	int len;
+	size_t len;
 
 	sprintf(tbuf, "%lu\n", cd->flush_time);
 	len = strlen(tbuf);
 	if (p >= len)
 		return 0;
 	len -= p;
-	if (len > count) len = count;
+	if (len > count)
+		len = count;
 	if (copy_to_user(buf, (void*)(tbuf+p), len))
-		len = -EFAULT;
-	else
-		*ppos += len;
+		return -EFAULT;
+	*ppos += len;
 	return len;
 }
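
The cache.c changes above turn cache_register() into a call that can fail (it now returns int) and make cache_unregister() return void. As a minimal sketch, not part of this patch, a hypothetical cache user would adapt along the lines of gss_svc_init()/gss_svc_shutdown() earlier in this diff; my_cache and the module hooks are invented names:

#include <linux/module.h>
#include <linux/sunrpc/cache.h>

static struct cache_detail my_cache;	/* hypothetical cache, assumed to be filled in elsewhere */

static int __init my_cache_module_init(void)
{
	int err;

	/* cache_register() may now fail, e.g. -ENOMEM while creating proc entries */
	err = cache_register(&my_cache);
	if (err)
		return err;
	return 0;
}

static void __exit my_cache_module_exit(void)
{
	/* cache_unregister() no longer returns a status to check */
	cache_unregister(&my_cache);
}

module_init(my_cache_module_init);
module_exit(my_cache_module_exit);
MODULE_LICENSE("GPL");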
 
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index 74df2d358e61..5a16875f5ac8 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -33,7 +33,7 @@ struct proc_dir_entry	*proc_net_rpc = NULL;
 static int rpc_proc_show(struct seq_file *seq, void *v) {
 	const struct rpc_stat	*statp = seq->private;
 	const struct rpc_program *prog = statp->program;
-	int		i, j;
+	unsigned int i, j;
 
 	seq_printf(seq,
 		"net %u %u %u %u\n",
@@ -81,7 +81,7 @@ void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp) {
 	const struct svc_program *prog = statp->program;
 	const struct svc_procedure *proc;
 	const struct svc_version *vers;
-	int		i, j;
+	unsigned int i, j;
 
 	seq_printf(seq,
 		"net %u %u %u %u\n",
@@ -106,6 +106,7 @@ void svc_seq_show(struct seq_file *seq, const struct svc_stat *statp) {
 		seq_putc(seq, '\n');
 	}
 }
+EXPORT_SYMBOL(svc_seq_show);
 
 /**
  * rpc_alloc_iostats - allocate an rpc_iostats structure
@@ -255,12 +256,14 @@ svc_proc_register(struct svc_stat *statp, const struct file_operations *fops)
 {
 	return do_register(statp->program->pg_name, statp, fops);
 }
+EXPORT_SYMBOL(svc_proc_register);
 
 void
 svc_proc_unregister(const char *name)
 {
 	remove_proc_entry(name, proc_net_rpc);
 }
+EXPORT_SYMBOL(svc_proc_unregister);
 
 void
 rpc_proc_init(void)
diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c
index 1a7e309d008b..843629f55763 100644
--- a/net/sunrpc/sunrpc_syms.c
+++ b/net/sunrpc/sunrpc_syms.c
@@ -22,48 +22,6 @@
 #include <linux/sunrpc/rpc_pipe_fs.h>
 #include <linux/sunrpc/xprtsock.h>
 
-/* RPC server stuff */
-EXPORT_SYMBOL(svc_create);
-EXPORT_SYMBOL(svc_create_thread);
-EXPORT_SYMBOL(svc_create_pooled);
-EXPORT_SYMBOL(svc_set_num_threads);
-EXPORT_SYMBOL(svc_exit_thread);
-EXPORT_SYMBOL(svc_destroy);
-EXPORT_SYMBOL(svc_drop);
-EXPORT_SYMBOL(svc_process);
-EXPORT_SYMBOL(svc_recv);
-EXPORT_SYMBOL(svc_wake_up);
-EXPORT_SYMBOL(svc_makesock);
-EXPORT_SYMBOL(svc_reserve);
-EXPORT_SYMBOL(svc_auth_register);
-EXPORT_SYMBOL(auth_domain_lookup);
-EXPORT_SYMBOL(svc_authenticate);
-EXPORT_SYMBOL(svc_set_client);
-
-/* RPC statistics */
-#ifdef CONFIG_PROC_FS
-EXPORT_SYMBOL(svc_proc_register);
-EXPORT_SYMBOL(svc_proc_unregister);
-EXPORT_SYMBOL(svc_seq_show);
-#endif
-
-/* caching... */
-EXPORT_SYMBOL(auth_domain_find);
-EXPORT_SYMBOL(auth_domain_put);
-EXPORT_SYMBOL(auth_unix_add_addr);
-EXPORT_SYMBOL(auth_unix_forget_old);
-EXPORT_SYMBOL(auth_unix_lookup);
-EXPORT_SYMBOL(cache_check);
-EXPORT_SYMBOL(cache_flush);
-EXPORT_SYMBOL(cache_purge);
-EXPORT_SYMBOL(cache_register);
-EXPORT_SYMBOL(cache_unregister);
-EXPORT_SYMBOL(qword_add);
-EXPORT_SYMBOL(qword_addhex);
-EXPORT_SYMBOL(qword_get);
-EXPORT_SYMBOL(svcauth_unix_purge);
-EXPORT_SYMBOL(unix_domain_find);
-
 extern struct cache_detail ip_map_cache, unix_gid_cache;
 
 static int __init
@@ -85,7 +43,8 @@ init_sunrpc(void)
 #endif
 	cache_register(&ip_map_cache);
 	cache_register(&unix_gid_cache);
-	init_socket_xprt();
+	svc_init_xprt_sock();	/* svc sock transport */
+	init_socket_xprt();	/* clnt sock transport */
 	rpcauth_init_module();
 out:
 	return err;
@@ -96,12 +55,11 @@ cleanup_sunrpc(void)
 {
 	rpcauth_remove_module();
 	cleanup_socket_xprt();
+	svc_cleanup_xprt_sock();
 	unregister_rpc_pipefs();
 	rpc_destroy_mempool();
-	if (cache_unregister(&ip_map_cache))
-		printk(KERN_ERR "sunrpc: failed to unregister ip_map cache\n");
-	if (cache_unregister(&unix_gid_cache))
-	      printk(KERN_ERR "sunrpc: failed to unregister unix_gid cache\n");
+	cache_unregister(&ip_map_cache);
+	cache_unregister(&unix_gid_cache);
 #ifdef RPC_DEBUG
 	rpc_unregister_sysctl();
 #endif
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 4ad5fbbb18b4..a290e1523297 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -364,7 +364,7 @@ __svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
 	   void (*shutdown)(struct svc_serv *serv))
 {
 	struct svc_serv	*serv;
-	int vers;
+	unsigned int vers;
 	unsigned int xdrsize;
 	unsigned int i;
 
@@ -433,6 +433,7 @@ svc_create(struct svc_program *prog, unsigned int bufsize,
 {
 	return __svc_create(prog, bufsize, /*npools*/1, shutdown);
 }
+EXPORT_SYMBOL(svc_create);
 
 struct svc_serv *
 svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
@@ -452,6 +453,7 @@ svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
 
 	return serv;
 }
+EXPORT_SYMBOL(svc_create_pooled);
 
 /*
  * Destroy an RPC service.  Should be called with the BKL held
@@ -459,9 +461,6 @@ svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
 void
 svc_destroy(struct svc_serv *serv)
 {
-	struct svc_sock	*svsk;
-	struct svc_sock *tmp;
-
 	dprintk("svc: svc_destroy(%s, %d)\n",
 				serv->sv_program->pg_name,
 				serv->sv_nrthreads);
@@ -476,14 +475,12 @@ svc_destroy(struct svc_serv *serv)
 
 	del_timer_sync(&serv->sv_temptimer);
 
-	list_for_each_entry_safe(svsk, tmp, &serv->sv_tempsocks, sk_list)
-		svc_force_close_socket(svsk);
+	svc_close_all(&serv->sv_tempsocks);
 
 	if (serv->sv_shutdown)
 		serv->sv_shutdown(serv);
 
-	list_for_each_entry_safe(svsk, tmp, &serv->sv_permsocks, sk_list)
-		svc_force_close_socket(svsk);
+	svc_close_all(&serv->sv_permsocks);
 
 	BUG_ON(!list_empty(&serv->sv_permsocks));
 	BUG_ON(!list_empty(&serv->sv_tempsocks));
@@ -498,6 +495,7 @@ svc_destroy(struct svc_serv *serv)
 	kfree(serv->sv_pools);
 	kfree(serv);
 }
+EXPORT_SYMBOL(svc_destroy);
 
 /*
  * Allocate an RPC server's buffer space.
@@ -536,31 +534,17 @@ svc_release_buffer(struct svc_rqst *rqstp)
 			put_page(rqstp->rq_pages[i]);
 }
 
-/*
- * Create a thread in the given pool.  Caller must hold BKL.
- * On a NUMA or SMP machine, with a multi-pool serv, the thread
- * will be restricted to run on the cpus belonging to the pool.
- */
-static int
-__svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
-		    struct svc_pool *pool)
+struct svc_rqst *
+svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool)
 {
 	struct svc_rqst	*rqstp;
-	int		error = -ENOMEM;
-	int		have_oldmask = 0;
-	cpumask_t	oldmask;
 
 	rqstp = kzalloc(sizeof(*rqstp), GFP_KERNEL);
 	if (!rqstp)
-		goto out;
+		goto out_enomem;
 
 	init_waitqueue_head(&rqstp->rq_wait);
 
-	if (!(rqstp->rq_argp = kmalloc(serv->sv_xdrsize, GFP_KERNEL))
-	 || !(rqstp->rq_resp = kmalloc(serv->sv_xdrsize, GFP_KERNEL))
-	 || !svc_init_buffer(rqstp, serv->sv_max_mesg))
-		goto out_thread;
-
 	serv->sv_nrthreads++;
 	spin_lock_bh(&pool->sp_lock);
 	pool->sp_nrthreads++;
@@ -569,6 +553,45 @@ __svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
 	rqstp->rq_server = serv;
 	rqstp->rq_pool = pool;
 
+	rqstp->rq_argp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
+	if (!rqstp->rq_argp)
+		goto out_thread;
+
+	rqstp->rq_resp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
+	if (!rqstp->rq_resp)
+		goto out_thread;
+
+	if (!svc_init_buffer(rqstp, serv->sv_max_mesg))
+		goto out_thread;
+
+	return rqstp;
+out_thread:
+	svc_exit_thread(rqstp);
+out_enomem:
+	return ERR_PTR(-ENOMEM);
+}
+EXPORT_SYMBOL(svc_prepare_thread);
+
+/*
+ * Create a thread in the given pool.  Caller must hold BKL.
+ * On a NUMA or SMP machine, with a multi-pool serv, the thread
+ * will be restricted to run on the cpus belonging to the pool.
+ */
+static int
+__svc_create_thread(svc_thread_fn func, struct svc_serv *serv,
+		    struct svc_pool *pool)
+{
+	struct svc_rqst	*rqstp;
+	int		error = -ENOMEM;
+	int		have_oldmask = 0;
+	cpumask_t	oldmask;
+
+	rqstp = svc_prepare_thread(serv, pool);
+	if (IS_ERR(rqstp)) {
+		error = PTR_ERR(rqstp);
+		goto out;
+	}
+
 	if (serv->sv_nrpools > 1)
 		have_oldmask = svc_pool_map_set_cpumask(pool->sp_id, &oldmask);
 
@@ -597,6 +620,7 @@ svc_create_thread(svc_thread_fn func, struct svc_serv *serv)
 {
 	return __svc_create_thread(func, serv, &serv->sv_pools[0]);
 }
+EXPORT_SYMBOL(svc_create_thread);
 
 /*
  * Choose a pool in which to create a new thread, for svc_set_num_threads
@@ -700,6 +724,7 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
 
 	return error;
 }
+EXPORT_SYMBOL(svc_set_num_threads);
 
 /*
  * Called from a server thread as it's exiting.  Caller must hold BKL.
@@ -726,6 +751,7 @@ svc_exit_thread(struct svc_rqst *rqstp)
 	if (serv)
 		svc_destroy(serv);
 }
+EXPORT_SYMBOL(svc_exit_thread);
 
 /*
  * Register an RPC service with the local portmapper.
@@ -737,7 +763,8 @@ svc_register(struct svc_serv *serv, int proto, unsigned short port)
 {
 	struct svc_program	*progp;
 	unsigned long		flags;
-	int			i, error = 0, dummy;
+	unsigned int		i;
+	int			error = 0, dummy;
 
 	if (!port)
 		clear_thread_flag(TIF_SIGPENDING);
@@ -840,9 +867,9 @@ svc_process(struct svc_rqst *rqstp)
 	rqstp->rq_res.tail[0].iov_len = 0;
 	/* Will be turned off only in gss privacy case: */
 	rqstp->rq_splice_ok = 1;
-	/* tcp needs a space for the record length... */
-	if (rqstp->rq_prot == IPPROTO_TCP)
-		svc_putnl(resv, 0);
+
+	/* Setup reply header */
+	rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);
 
 	rqstp->rq_xid = svc_getu32(argv);
 	svc_putu32(resv, rqstp->rq_xid);
@@ -1049,16 +1076,15 @@ err_bad:
 	svc_putnl(resv, ntohl(rpc_stat));
 	goto sendit;
 }
+EXPORT_SYMBOL(svc_process);
 
 /*
  * Return (transport-specific) limit on the rpc payload.
  */
 u32 svc_max_payload(const struct svc_rqst *rqstp)
 {
-	int max = RPCSVC_MAXPAYLOAD_TCP;
+	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;
 
-	if (rqstp->rq_sock->sk_sock->type == SOCK_DGRAM)
-		max = RPCSVC_MAXPAYLOAD_UDP;
 	if (rqstp->rq_server->sv_max_payload < max)
 		max = rqstp->rq_server->sv_max_payload;
 	return max;
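
svc_prepare_thread() is now the exported building block that __svc_create_thread() itself uses: it allocates and accounts a struct svc_rqst for a given pool, leaving thread creation to the caller. A rough sketch, assuming a caller that spawns its own server thread; the names and error handling below are illustrative, not from this patch:

#include <linux/err.h>
#include <linux/sched.h>
#include <linux/sunrpc/types.h>
#include <linux/sunrpc/svc.h>

static int my_service_thread(void *data)	/* hypothetical thread body */
{
	struct svc_rqst *rqstp = data;
	int len;

	while (!signalled()) {
		/* wait up to 30 seconds for a request on any transport */
		len = svc_recv(rqstp, 30 * HZ);
		if (len == -EAGAIN || len == -EINTR)
			continue;
		if (len < 0)
			break;
		svc_process(rqstp);	/* decode, dispatch and send the reply */
	}
	svc_exit_thread(rqstp);		/* releases the svc_rqst and its serv reference */
	return 0;
}

static int my_start_thread(struct svc_serv *serv, struct svc_pool *pool)
{
	struct svc_rqst *rqstp = svc_prepare_thread(serv, pool);
	int pid;

	if (IS_ERR(rqstp))
		return PTR_ERR(rqstp);
	pid = kernel_thread(my_service_thread, rqstp, 0);
	if (pid < 0) {
		svc_exit_thread(rqstp);	/* undo the accounting done by svc_prepare_thread() */
		return pid;
	}
	return 0;
}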
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
new file mode 100644
index 000000000000..ea377e06afae
--- /dev/null
+++ b/net/sunrpc/svc_xprt.c
@@ -0,0 +1,1055 @@
+/*
+ * linux/net/sunrpc/svc_xprt.c
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sched.h>
+#include <linux/errno.h>
+#include <linux/fcntl.h>
+#include <linux/net.h>
+#include <linux/in.h>
+#include <linux/inet.h>
+#include <linux/udp.h>
+#include <linux/tcp.h>
+#include <linux/unistd.h>
+#include <linux/slab.h>
+#include <linux/netdevice.h>
+#include <linux/skbuff.h>
+#include <linux/file.h>
+#include <linux/freezer.h>
+#include <net/sock.h>
+#include <net/checksum.h>
+#include <net/ip.h>
+#include <net/ipv6.h>
+#include <net/tcp_states.h>
+#include <linux/uaccess.h>
+#include <asm/ioctls.h>
+
+#include <linux/sunrpc/types.h>
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/stats.h>
+#include <linux/sunrpc/svc_xprt.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
+static int svc_deferred_recv(struct svc_rqst *rqstp);
+static struct cache_deferred_req *svc_defer(struct cache_req *req);
+static void svc_age_temp_xprts(unsigned long closure);
+
+/* apparently the "standard" is that clients close
+ * idle connections after 5 minutes, servers after
+ * 6 minutes
+ *   http://www.connectathon.org/talks96/nfstcp.pdf
+ */
+static int svc_conn_age_period = 6*60;
+
+/* List of registered transport classes */
+static DEFINE_SPINLOCK(svc_xprt_class_lock);
+static LIST_HEAD(svc_xprt_class_list);
+
+/* SMP locking strategy:
+ *
+ *	svc_pool->sp_lock protects most of the fields of that pool.
+ *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
+ *	when both need to be taken (rare), svc_serv->sv_lock is first.
+ *	BKL protects svc_serv->sv_nrthread.
+ *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
+ *             and the ->sk_info_authunix cache.
+ *
+ *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
+ *	enqueued multiply. During normal transport processing this bit
+ *	is set by svc_xprt_enqueue and cleared by svc_xprt_received.
+ *	Providers should not manipulate this bit directly.
+ *
+ *	Some flags can be set to certain values at any time
+ *	providing that certain rules are followed:
+ *
+ *	XPT_CONN, XPT_DATA:
+ *		- Can be set or cleared at any time.
+ *		- After a set, svc_xprt_enqueue must be called to enqueue
+ *		  the transport for processing.
+ *		- After a clear, the transport must be read/accepted.
+ *		  If this succeeds, it must be set again.
+ *	XPT_CLOSE:
+ *		- Can set at any time. It is never cleared.
+ *      XPT_DEAD:
+ *		- Can only be set while XPT_BUSY is held which ensures
+ *		  that no other thread will be using the transport or will
+ *		  try to set XPT_DEAD.
+ */
+
+int svc_reg_xprt_class(struct svc_xprt_class *xcl)
+{
+	struct svc_xprt_class *cl;
+	int res = -EEXIST;
+
+	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);
+
+	INIT_LIST_HEAD(&xcl->xcl_list);
+	spin_lock(&svc_xprt_class_lock);
+	/* Make sure there isn't already a class with the same name */
+	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
+		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
+			goto out;
+	}
+	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
+	res = 0;
+out:
+	spin_unlock(&svc_xprt_class_lock);
+	return res;
+}
+EXPORT_SYMBOL_GPL(svc_reg_xprt_class);
+
+void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
+{
+	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
+	spin_lock(&svc_xprt_class_lock);
+	list_del_init(&xcl->xcl_list);
+	spin_unlock(&svc_xprt_class_lock);
+}
+EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);
+
+/*
+ * Format the transport list for printing
+ */
+int svc_print_xprts(char *buf, int maxlen)
+{
+	struct list_head *le;
+	char tmpstr[80];
+	int len = 0;
+	buf[0] = '\0';
+
+	spin_lock(&svc_xprt_class_lock);
+	list_for_each(le, &svc_xprt_class_list) {
+		int slen;
+		struct svc_xprt_class *xcl =
+			list_entry(le, struct svc_xprt_class, xcl_list);
+
+		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
+		slen = strlen(tmpstr);
+		if (len + slen > maxlen)
+			break;
+		len += slen;
+		strcat(buf, tmpstr);
+	}
+	spin_unlock(&svc_xprt_class_lock);
+
+	return len;
+}
+
+static void svc_xprt_free(struct kref *kref)
+{
+	struct svc_xprt *xprt =
+		container_of(kref, struct svc_xprt, xpt_ref);
+	struct module *owner = xprt->xpt_class->xcl_owner;
+	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)
+	    && xprt->xpt_auth_cache != NULL)
+		svcauth_unix_info_release(xprt->xpt_auth_cache);
+	xprt->xpt_ops->xpo_free(xprt);
+	module_put(owner);
+}
+
+void svc_xprt_put(struct svc_xprt *xprt)
+{
+	kref_put(&xprt->xpt_ref, svc_xprt_free);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_put);
+
+/*
+ * Called by transport drivers to initialize the transport independent
+ * portion of the transport instance.
+ */
+void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
+		   struct svc_serv *serv)
+{
+	memset(xprt, 0, sizeof(*xprt));
+	xprt->xpt_class = xcl;
+	xprt->xpt_ops = xcl->xcl_ops;
+	kref_init(&xprt->xpt_ref);
+	xprt->xpt_server = serv;
+	INIT_LIST_HEAD(&xprt->xpt_list);
+	INIT_LIST_HEAD(&xprt->xpt_ready);
+	INIT_LIST_HEAD(&xprt->xpt_deferred);
+	mutex_init(&xprt->xpt_mutex);
+	spin_lock_init(&xprt->xpt_lock);
+	set_bit(XPT_BUSY, &xprt->xpt_flags);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_init);
+
+int svc_create_xprt(struct svc_serv *serv, char *xprt_name, unsigned short port,
+		    int flags)
+{
+	struct svc_xprt_class *xcl;
+	struct sockaddr_in sin = {
+		.sin_family		= AF_INET,
+		.sin_addr.s_addr	= INADDR_ANY,
+		.sin_port		= htons(port),
+	};
+	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
+	spin_lock(&svc_xprt_class_lock);
+	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
+		struct svc_xprt *newxprt;
+
+		if (strcmp(xprt_name, xcl->xcl_name))
+			continue;
+
+		if (!try_module_get(xcl->xcl_owner))
+			goto err;
+
+		spin_unlock(&svc_xprt_class_lock);
+		newxprt = xcl->xcl_ops->
+			xpo_create(serv, (struct sockaddr *)&sin, sizeof(sin),
+				   flags);
+		if (IS_ERR(newxprt)) {
+			module_put(xcl->xcl_owner);
+			return PTR_ERR(newxprt);
+		}
+
+		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
+		spin_lock_bh(&serv->sv_lock);
+		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
+		spin_unlock_bh(&serv->sv_lock);
+		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
+		return svc_xprt_local_port(newxprt);
+	}
+ err:
+	spin_unlock(&svc_xprt_class_lock);
+	dprintk("svc: transport %s not found\n", xprt_name);
+	return -ENOENT;
+}
+EXPORT_SYMBOL_GPL(svc_create_xprt);
+
+/*
+ * Copy the local and remote xprt addresses to the rqstp structure
+ */
+void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+{
+	struct sockaddr *sin;
+
+	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
+	rqstp->rq_addrlen = xprt->xpt_remotelen;
+
+	/*
+	 * Destination address in request is needed for binding the
+	 * source address in RPC replies/callbacks later.
+	 */
+	sin = (struct sockaddr *)&xprt->xpt_local;
+	switch (sin->sa_family) {
+	case AF_INET:
+		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
+		break;
+	case AF_INET6:
+		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
+		break;
+	}
+}
+EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);
+
+/**
+ * svc_print_addr - Format rq_addr field for printing
+ * @rqstp: svc_rqst struct containing address to print
+ * @buf: target buffer for formatted address
+ * @len: length of target buffer
+ *
+ */
+char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
+{
+	return __svc_print_addr(svc_addr(rqstp), buf, len);
+}
+EXPORT_SYMBOL_GPL(svc_print_addr);
+
+/*
+ * Queue up an idle server thread.  Must have pool->sp_lock held.
+ * Note: this is really a stack rather than a queue, so that we only
+ * use as many different threads as we need, and the rest don't pollute
+ * the cache.
+ */
+static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
+{
+	list_add(&rqstp->rq_list, &pool->sp_threads);
+}
+
+/*
+ * Dequeue an nfsd thread.  Must have pool->sp_lock held.
+ */
+static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
+{
+	list_del(&rqstp->rq_list);
+}
+
+/*
+ * Queue up a transport with data pending. If there are idle nfsd
+ * processes, wake 'em up.
+ *
+ */
+void svc_xprt_enqueue(struct svc_xprt *xprt)
+{
+	struct svc_serv	*serv = xprt->xpt_server;
+	struct svc_pool *pool;
+	struct svc_rqst	*rqstp;
+	int cpu;
+
+	if (!(xprt->xpt_flags &
+	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
+		return;
+	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
+		return;
+
+	cpu = get_cpu();
+	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
+	put_cpu();
+
+	spin_lock_bh(&pool->sp_lock);
+
+	if (!list_empty(&pool->sp_threads) &&
+	    !list_empty(&pool->sp_sockets))
+		printk(KERN_ERR
+		       "svc_xprt_enqueue: "
+		       "threads and transports both waiting??\n");
+
+	if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
+		/* Don't enqueue dead transports */
+		dprintk("svc: transport %p is dead, not enqueued\n", xprt);
+		goto out_unlock;
+	}
+
+	/* Mark transport as busy. It will remain in this state until
+	 * the provider calls svc_xprt_received. We update XPT_BUSY
+	 * atomically because it also guards against trying to enqueue
+	 * the transport twice.
+	 */
+	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
+		/* Don't enqueue transport while already enqueued */
+		dprintk("svc: transport %p busy, not enqueued\n", xprt);
+		goto out_unlock;
+	}
+	BUG_ON(xprt->xpt_pool != NULL);
+	xprt->xpt_pool = pool;
+
+	/* Handle pending connection */
+	if (test_bit(XPT_CONN, &xprt->xpt_flags))
+		goto process;
+
+	/* Handle close in-progress */
+	if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
+		goto process;
+
+	/* Check if we have space to reply to a request */
+	if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
+		/* Don't enqueue while not enough space for reply */
+		dprintk("svc: no write space, transport %p  not enqueued\n",
+			xprt);
+		xprt->xpt_pool = NULL;
+		clear_bit(XPT_BUSY, &xprt->xpt_flags);
+		goto out_unlock;
+	}
+
+ process:
+	if (!list_empty(&pool->sp_threads)) {
+		rqstp = list_entry(pool->sp_threads.next,
+				   struct svc_rqst,
+				   rq_list);
+		dprintk("svc: transport %p served by daemon %p\n",
+			xprt, rqstp);
+		svc_thread_dequeue(pool, rqstp);
+		if (rqstp->rq_xprt)
+			printk(KERN_ERR
+				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
+				rqstp, rqstp->rq_xprt);
+		rqstp->rq_xprt = xprt;
+		svc_xprt_get(xprt);
+		rqstp->rq_reserved = serv->sv_max_mesg;
+		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
+		BUG_ON(xprt->xpt_pool != pool);
+		wake_up(&rqstp->rq_wait);
+	} else {
+		dprintk("svc: transport %p put into queue\n", xprt);
+		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
+		BUG_ON(xprt->xpt_pool != pool);
+	}
+
+out_unlock:
+	spin_unlock_bh(&pool->sp_lock);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
+
+/*
+ * Dequeue the first transport.  Must be called with the pool->sp_lock held.
+ */
+static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
+{
+	struct svc_xprt	*xprt;
+
+	if (list_empty(&pool->sp_sockets))
+		return NULL;
+
+	xprt = list_entry(pool->sp_sockets.next,
+			  struct svc_xprt, xpt_ready);
+	list_del_init(&xprt->xpt_ready);
+
+	dprintk("svc: transport %p dequeued, inuse=%d\n",
+		xprt, atomic_read(&xprt->xpt_ref.refcount));
+
+	return xprt;
+}
+
+/*
+ * svc_xprt_received conditionally queues the transport for processing
+ * by another thread. The caller must hold the XPT_BUSY bit and must
+ * not thereafter touch transport data.
+ *
+ * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
+ * insufficient) data.
+ */
+void svc_xprt_received(struct svc_xprt *xprt)
+{
+	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
+	xprt->xpt_pool = NULL;
+	clear_bit(XPT_BUSY, &xprt->xpt_flags);
+	svc_xprt_enqueue(xprt);
+}
+EXPORT_SYMBOL_GPL(svc_xprt_received);
+
+/**
+ * svc_reserve - change the space reserved for the reply to a request.
+ * @rqstp:  The request in question
+ * @space: new max space to reserve
+ *
+ * Each request reserves some space on the output queue of the transport
+ * to make sure the reply fits.  This function reduces that reserved
+ * space to be the amount of space used already, plus @space.
+ *
+ */
+void svc_reserve(struct svc_rqst *rqstp, int space)
+{
+	space += rqstp->rq_res.head[0].iov_len;
+
+	if (space < rqstp->rq_reserved) {
+		struct svc_xprt *xprt = rqstp->rq_xprt;
+		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
+		rqstp->rq_reserved = space;
+
+		svc_xprt_enqueue(xprt);
+	}
+}
+EXPORT_SYMBOL(svc_reserve);
+
+static void svc_xprt_release(struct svc_rqst *rqstp)
+{
+	struct svc_xprt	*xprt = rqstp->rq_xprt;
+
+	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);
+
+	svc_free_res_pages(rqstp);
+	rqstp->rq_res.page_len = 0;
+	rqstp->rq_res.page_base = 0;
+
+	/* Reset response buffer and release
+	 * the reservation.
+	 * But first, check that enough space was reserved
+	 * for the reply, otherwise we have a bug!
+	 */
+	if ((rqstp->rq_res.len) >  rqstp->rq_reserved)
+		printk(KERN_ERR "RPC request reserved %d but used %d\n",
+		       rqstp->rq_reserved,
+		       rqstp->rq_res.len);
+
+	rqstp->rq_res.head[0].iov_len = 0;
+	svc_reserve(rqstp, 0);
+	rqstp->rq_xprt = NULL;
+
+	svc_xprt_put(xprt);
+}
+
+/*
+ * External function to wake up a server waiting for data
+ * This really only makes sense for services like lockd
+ * which have exactly one thread anyway.
+ */
+void svc_wake_up(struct svc_serv *serv)
+{
+	struct svc_rqst	*rqstp;
+	unsigned int i;
+	struct svc_pool *pool;
+
+	for (i = 0; i < serv->sv_nrpools; i++) {
+		pool = &serv->sv_pools[i];
+
+		spin_lock_bh(&pool->sp_lock);
+		if (!list_empty(&pool->sp_threads)) {
+			rqstp = list_entry(pool->sp_threads.next,
+					   struct svc_rqst,
+					   rq_list);
+			dprintk("svc: daemon %p woken up.\n", rqstp);
+			/*
+			svc_thread_dequeue(pool, rqstp);
+			rqstp->rq_xprt = NULL;
+			 */
+			wake_up(&rqstp->rq_wait);
+		}
+		spin_unlock_bh(&pool->sp_lock);
+	}
+}
+EXPORT_SYMBOL(svc_wake_up);
+
+int svc_port_is_privileged(struct sockaddr *sin)
+{
+	switch (sin->sa_family) {
+	case AF_INET:
+		return ntohs(((struct sockaddr_in *)sin)->sin_port)
+			< PROT_SOCK;
+	case AF_INET6:
+		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
+			< PROT_SOCK;
+	default:
+		return 0;
+	}
+}
+
+/*
+ * Make sure that we don't have too many active connections.  If we
+ * have, something must be dropped.
+ *
+ * There's no point in trying to do random drop here for DoS
+ * prevention. The NFS clients does 1 reconnect in 15 seconds. An
+ * attacker can easily beat that.
+ *
+ * The only somewhat efficient mechanism would be if drop old
+ * connections from the same IP first. But right now we don't even
+ * record the client IP in svc_sock.
+ */
+static void svc_check_conn_limits(struct svc_serv *serv)
+{
+	if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
+		struct svc_xprt *xprt = NULL;
+		spin_lock_bh(&serv->sv_lock);
+		if (!list_empty(&serv->sv_tempsocks)) {
+			if (net_ratelimit()) {
+				/* Try to help the admin */
+				printk(KERN_NOTICE "%s: too many open  "
+				       "connections, consider increasing the "
+				       "number of nfsd threads\n",
+				       serv->sv_name);
+			}
+			/*
+			 * Always select the oldest connection. It's not fair,
+			 * but so is life
+			 */
+			xprt = list_entry(serv->sv_tempsocks.prev,
+					  struct svc_xprt,
+					  xpt_list);
+			set_bit(XPT_CLOSE, &xprt->xpt_flags);
+			svc_xprt_get(xprt);
+		}
+		spin_unlock_bh(&serv->sv_lock);
+
+		if (xprt) {
+			svc_xprt_enqueue(xprt);
+			svc_xprt_put(xprt);
+		}
+	}
+}
+
+/*
+ * Receive the next request on any transport.  This code is carefully
+ * organised not to touch any cachelines in the shared svc_serv
+ * structure, only cachelines in the local svc_pool.
+ */
+int svc_recv(struct svc_rqst *rqstp, long timeout)
+{
+	struct svc_xprt		*xprt = NULL;
+	struct svc_serv		*serv = rqstp->rq_server;
+	struct svc_pool		*pool = rqstp->rq_pool;
+	int			len, i;
+	int			pages;
+	struct xdr_buf		*arg;
+	DECLARE_WAITQUEUE(wait, current);
+
+	dprintk("svc: server %p waiting for data (to = %ld)\n",
+		rqstp, timeout);
+
+	if (rqstp->rq_xprt)
+		printk(KERN_ERR
+			"svc_recv: service %p, transport not NULL!\n",
+			 rqstp);
+	if (waitqueue_active(&rqstp->rq_wait))
+		printk(KERN_ERR
+			"svc_recv: service %p, wait queue active!\n",
+			 rqstp);
+
+	/* now allocate needed pages.  If we get a failure, sleep briefly */
+	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
+	for (i = 0; i < pages ; i++)
+		while (rqstp->rq_pages[i] == NULL) {
+			struct page *p = alloc_page(GFP_KERNEL);
+			if (!p) {
+				int j = msecs_to_jiffies(500);
+				schedule_timeout_uninterruptible(j);
+			}
+			rqstp->rq_pages[i] = p;
+		}
+	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
+	BUG_ON(pages >= RPCSVC_MAXPAGES);
+
+	/* Make arg->head point to first page and arg->pages point to rest */
+	arg = &rqstp->rq_arg;
+	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
+	arg->head[0].iov_len = PAGE_SIZE;
+	arg->pages = rqstp->rq_pages + 1;
+	arg->page_base = 0;
+	/* save at least one page for response */
+	arg->page_len = (pages-2)*PAGE_SIZE;
+	arg->len = (pages-1)*PAGE_SIZE;
+	arg->tail[0].iov_len = 0;
+
+	try_to_freeze();
+	cond_resched();
+	if (signalled())
+		return -EINTR;
+
+	spin_lock_bh(&pool->sp_lock);
+	xprt = svc_xprt_dequeue(pool);
+	if (xprt) {
+		rqstp->rq_xprt = xprt;
+		svc_xprt_get(xprt);
+		rqstp->rq_reserved = serv->sv_max_mesg;
+		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
+	} else {
+		/* No data pending. Go to sleep */
+		svc_thread_enqueue(pool, rqstp);
+
+		/*
+		 * We have to be able to interrupt this wait
+		 * to bring down the daemons ...
+		 */
+		set_current_state(TASK_INTERRUPTIBLE);
+		add_wait_queue(&rqstp->rq_wait, &wait);
+		spin_unlock_bh(&pool->sp_lock);
+
+		schedule_timeout(timeout);
+
+		try_to_freeze();
+
+		spin_lock_bh(&pool->sp_lock);
+		remove_wait_queue(&rqstp->rq_wait, &wait);
+
+		xprt = rqstp->rq_xprt;
+		if (!xprt) {
+			svc_thread_dequeue(pool, rqstp);
+			spin_unlock_bh(&pool->sp_lock);
+			dprintk("svc: server %p, no data yet\n", rqstp);
+			return signalled()? -EINTR : -EAGAIN;
+		}
+	}
+	spin_unlock_bh(&pool->sp_lock);
+
+	len = 0;
+	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
+		dprintk("svc_recv: found XPT_CLOSE\n");
+		svc_delete_xprt(xprt);
+	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
+		struct svc_xprt *newxpt;
+		newxpt = xprt->xpt_ops->xpo_accept(xprt);
+		if (newxpt) {
+			/*
+			 * We know this module_get will succeed because the
+			 * listener holds a reference too
+			 */
+			__module_get(newxpt->xpt_class->xcl_owner);
+			svc_check_conn_limits(xprt->xpt_server);
+			spin_lock_bh(&serv->sv_lock);
+			set_bit(XPT_TEMP, &newxpt->xpt_flags);
+			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
+			serv->sv_tmpcnt++;
+			if (serv->sv_temptimer.function == NULL) {
+				/* setup timer to age temp transports */
+				setup_timer(&serv->sv_temptimer,
+					    svc_age_temp_xprts,
+					    (unsigned long)serv);
+				mod_timer(&serv->sv_temptimer,
+					  jiffies + svc_conn_age_period * HZ);
+			}
+			spin_unlock_bh(&serv->sv_lock);
+			svc_xprt_received(newxpt);
+		}
+		svc_xprt_received(xprt);
+	} else {
+		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
+			rqstp, pool->sp_id, xprt,
+			atomic_read(&xprt->xpt_ref.refcount));
+		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
+		if (rqstp->rq_deferred) {
+			svc_xprt_received(xprt);
+			len = svc_deferred_recv(rqstp);
+		} else
+			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
+		dprintk("svc: got len=%d\n", len);
+	}
+
+	/* No data, incomplete (TCP) read, or accept() */
+	if (len == 0 || len == -EAGAIN) {
+		rqstp->rq_res.len = 0;
+		svc_xprt_release(rqstp);
+		return -EAGAIN;
+	}
+	clear_bit(XPT_OLD, &xprt->xpt_flags);
+
+	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
+	rqstp->rq_chandle.defer = svc_defer;
+
+	if (serv->sv_stats)
+		serv->sv_stats->netcnt++;
+	return len;
+}
+EXPORT_SYMBOL(svc_recv);
+
+/*
+ * Drop request
+ */
+void svc_drop(struct svc_rqst *rqstp)
+{
+	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
+	svc_xprt_release(rqstp);
+}
+EXPORT_SYMBOL(svc_drop);
+
+/*
+ * Return reply to client.
+ */
+int svc_send(struct svc_rqst *rqstp)
+{
+	struct svc_xprt	*xprt;
+	int		len;
+	struct xdr_buf	*xb;
+
+	xprt = rqstp->rq_xprt;
+	if (!xprt)
+		return -EFAULT;
+
+	/* release the receive skb before sending the reply */
+	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);
+
+	/* calculate over-all length */
+	xb = &rqstp->rq_res;
+	xb->len = xb->head[0].iov_len +
+		xb->page_len +
+		xb->tail[0].iov_len;
+
+	/* Grab mutex to serialize outgoing data. */
+	mutex_lock(&xprt->xpt_mutex);
+	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
+		len = -ENOTCONN;
+	else
+		len = xprt->xpt_ops->xpo_sendto(rqstp);
+	mutex_unlock(&xprt->xpt_mutex);
+	svc_xprt_release(rqstp);
+
+	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
+		return 0;
+	return len;
+}
+
+/*
+ * Timer function to close old temporary transports, using
+ * a mark-and-sweep algorithm.
+ */
+static void svc_age_temp_xprts(unsigned long closure)
+{
+	struct svc_serv *serv = (struct svc_serv *)closure;
+	struct svc_xprt *xprt;
+	struct list_head *le, *next;
+	LIST_HEAD(to_be_aged);
+
+	dprintk("svc_age_temp_xprts\n");
+
+	if (!spin_trylock_bh(&serv->sv_lock)) {
+		/* busy, try again 1 sec later */
+		dprintk("svc_age_temp_xprts: busy\n");
+		mod_timer(&serv->sv_temptimer, jiffies + HZ);
+		return;
+	}
+
+	list_for_each_safe(le, next, &serv->sv_tempsocks) {
+		xprt = list_entry(le, struct svc_xprt, xpt_list);
+
+		/* First time through, just mark it OLD. Second time
+		 * through, close it. */
+		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
+			continue;
+		if (atomic_read(&xprt->xpt_ref.refcount) > 1
+		    || test_bit(XPT_BUSY, &xprt->xpt_flags))
+			continue;
+		svc_xprt_get(xprt);
+		list_move(le, &to_be_aged);
+		set_bit(XPT_CLOSE, &xprt->xpt_flags);
+		set_bit(XPT_DETACHED, &xprt->xpt_flags);
+	}
+	spin_unlock_bh(&serv->sv_lock);
+
+	while (!list_empty(&to_be_aged)) {
+		le = to_be_aged.next;
+		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
+		list_del_init(le);
+		xprt = list_entry(le, struct svc_xprt, xpt_list);
+
+		dprintk("queuing xprt %p for closing\n", xprt);
+
+		/* a thread will dequeue and close it soon */
+		svc_xprt_enqueue(xprt);
+		svc_xprt_put(xprt);
+	}
+
+	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
+}
+
+/*
+ * Remove a dead transport
+ */
+void svc_delete_xprt(struct svc_xprt *xprt)
+{
+	struct svc_serv	*serv = xprt->xpt_server;
+
+	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
+	xprt->xpt_ops->xpo_detach(xprt);
+
+	spin_lock_bh(&serv->sv_lock);
+	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
+		list_del_init(&xprt->xpt_list);
+	/*
+	 * We used to delete the transport from whichever list
+	 * it's sk_xprt.xpt_ready node was on, but we don't actually
+	 * need to.  This is because the only time we're called
+	 * while still attached to a queue, the queue itself
+	 * is about to be destroyed (in svc_destroy).
+	 */
+	if (!test_and_set_bit(XPT_DEAD, &xprt->xpt_flags)) {
+		BUG_ON(atomic_read(&xprt->xpt_ref.refcount) < 2);
+		if (test_bit(XPT_TEMP, &xprt->xpt_flags))
+			serv->sv_tmpcnt--;
+		svc_xprt_put(xprt);
+	}
+	spin_unlock_bh(&serv->sv_lock);
+}
+
+void svc_close_xprt(struct svc_xprt *xprt)
+{
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
+		/* someone else will have to effect the close */
+		return;
+
+	svc_xprt_get(xprt);
+	svc_delete_xprt(xprt);
+	clear_bit(XPT_BUSY, &xprt->xpt_flags);
+	svc_xprt_put(xprt);
+}
+EXPORT_SYMBOL_GPL(svc_close_xprt);
+
+void svc_close_all(struct list_head *xprt_list)
+{
+	struct svc_xprt *xprt;
+	struct svc_xprt *tmp;
+
+	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
+		set_bit(XPT_CLOSE, &xprt->xpt_flags);
+		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
+			/* Waiting to be processed, but no threads left,
+			 * So just remove it from the waiting list
+			 */
+			list_del_init(&xprt->xpt_ready);
+			clear_bit(XPT_BUSY, &xprt->xpt_flags);
+		}
+		svc_close_xprt(xprt);
+	}
+}
+
+/*
+ * Handle defer and revisit of requests
+ */
+
+static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
+{
+	struct svc_deferred_req *dr =
+		container_of(dreq, struct svc_deferred_req, handle);
+	struct svc_xprt *xprt = dr->xprt;
+
+	if (too_many) {
+		svc_xprt_put(xprt);
+		kfree(dr);
+		return;
+	}
+	dprintk("revisit queued\n");
+	dr->xprt = NULL;
+	spin_lock(&xprt->xpt_lock);
+	list_add(&dr->handle.recent, &xprt->xpt_deferred);
+	spin_unlock(&xprt->xpt_lock);
+	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
+	svc_xprt_enqueue(xprt);
+	svc_xprt_put(xprt);
+}
+
+/*
+ * Save the request off for later processing. The request buffer looks
+ * like this:
+ *
+ * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
+ *
+ * This code can only handle requests that consist of an xprt-header
+ * and rpc-header.
+ */
+static struct cache_deferred_req *svc_defer(struct cache_req *req)
+{
+	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
+	struct svc_deferred_req *dr;
+
+	if (rqstp->rq_arg.page_len)
+		return NULL; /* if more than a page, give up FIXME */
+	if (rqstp->rq_deferred) {
+		dr = rqstp->rq_deferred;
+		rqstp->rq_deferred = NULL;
+	} else {
+		size_t skip;
+		size_t size;
+		/* FIXME maybe discard if size too large */
+		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
+		dr = kmalloc(size, GFP_KERNEL);
+		if (dr == NULL)
+			return NULL;
+
+		dr->handle.owner = rqstp->rq_server;
+		dr->prot = rqstp->rq_prot;
+		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
+		dr->addrlen = rqstp->rq_addrlen;
+		dr->daddr = rqstp->rq_daddr;
+		dr->argslen = rqstp->rq_arg.len >> 2;
+		dr->xprt_hlen = rqstp->rq_xprt_hlen;
+
+		/* back up head to the start of the buffer and copy */
+		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
+		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
+		       dr->argslen << 2);
+	}
+	svc_xprt_get(rqstp->rq_xprt);
+	dr->xprt = rqstp->rq_xprt;
+
+	dr->handle.revisit = svc_revisit;
+	return &dr->handle;
+}
+
+/*
+ * recv data from a deferred request into an active one
+ */
+static int svc_deferred_recv(struct svc_rqst *rqstp)
+{
+	struct svc_deferred_req *dr = rqstp->rq_deferred;
+
+	/* setup iov_base past transport header */
+	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
+	/* The iov_len does not include the transport header bytes */
+	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
+	rqstp->rq_arg.page_len = 0;
+	/* The rq_arg.len includes the transport header bytes */
+	rqstp->rq_arg.len     = dr->argslen<<2;
+	rqstp->rq_prot        = dr->prot;
+	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
+	rqstp->rq_addrlen     = dr->addrlen;
+	/* Save off transport header len in case we get deferred again */
+	rqstp->rq_xprt_hlen   = dr->xprt_hlen;
+	rqstp->rq_daddr       = dr->daddr;
+	rqstp->rq_respages    = rqstp->rq_pages;
+	return (dr->argslen<<2) - dr->xprt_hlen;
+}
+
+
+static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
+{
+	struct svc_deferred_req *dr = NULL;
+
+	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
+		return NULL;
+	spin_lock(&xprt->xpt_lock);
+	clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
+	if (!list_empty(&xprt->xpt_deferred)) {
+		dr = list_entry(xprt->xpt_deferred.next,
+				struct svc_deferred_req,
+				handle.recent);
+		list_del_init(&dr->handle.recent);
+		set_bit(XPT_DEFERRED, &xprt->xpt_flags);
+	}
+	spin_unlock(&xprt->xpt_lock);
+	return dr;
+}
+
+/*
+ * Return the transport instance pointer for the endpoint accepting
+ * connections/peer traffic from the specified transport class,
+ * address family and port.
+ *
+ * Specifying 0 for the address family or port is effectively a
+ * wild-card, and will result in matching the first transport in the
+ * service's list that has a matching class name.
+ */
+struct svc_xprt *svc_find_xprt(struct svc_serv *serv, char *xcl_name,
+			       int af, int port)
+{
+	struct svc_xprt *xprt;
+	struct svc_xprt *found = NULL;
+
+	/* Sanity check the args */
+	if (!serv || !xcl_name)
+		return found;
+
+	spin_lock_bh(&serv->sv_lock);
+	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
+		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
+			continue;
+		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
+			continue;
+		if (port && port != svc_xprt_local_port(xprt))
+			continue;
+		found = xprt;
+		svc_xprt_get(xprt);
+		break;
+	}
+	spin_unlock_bh(&serv->sv_lock);
+	return found;
+}
+EXPORT_SYMBOL_GPL(svc_find_xprt);
+
+/*
+ * Format a buffer with a list of the active transports. A zero for
+ * the buflen parameter disables target buffer overflow checking.
+ */
+int svc_xprt_names(struct svc_serv *serv, char *buf, int buflen)
+{
+	struct svc_xprt *xprt;
+	char xprt_str[64];
+	int totlen = 0;
+	int len;
+
+	/* Sanity check args */
+	if (!serv)
+		return 0;
+
+	spin_lock_bh(&serv->sv_lock);
+	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
+		len = snprintf(xprt_str, sizeof(xprt_str),
+			       "%s %d\n", xprt->xpt_class->xcl_name,
+			       svc_xprt_local_port(xprt));
+		/* If the string was truncated, replace with error string */
+		if (len >= sizeof(xprt_str))
+			strcpy(xprt_str, "name-too-long\n");
+		/* Don't overflow buffer */
+		len = strlen(xprt_str);
+		if (buflen && (len + totlen >= buflen))
+			break;
+		strcpy(buf+totlen, xprt_str);
+		totlen += len;
+	}
+	spin_unlock_bh(&serv->sv_lock);
+	return totlen;
+}
+EXPORT_SYMBOL_GPL(svc_xprt_names);
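
Besides svcsock.c (converted below), the svc_xprt API is aimed at pluggable transports such as the svcrdma module added elsewhere in this merge. A hedged sketch of the registration boilerplate a provider might need; it reuses names visible above (svc_reg_xprt_class, svc_unreg_xprt_class, svc_xprt_init, xcl_name, xcl_owner, xcl_ops, xcl_max_payload, xpo_create), while the "foo" provider, its payload limit, the struct svc_xprt_ops spelling and the elided callbacks are assumptions for illustration:

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/err.h>
#include <linux/sunrpc/svc_xprt.h>

static struct svc_xprt_class foo_xprt_class;	/* tentative definition, initialized below */

/* xpo_create(): allocate the transport and hand it to svc_xprt_init() */
static struct svc_xprt *foo_xpo_create(struct svc_serv *serv,
				       struct sockaddr *sa, int salen,
				       int flags)
{
	struct svc_xprt *xprt = kzalloc(sizeof(*xprt), GFP_KERNEL);

	if (!xprt)
		return ERR_PTR(-ENOMEM);
	svc_xprt_init(&foo_xprt_class, xprt, serv);
	/* real providers embed svc_xprt in a private struct and bind to sa here */
	return xprt;
}

static struct svc_xprt_ops foo_xprt_ops = {
	.xpo_create	= foo_xpo_create,
	/* xpo_accept, xpo_recvfrom, xpo_sendto, xpo_release_rqst, xpo_detach,
	 * xpo_free, xpo_prep_reply_hdr and xpo_has_wspace are elided here */
};

static struct svc_xprt_class foo_xprt_class = {
	.xcl_name	 = "foo",
	.xcl_owner	 = THIS_MODULE,
	.xcl_ops	 = &foo_xprt_ops,
	.xcl_max_payload = 1 << 20,	/* illustrative limit */
};

static int __init foo_init(void)
{
	/* returns -EEXIST if a class with the same name is already registered */
	return svc_reg_xprt_class(&foo_xprt_class);
}

static void __exit foo_exit(void)
{
	svc_unreg_xprt_class(&foo_xprt_class);
}

module_init(foo_init);
module_exit(foo_exit);
MODULE_LICENSE("GPL");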
diff --git a/net/sunrpc/svcauth.c b/net/sunrpc/svcauth.c
index af7c5f05c6e1..8a73cbb16052 100644
--- a/net/sunrpc/svcauth.c
+++ b/net/sunrpc/svcauth.c
@@ -57,11 +57,13 @@ svc_authenticate(struct svc_rqst *rqstp, __be32 *authp)
 	rqstp->rq_authop = aops;
 	return aops->accept(rqstp, authp);
 }
+EXPORT_SYMBOL(svc_authenticate);
 
 int svc_set_client(struct svc_rqst *rqstp)
 {
 	return rqstp->rq_authop->set_client(rqstp);
 }
+EXPORT_SYMBOL(svc_set_client);
 
 /* A request, which was authenticated, has now executed.
  * Time to finalise the credentials and verifier
@@ -93,6 +95,7 @@ svc_auth_register(rpc_authflavor_t flavor, struct auth_ops *aops)
 	spin_unlock(&authtab_lock);
 	return rv;
 }
+EXPORT_SYMBOL(svc_auth_register);
 
 void
 svc_auth_unregister(rpc_authflavor_t flavor)
@@ -129,6 +132,7 @@ void auth_domain_put(struct auth_domain *dom)
 		spin_unlock(&auth_domain_lock);
 	}
 }
+EXPORT_SYMBOL(auth_domain_put);
 
 struct auth_domain *
 auth_domain_lookup(char *name, struct auth_domain *new)
@@ -153,8 +157,10 @@ auth_domain_lookup(char *name, struct auth_domain *new)
 	spin_unlock(&auth_domain_lock);
 	return new;
 }
+EXPORT_SYMBOL(auth_domain_lookup);
 
 struct auth_domain *auth_domain_find(char *name)
 {
 	return auth_domain_lookup(name, NULL);
 }
+EXPORT_SYMBOL(auth_domain_find);
diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c
index 411479411b21..3c64051e4555 100644
--- a/net/sunrpc/svcauth_unix.c
+++ b/net/sunrpc/svcauth_unix.c
@@ -63,6 +63,7 @@ struct auth_domain *unix_domain_find(char *name)
 		rv = auth_domain_lookup(name, &new->h);
 	}
 }
+EXPORT_SYMBOL(unix_domain_find);
 
 static void svcauth_unix_domain_release(struct auth_domain *dom)
 {
@@ -340,6 +341,7 @@ int auth_unix_add_addr(struct in_addr addr, struct auth_domain *dom)
 	else
 		return -ENOMEM;
 }
+EXPORT_SYMBOL(auth_unix_add_addr);
 
 int auth_unix_forget_old(struct auth_domain *dom)
 {
@@ -351,6 +353,7 @@ int auth_unix_forget_old(struct auth_domain *dom)
 	udom->addr_changes++;
 	return 0;
 }
+EXPORT_SYMBOL(auth_unix_forget_old);
 
 struct auth_domain *auth_unix_lookup(struct in_addr addr)
 {
@@ -375,50 +378,56 @@ struct auth_domain *auth_unix_lookup(struct in_addr addr)
 	cache_put(&ipm->h, &ip_map_cache);
 	return rv;
 }
+EXPORT_SYMBOL(auth_unix_lookup);
 
 void svcauth_unix_purge(void)
 {
 	cache_purge(&ip_map_cache);
 }
+EXPORT_SYMBOL(svcauth_unix_purge);
 
 static inline struct ip_map *
 ip_map_cached_get(struct svc_rqst *rqstp)
 {
-	struct ip_map *ipm;
-	struct svc_sock *svsk = rqstp->rq_sock;
-	spin_lock(&svsk->sk_lock);
-	ipm = svsk->sk_info_authunix;
-	if (ipm != NULL) {
-		if (!cache_valid(&ipm->h)) {
-			/*
-			 * The entry has been invalidated since it was
-			 * remembered, e.g. by a second mount from the
-			 * same IP address.
-			 */
-			svsk->sk_info_authunix = NULL;
-			spin_unlock(&svsk->sk_lock);
-			cache_put(&ipm->h, &ip_map_cache);
-			return NULL;
+	struct ip_map *ipm = NULL;
+	struct svc_xprt *xprt = rqstp->rq_xprt;
+
+	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)) {
+		spin_lock(&xprt->xpt_lock);
+		ipm = xprt->xpt_auth_cache;
+		if (ipm != NULL) {
+			if (!cache_valid(&ipm->h)) {
+				/*
+				 * The entry has been invalidated since it was
+				 * remembered, e.g. by a second mount from the
+				 * same IP address.
+				 */
+				xprt->xpt_auth_cache = NULL;
+				spin_unlock(&xprt->xpt_lock);
+				cache_put(&ipm->h, &ip_map_cache);
+				return NULL;
+			}
+			cache_get(&ipm->h);
 		}
-		cache_get(&ipm->h);
+		spin_unlock(&xprt->xpt_lock);
 	}
-	spin_unlock(&svsk->sk_lock);
 	return ipm;
 }
 
 static inline void
 ip_map_cached_put(struct svc_rqst *rqstp, struct ip_map *ipm)
 {
-	struct svc_sock *svsk = rqstp->rq_sock;
+	struct svc_xprt *xprt = rqstp->rq_xprt;
 
-	spin_lock(&svsk->sk_lock);
-	if (svsk->sk_sock->type == SOCK_STREAM &&
-	    svsk->sk_info_authunix == NULL) {
-		/* newly cached, keep the reference */
-		svsk->sk_info_authunix = ipm;
-		ipm = NULL;
+	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)) {
+		spin_lock(&xprt->xpt_lock);
+		if (xprt->xpt_auth_cache == NULL) {
+			/* newly cached, keep the reference */
+			xprt->xpt_auth_cache = ipm;
+			ipm = NULL;
+		}
+		spin_unlock(&xprt->xpt_lock);
 	}
-	spin_unlock(&svsk->sk_lock);
 	if (ipm)
 		cache_put(&ipm->h, &ip_map_cache);
 }
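
Note on the svcauth_unix.c hunks above: the cached ip_map moves from the socket-private sk_info_authunix/sk_lock pair to the transport-generic xpt_auth_cache/xpt_lock fields, and caching becomes opt-in via the XPT_CACHE_AUTH flag (set for TCP, cleared for UDP in the svcsock.c changes below). The underlying idiom is: remember at most one ref-counted entry per connection, hand out a reference under the lock, and drop a stale entry on the next lookup so the caller falls back to the shared cache. A minimal stand-alone sketch of that idiom follows; the types, names and pthread locking are invented for illustration and stand in for ip_map, cache_get()/cache_put() and the xpt_lock spinlock.

/*
 * User-space sketch of the single-entry, ref-counted cache idiom used by
 * ip_map_cached_get()/ip_map_cached_put() above.  Build with: cc -pthread.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct entry {
	int refcount;		/* protected by conn_cache.lock in this sketch */
	int valid;		/* cleared when the entry is invalidated */
	char who[32];
};

struct conn_cache {
	pthread_mutex_t lock;
	struct entry *cached;	/* at most one remembered entry */
};

static void entry_put_locked(struct entry *e)
{
	if (--e->refcount == 0)
		free(e);
}

static struct entry *cache_get_entry(struct conn_cache *c)
{
	struct entry *e;

	pthread_mutex_lock(&c->lock);
	e = c->cached;
	if (e && !e->valid) {
		c->cached = NULL;	/* stale: forget it, caller re-looks-up */
		entry_put_locked(e);
		e = NULL;
	} else if (e) {
		e->refcount++;		/* hand out a reference */
	}
	pthread_mutex_unlock(&c->lock);
	return e;
}

static void cache_put_entry(struct conn_cache *c, struct entry *e)
{
	pthread_mutex_lock(&c->lock);
	if (c->cached == NULL) {
		c->cached = e;		/* newly cached, keep the reference */
		e = NULL;
	}
	if (e)
		entry_put_locked(e);
	pthread_mutex_unlock(&c->lock);
}

int main(void)
{
	struct conn_cache c = { PTHREAD_MUTEX_INITIALIZER, NULL };
	struct entry *e = calloc(1, sizeof(*e));

	if (!e)
		return 1;
	e->refcount = 1;
	e->valid = 1;
	strcpy(e->who, "192.0.2.1");
	cache_put_entry(&c, e);		/* empty slot: cache keeps the ref */
	e = cache_get_entry(&c);	/* later request gets its own ref */
	printf("cached entry: %s\n", e ? e->who : "(none)");
	if (e)
		cache_put_entry(&c, e);	/* slot already full: ref is dropped */
	return 0;
}
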
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index c75bffeb89eb..1d3e5fcc2cc4 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -5,7 +5,7 @@
  *
  * The server scheduling algorithm does not always distribute the load
  * evenly when servicing a single client. May need to modify the
- * svc_sock_enqueue procedure...
+ * svc_xprt_enqueue procedure...
  *
  * TCP support is largely untested and may be a little slow. The problem
  * is that we currently do two separate recvfrom's, one for the 4-byte
@@ -48,72 +48,40 @@
 #include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/stats.h>
 
-/* SMP locking strategy:
- *
- *	svc_pool->sp_lock protects most of the fields of that pool.
- * 	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
- *	when both need to be taken (rare), svc_serv->sv_lock is first.
- *	BKL protects svc_serv->sv_nrthread.
- *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
- *             and the ->sk_info_authunix cache.
- *	svc_sock->sk_flags.SK_BUSY prevents a svc_sock being enqueued multiply.
- *
- *	Some flags can be set to certain values at any time
- *	providing that certain rules are followed:
- *
- *	SK_CONN, SK_DATA, can be set or cleared at any time.
- *		after a set, svc_sock_enqueue must be called.
- *		after a clear, the socket must be read/accepted
- *		 if this succeeds, it must be set again.
- *	SK_CLOSE can set at any time. It is never cleared.
- *      sk_inuse contains a bias of '1' until SK_DEAD is set.
- *             so when sk_inuse hits zero, we know the socket is dead
- *             and no-one is using it.
- *      SK_DEAD can only be set while SK_BUSY is held which ensures
- *             no other thread will be using the socket or will try to
- *	       set SK_DEAD.
- *
- */
-
-#define RPCDBG_FACILITY	RPCDBG_SVCSOCK
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
 
 static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *,
 					 int *errp, int flags);
-static void		svc_delete_socket(struct svc_sock *svsk);
 static void		svc_udp_data_ready(struct sock *, int);
 static int		svc_udp_recvfrom(struct svc_rqst *);
 static int		svc_udp_sendto(struct svc_rqst *);
-static void		svc_close_socket(struct svc_sock *svsk);
-
-static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk);
-static int svc_deferred_recv(struct svc_rqst *rqstp);
-static struct cache_deferred_req *svc_defer(struct cache_req *req);
-
-/* apparently the "standard" is that clients close
- * idle connections after 5 minutes, servers after
- * 6 minutes
- *   http://www.connectathon.org/talks96/nfstcp.pdf
- */
-static int svc_conn_age_period = 6*60;
+static void		svc_sock_detach(struct svc_xprt *);
+static void		svc_sock_free(struct svc_xprt *);
 
+static struct svc_xprt *svc_create_socket(struct svc_serv *, int,
+					  struct sockaddr *, int, int);
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 static struct lock_class_key svc_key[2];
 static struct lock_class_key svc_slock_key[2];
 
-static inline void svc_reclassify_socket(struct socket *sock)
+static void svc_reclassify_socket(struct socket *sock)
 {
 	struct sock *sk = sock->sk;
 	BUG_ON(sock_owned_by_user(sk));
 	switch (sk->sk_family) {
 	case AF_INET:
 		sock_lock_init_class_and_name(sk, "slock-AF_INET-NFSD",
-		    &svc_slock_key[0], "sk_lock-AF_INET-NFSD", &svc_key[0]);
+					      &svc_slock_key[0],
+					      "sk_xprt.xpt_lock-AF_INET-NFSD",
+					      &svc_key[0]);
 		break;
 
 	case AF_INET6:
 		sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFSD",
-		    &svc_slock_key[1], "sk_lock-AF_INET6-NFSD", &svc_key[1]);
+					      &svc_slock_key[1],
+					      "sk_xprt.xpt_lock-AF_INET6-NFSD",
+					      &svc_key[1]);
 		break;
 
 	default:
@@ -121,81 +89,26 @@ static inline void svc_reclassify_socket(struct socket *sock)
 	}
 }
 #else
-static inline void svc_reclassify_socket(struct socket *sock)
+static void svc_reclassify_socket(struct socket *sock)
 {
 }
 #endif
 
-static char *__svc_print_addr(struct sockaddr *addr, char *buf, size_t len)
-{
-	switch (addr->sa_family) {
-	case AF_INET:
-		snprintf(buf, len, "%u.%u.%u.%u, port=%u",
-			NIPQUAD(((struct sockaddr_in *) addr)->sin_addr),
-			ntohs(((struct sockaddr_in *) addr)->sin_port));
-		break;
-
-	case AF_INET6:
-		snprintf(buf, len, "%x:%x:%x:%x:%x:%x:%x:%x, port=%u",
-			NIP6(((struct sockaddr_in6 *) addr)->sin6_addr),
-			ntohs(((struct sockaddr_in6 *) addr)->sin6_port));
-		break;
-
-	default:
-		snprintf(buf, len, "unknown address type: %d", addr->sa_family);
-		break;
-	}
-	return buf;
-}
-
-/**
- * svc_print_addr - Format rq_addr field for printing
- * @rqstp: svc_rqst struct containing address to print
- * @buf: target buffer for formatted address
- * @len: length of target buffer
- *
- */
-char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
-{
-	return __svc_print_addr(svc_addr(rqstp), buf, len);
-}
-EXPORT_SYMBOL_GPL(svc_print_addr);
-
-/*
- * Queue up an idle server thread.  Must have pool->sp_lock held.
- * Note: this is really a stack rather than a queue, so that we only
- * use as many different threads as we need, and the rest don't pollute
- * the cache.
- */
-static inline void
-svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-	list_add(&rqstp->rq_list, &pool->sp_threads);
-}
-
-/*
- * Dequeue an nfsd thread.  Must have pool->sp_lock held.
- */
-static inline void
-svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-	list_del(&rqstp->rq_list);
-}
-
 /*
  * Release an skbuff after use
  */
-static inline void
-svc_release_skb(struct svc_rqst *rqstp)
+static void svc_release_skb(struct svc_rqst *rqstp)
 {
-	struct sk_buff *skb = rqstp->rq_skbuff;
+	struct sk_buff *skb = rqstp->rq_xprt_ctxt;
 	struct svc_deferred_req *dr = rqstp->rq_deferred;
 
 	if (skb) {
-		rqstp->rq_skbuff = NULL;
+		struct svc_sock *svsk =
+			container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+		rqstp->rq_xprt_ctxt = NULL;
 
 		dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
-		skb_free_datagram(rqstp->rq_sock->sk_sk, skb);
+		skb_free_datagram(svsk->sk_sk, skb);
 	}
 	if (dr) {
 		rqstp->rq_deferred = NULL;
@@ -203,253 +116,6 @@ svc_release_skb(struct svc_rqst *rqstp)
 	}
 }
 
-/*
- * Any space to write?
- */
-static inline unsigned long
-svc_sock_wspace(struct svc_sock *svsk)
-{
-	int wspace;
-
-	if (svsk->sk_sock->type == SOCK_STREAM)
-		wspace = sk_stream_wspace(svsk->sk_sk);
-	else
-		wspace = sock_wspace(svsk->sk_sk);
-
-	return wspace;
-}
-
-/*
- * Queue up a socket with data pending. If there are idle nfsd
- * processes, wake 'em up.
- *
- */
-static void
-svc_sock_enqueue(struct svc_sock *svsk)
-{
-	struct svc_serv	*serv = svsk->sk_server;
-	struct svc_pool *pool;
-	struct svc_rqst	*rqstp;
-	int cpu;
-
-	if (!(svsk->sk_flags &
-	      ( (1<<SK_CONN)|(1<<SK_DATA)|(1<<SK_CLOSE)|(1<<SK_DEFERRED)) ))
-		return;
-	if (test_bit(SK_DEAD, &svsk->sk_flags))
-		return;
-
-	cpu = get_cpu();
-	pool = svc_pool_for_cpu(svsk->sk_server, cpu);
-	put_cpu();
-
-	spin_lock_bh(&pool->sp_lock);
-
-	if (!list_empty(&pool->sp_threads) &&
-	    !list_empty(&pool->sp_sockets))
-		printk(KERN_ERR
-			"svc_sock_enqueue: threads and sockets both waiting??\n");
-
-	if (test_bit(SK_DEAD, &svsk->sk_flags)) {
-		/* Don't enqueue dead sockets */
-		dprintk("svc: socket %p is dead, not enqueued\n", svsk->sk_sk);
-		goto out_unlock;
-	}
-
-	/* Mark socket as busy. It will remain in this state until the
-	 * server has processed all pending data and put the socket back
-	 * on the idle list.  We update SK_BUSY atomically because
-	 * it also guards against trying to enqueue the svc_sock twice.
-	 */
-	if (test_and_set_bit(SK_BUSY, &svsk->sk_flags)) {
-		/* Don't enqueue socket while already enqueued */
-		dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
-		goto out_unlock;
-	}
-	BUG_ON(svsk->sk_pool != NULL);
-	svsk->sk_pool = pool;
-
-	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
-	if (((atomic_read(&svsk->sk_reserved) + serv->sv_max_mesg)*2
-	     > svc_sock_wspace(svsk))
-	    && !test_bit(SK_CLOSE, &svsk->sk_flags)
-	    && !test_bit(SK_CONN, &svsk->sk_flags)) {
-		/* Don't enqueue while not enough space for reply */
-		dprintk("svc: socket %p  no space, %d*2 > %ld, not enqueued\n",
-			svsk->sk_sk, atomic_read(&svsk->sk_reserved)+serv->sv_max_mesg,
-			svc_sock_wspace(svsk));
-		svsk->sk_pool = NULL;
-		clear_bit(SK_BUSY, &svsk->sk_flags);
-		goto out_unlock;
-	}
-	clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
-
-
-	if (!list_empty(&pool->sp_threads)) {
-		rqstp = list_entry(pool->sp_threads.next,
-				   struct svc_rqst,
-				   rq_list);
-		dprintk("svc: socket %p served by daemon %p\n",
-			svsk->sk_sk, rqstp);
-		svc_thread_dequeue(pool, rqstp);
-		if (rqstp->rq_sock)
-			printk(KERN_ERR
-				"svc_sock_enqueue: server %p, rq_sock=%p!\n",
-				rqstp, rqstp->rq_sock);
-		rqstp->rq_sock = svsk;
-		atomic_inc(&svsk->sk_inuse);
-		rqstp->rq_reserved = serv->sv_max_mesg;
-		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
-		BUG_ON(svsk->sk_pool != pool);
-		wake_up(&rqstp->rq_wait);
-	} else {
-		dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
-		list_add_tail(&svsk->sk_ready, &pool->sp_sockets);
-		BUG_ON(svsk->sk_pool != pool);
-	}
-
-out_unlock:
-	spin_unlock_bh(&pool->sp_lock);
-}
-
-/*
- * Dequeue the first socket.  Must be called with the pool->sp_lock held.
- */
-static inline struct svc_sock *
-svc_sock_dequeue(struct svc_pool *pool)
-{
-	struct svc_sock	*svsk;
-
-	if (list_empty(&pool->sp_sockets))
-		return NULL;
-
-	svsk = list_entry(pool->sp_sockets.next,
-			  struct svc_sock, sk_ready);
-	list_del_init(&svsk->sk_ready);
-
-	dprintk("svc: socket %p dequeued, inuse=%d\n",
-		svsk->sk_sk, atomic_read(&svsk->sk_inuse));
-
-	return svsk;
-}
-
-/*
- * Having read something from a socket, check whether it
- * needs to be re-enqueued.
- * Note: SK_DATA only gets cleared when a read-attempt finds
- * no (or insufficient) data.
- */
-static inline void
-svc_sock_received(struct svc_sock *svsk)
-{
-	svsk->sk_pool = NULL;
-	clear_bit(SK_BUSY, &svsk->sk_flags);
-	svc_sock_enqueue(svsk);
-}
-
-
-/**
- * svc_reserve - change the space reserved for the reply to a request.
- * @rqstp:  The request in question
- * @space: new max space to reserve
- *
- * Each request reserves some space on the output queue of the socket
- * to make sure the reply fits.  This function reduces that reserved
- * space to be the amount of space used already, plus @space.
- *
- */
-void svc_reserve(struct svc_rqst *rqstp, int space)
-{
-	space += rqstp->rq_res.head[0].iov_len;
-
-	if (space < rqstp->rq_reserved) {
-		struct svc_sock *svsk = rqstp->rq_sock;
-		atomic_sub((rqstp->rq_reserved - space), &svsk->sk_reserved);
-		rqstp->rq_reserved = space;
-
-		svc_sock_enqueue(svsk);
-	}
-}
-
-/*
- * Release a socket after use.
- */
-static inline void
-svc_sock_put(struct svc_sock *svsk)
-{
-	if (atomic_dec_and_test(&svsk->sk_inuse)) {
-		BUG_ON(! test_bit(SK_DEAD, &svsk->sk_flags));
-
-		dprintk("svc: releasing dead socket\n");
-		if (svsk->sk_sock->file)
-			sockfd_put(svsk->sk_sock);
-		else
-			sock_release(svsk->sk_sock);
-		if (svsk->sk_info_authunix != NULL)
-			svcauth_unix_info_release(svsk->sk_info_authunix);
-		kfree(svsk);
-	}
-}
-
-static void
-svc_sock_release(struct svc_rqst *rqstp)
-{
-	struct svc_sock	*svsk = rqstp->rq_sock;
-
-	svc_release_skb(rqstp);
-
-	svc_free_res_pages(rqstp);
-	rqstp->rq_res.page_len = 0;
-	rqstp->rq_res.page_base = 0;
-
-
-	/* Reset response buffer and release
-	 * the reservation.
-	 * But first, check that enough space was reserved
-	 * for the reply, otherwise we have a bug!
-	 */
-	if ((rqstp->rq_res.len) >  rqstp->rq_reserved)
-		printk(KERN_ERR "RPC request reserved %d but used %d\n",
-		       rqstp->rq_reserved,
-		       rqstp->rq_res.len);
-
-	rqstp->rq_res.head[0].iov_len = 0;
-	svc_reserve(rqstp, 0);
-	rqstp->rq_sock = NULL;
-
-	svc_sock_put(svsk);
-}
-
-/*
- * External function to wake up a server waiting for data
- * This really only makes sense for services like lockd
- * which have exactly one thread anyway.
- */
-void
-svc_wake_up(struct svc_serv *serv)
-{
-	struct svc_rqst	*rqstp;
-	unsigned int i;
-	struct svc_pool *pool;
-
-	for (i = 0; i < serv->sv_nrpools; i++) {
-		pool = &serv->sv_pools[i];
-
-		spin_lock_bh(&pool->sp_lock);
-		if (!list_empty(&pool->sp_threads)) {
-			rqstp = list_entry(pool->sp_threads.next,
-					   struct svc_rqst,
-					   rq_list);
-			dprintk("svc: daemon %p woken up.\n", rqstp);
-			/*
-			svc_thread_dequeue(pool, rqstp);
-			rqstp->rq_sock = NULL;
-			 */
-			wake_up(&rqstp->rq_wait);
-		}
-		spin_unlock_bh(&pool->sp_lock);
-	}
-}
-
 union svc_pktinfo_u {
 	struct in_pktinfo pkti;
 	struct in6_pktinfo pkti6;
@@ -459,7 +125,9 @@ union svc_pktinfo_u {
 
 static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
 {
-	switch (rqstp->rq_sock->sk_sk->sk_family) {
+	struct svc_sock *svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+	switch (svsk->sk_sk->sk_family) {
 	case AF_INET: {
 			struct in_pktinfo *pki = CMSG_DATA(cmh);
 
@@ -489,10 +157,10 @@ static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
 /*
  * Generic sendto routine
  */
-static int
-svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
+static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
 {
-	struct svc_sock	*svsk = rqstp->rq_sock;
+	struct svc_sock	*svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
 	struct socket	*sock = svsk->sk_sock;
 	int		slen;
 	union {
@@ -565,7 +233,7 @@ svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
 	}
 out:
 	dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %s)\n",
-		rqstp->rq_sock, xdr->head[0].iov_base, xdr->head[0].iov_len,
+		svsk, xdr->head[0].iov_base, xdr->head[0].iov_len,
 		xdr->len, len, svc_print_addr(rqstp, buf, sizeof(buf)));
 
 	return len;
@@ -602,7 +270,7 @@ svc_sock_names(char *buf, struct svc_serv *serv, char *toclose)
 	if (!serv)
 		return 0;
 	spin_lock_bh(&serv->sv_lock);
-	list_for_each_entry(svsk, &serv->sv_permsocks, sk_list) {
+	list_for_each_entry(svsk, &serv->sv_permsocks, sk_xprt.xpt_list) {
 		int onelen = one_sock_name(buf+len, svsk);
 		if (toclose && strcmp(toclose, buf+len) == 0)
 			closesk = svsk;
@@ -614,7 +282,7 @@ svc_sock_names(char *buf, struct svc_serv *serv, char *toclose)
 		/* Should unregister with portmap, but you cannot
 		 * unregister just one protocol...
 		 */
-		svc_close_socket(closesk);
+		svc_close_xprt(&closesk->sk_xprt);
 	else if (toclose)
 		return -ENOENT;
 	return len;
@@ -624,8 +292,7 @@ EXPORT_SYMBOL(svc_sock_names);
 /*
  * Check input queue length
  */
-static int
-svc_recv_available(struct svc_sock *svsk)
+static int svc_recv_available(struct svc_sock *svsk)
 {
 	struct socket	*sock = svsk->sk_sock;
 	int		avail, err;
@@ -638,48 +305,31 @@ svc_recv_available(struct svc_sock *svsk)
 /*
  * Generic recvfrom routine.
  */
-static int
-svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr, int buflen)
+static int svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr,
+			int buflen)
 {
-	struct svc_sock *svsk = rqstp->rq_sock;
+	struct svc_sock *svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
 	struct msghdr msg = {
 		.msg_flags	= MSG_DONTWAIT,
 	};
-	struct sockaddr *sin;
 	int len;
 
+	rqstp->rq_xprt_hlen = 0;
+
 	len = kernel_recvmsg(svsk->sk_sock, &msg, iov, nr, buflen,
 				msg.msg_flags);
 
-	/* sock_recvmsg doesn't fill in the name/namelen, so we must..
-	 */
-	memcpy(&rqstp->rq_addr, &svsk->sk_remote, svsk->sk_remotelen);
-	rqstp->rq_addrlen = svsk->sk_remotelen;
-
-	/* Destination address in request is needed for binding the
-	 * source address in RPC callbacks later.
-	 */
-	sin = (struct sockaddr *)&svsk->sk_local;
-	switch (sin->sa_family) {
-	case AF_INET:
-		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
-		break;
-	case AF_INET6:
-		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
-		break;
-	}
-
 	dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n",
 		svsk, iov[0].iov_base, iov[0].iov_len, len);
-
 	return len;
 }
 
 /*
  * Set socket snd and rcv buffer lengths
  */
-static inline void
-svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
+static void svc_sock_setbufsize(struct socket *sock, unsigned int snd,
+				unsigned int rcv)
 {
 #if 0
 	mm_segment_t	oldfs;
@@ -704,16 +354,16 @@ svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
 /*
  * INET callback when data has been received on the socket.
  */
-static void
-svc_udp_data_ready(struct sock *sk, int count)
+static void svc_udp_data_ready(struct sock *sk, int count)
 {
 	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
 
 	if (svsk) {
 		dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
-			svsk, sk, count, test_bit(SK_BUSY, &svsk->sk_flags));
-		set_bit(SK_DATA, &svsk->sk_flags);
-		svc_sock_enqueue(svsk);
+			svsk, sk, count,
+			test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
+		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+		svc_xprt_enqueue(&svsk->sk_xprt);
 	}
 	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
 		wake_up_interruptible(sk->sk_sleep);
@@ -722,15 +372,14 @@ svc_udp_data_ready(struct sock *sk, int count)
 /*
  * INET callback when space is newly available on the socket.
  */
-static void
-svc_write_space(struct sock *sk)
+static void svc_write_space(struct sock *sk)
 {
 	struct svc_sock	*svsk = (struct svc_sock *)(sk->sk_user_data);
 
 	if (svsk) {
 		dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
-			svsk, sk, test_bit(SK_BUSY, &svsk->sk_flags));
-		svc_sock_enqueue(svsk);
+			svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
+		svc_xprt_enqueue(&svsk->sk_xprt);
 	}
 
 	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
@@ -740,10 +389,19 @@ svc_write_space(struct sock *sk)
 	}
 }
 
-static inline void svc_udp_get_dest_address(struct svc_rqst *rqstp,
-					    struct cmsghdr *cmh)
+/*
+ * Copy the UDP datagram's destination address to the rqstp structure.
+ * The 'destination' address in this case is the address to which the
+ * peer sent the datagram, i.e. our local address. For multihomed
+ * hosts, this can change from msg to msg. Note that only the IP
+ * address changes, the port number should remain the same.
+ */
+static void svc_udp_get_dest_address(struct svc_rqst *rqstp,
+				     struct cmsghdr *cmh)
 {
-	switch (rqstp->rq_sock->sk_sk->sk_family) {
+	struct svc_sock *svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+	switch (svsk->sk_sk->sk_family) {
 	case AF_INET: {
 		struct in_pktinfo *pki = CMSG_DATA(cmh);
 		rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr;
@@ -760,11 +418,11 @@ static inline void svc_udp_get_dest_address(struct svc_rqst *rqstp,
 /*
  * Receive a datagram from a UDP socket.
  */
-static int
-svc_udp_recvfrom(struct svc_rqst *rqstp)
+static int svc_udp_recvfrom(struct svc_rqst *rqstp)
 {
-	struct svc_sock	*svsk = rqstp->rq_sock;
-	struct svc_serv	*serv = svsk->sk_server;
+	struct svc_sock	*svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+	struct svc_serv	*serv = svsk->sk_xprt.xpt_server;
 	struct sk_buff	*skb;
 	union {
 		struct cmsghdr	hdr;
@@ -779,7 +437,7 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
 		.msg_flags = MSG_DONTWAIT,
 	};
 
-	if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
+	if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
 	    /* udp sockets need large rcvbuf as all pending
 	     * requests are still in that buffer.  sndbuf must
 	     * also be large enough that there is enough space
@@ -792,17 +450,7 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
 				(serv->sv_nrthreads+3) * serv->sv_max_mesg,
 				(serv->sv_nrthreads+3) * serv->sv_max_mesg);
 
-	if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
-		svc_sock_received(svsk);
-		return svc_deferred_recv(rqstp);
-	}
-
-	if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
-		svc_delete_socket(svsk);
-		return 0;
-	}
-
-	clear_bit(SK_DATA, &svsk->sk_flags);
+	clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 	skb = NULL;
 	err = kernel_recvmsg(svsk->sk_sock, &msg, NULL,
 			     0, 0, MSG_PEEK | MSG_DONTWAIT);
@@ -813,24 +461,27 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
 		if (err != -EAGAIN) {
 			/* possibly an icmp error */
 			dprintk("svc: recvfrom returned error %d\n", -err);
-			set_bit(SK_DATA, &svsk->sk_flags);
+			set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 		}
-		svc_sock_received(svsk);
+		svc_xprt_received(&svsk->sk_xprt);
 		return -EAGAIN;
 	}
-	rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
+	len = svc_addr_len(svc_addr(rqstp));
+	if (len < 0)
+		return len;
+	rqstp->rq_addrlen = len;
 	if (skb->tstamp.tv64 == 0) {
 		skb->tstamp = ktime_get_real();
 		/* Don't enable netstamp, sunrpc doesn't
 		   need that much accuracy */
 	}
 	svsk->sk_sk->sk_stamp = skb->tstamp;
-	set_bit(SK_DATA, &svsk->sk_flags); /* there may be more data... */
+	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); /* there may be more data... */
 
 	/*
 	 * Maybe more packets - kick another thread ASAP.
 	 */
-	svc_sock_received(svsk);
+	svc_xprt_received(&svsk->sk_xprt);
 
 	len  = skb->len - sizeof(struct udphdr);
 	rqstp->rq_arg.len = len;
@@ -861,13 +512,14 @@ svc_udp_recvfrom(struct svc_rqst *rqstp)
 		skb_free_datagram(svsk->sk_sk, skb);
 	} else {
 		/* we can use it in-place */
-		rqstp->rq_arg.head[0].iov_base = skb->data + sizeof(struct udphdr);
+		rqstp->rq_arg.head[0].iov_base = skb->data +
+			sizeof(struct udphdr);
 		rqstp->rq_arg.head[0].iov_len = len;
 		if (skb_checksum_complete(skb)) {
 			skb_free_datagram(svsk->sk_sk, skb);
 			return 0;
 		}
-		rqstp->rq_skbuff = skb;
+		rqstp->rq_xprt_ctxt = skb;
 	}
 
 	rqstp->rq_arg.page_base = 0;
@@ -900,27 +552,81 @@ svc_udp_sendto(struct svc_rqst *rqstp)
 	return error;
 }
 
-static void
-svc_udp_init(struct svc_sock *svsk)
+static void svc_udp_prep_reply_hdr(struct svc_rqst *rqstp)
+{
+}
+
+static int svc_udp_has_wspace(struct svc_xprt *xprt)
+{
+	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+	struct svc_serv	*serv = xprt->xpt_server;
+	unsigned long required;
+
+	/*
+	 * Set the SOCK_NOSPACE flag before checking the available
+	 * sock space.
+	 */
+	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+	required = atomic_read(&svsk->sk_xprt.xpt_reserved) + serv->sv_max_mesg;
+	if (required*2 > sock_wspace(svsk->sk_sk))
+		return 0;
+	clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+	return 1;
+}
+
+static struct svc_xprt *svc_udp_accept(struct svc_xprt *xprt)
+{
+	BUG();
+	return NULL;
+}
+
+static struct svc_xprt *svc_udp_create(struct svc_serv *serv,
+				       struct sockaddr *sa, int salen,
+				       int flags)
+{
+	return svc_create_socket(serv, IPPROTO_UDP, sa, salen, flags);
+}
+
+static struct svc_xprt_ops svc_udp_ops = {
+	.xpo_create = svc_udp_create,
+	.xpo_recvfrom = svc_udp_recvfrom,
+	.xpo_sendto = svc_udp_sendto,
+	.xpo_release_rqst = svc_release_skb,
+	.xpo_detach = svc_sock_detach,
+	.xpo_free = svc_sock_free,
+	.xpo_prep_reply_hdr = svc_udp_prep_reply_hdr,
+	.xpo_has_wspace = svc_udp_has_wspace,
+	.xpo_accept = svc_udp_accept,
+};
+
+static struct svc_xprt_class svc_udp_class = {
+	.xcl_name = "udp",
+	.xcl_owner = THIS_MODULE,
+	.xcl_ops = &svc_udp_ops,
+	.xcl_max_payload = RPCSVC_MAXPAYLOAD_UDP,
+};
+
+static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
 {
 	int one = 1;
 	mm_segment_t oldfs;
 
+	svc_xprt_init(&svc_udp_class, &svsk->sk_xprt, serv);
+	clear_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags);
 	svsk->sk_sk->sk_data_ready = svc_udp_data_ready;
 	svsk->sk_sk->sk_write_space = svc_write_space;
-	svsk->sk_recvfrom = svc_udp_recvfrom;
-	svsk->sk_sendto = svc_udp_sendto;
 
 	/* initialise setting must have enough space to
 	 * receive and respond to one request.
 	 * svc_udp_recvfrom will re-adjust if necessary
 	 */
 	svc_sock_setbufsize(svsk->sk_sock,
-			    3 * svsk->sk_server->sv_max_mesg,
-			    3 * svsk->sk_server->sv_max_mesg);
+			    3 * svsk->sk_xprt.xpt_server->sv_max_mesg,
+			    3 * svsk->sk_xprt.xpt_server->sv_max_mesg);
 
-	set_bit(SK_DATA, &svsk->sk_flags); /* might have come in before data_ready set up */
-	set_bit(SK_CHNGBUF, &svsk->sk_flags);
+	/* data might have come in before data_ready set up */
+	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+	set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
 
 	oldfs = get_fs();
 	set_fs(KERNEL_DS);
@@ -934,8 +640,7 @@ svc_udp_init(struct svc_sock *svsk)
  * A data_ready event on a listening socket means there's a connection
  * pending. Do not use state_change as a substitute for it.
  */
-static void
-svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
+static void svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
 {
 	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
 
@@ -954,8 +659,8 @@ svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
 	 */
 	if (sk->sk_state == TCP_LISTEN) {
 		if (svsk) {
-			set_bit(SK_CONN, &svsk->sk_flags);
-			svc_sock_enqueue(svsk);
+			set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
+			svc_xprt_enqueue(&svsk->sk_xprt);
 		} else
 			printk("svc: socket %p: no user data\n", sk);
 	}
@@ -967,8 +672,7 @@ svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
 /*
  * A state change on a connected socket means it's dying or dead.
  */
-static void
-svc_tcp_state_change(struct sock *sk)
+static void svc_tcp_state_change(struct sock *sk)
 {
 	struct svc_sock	*svsk = (struct svc_sock *)sk->sk_user_data;
 
@@ -978,51 +682,36 @@ svc_tcp_state_change(struct sock *sk)
 	if (!svsk)
 		printk("svc: socket %p: no user data\n", sk);
 	else {
-		set_bit(SK_CLOSE, &svsk->sk_flags);
-		svc_sock_enqueue(svsk);
+		set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
+		svc_xprt_enqueue(&svsk->sk_xprt);
 	}
 	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
 		wake_up_interruptible_all(sk->sk_sleep);
 }
 
-static void
-svc_tcp_data_ready(struct sock *sk, int count)
+static void svc_tcp_data_ready(struct sock *sk, int count)
 {
 	struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;
 
 	dprintk("svc: socket %p TCP data ready (svsk %p)\n",
 		sk, sk->sk_user_data);
 	if (svsk) {
-		set_bit(SK_DATA, &svsk->sk_flags);
-		svc_sock_enqueue(svsk);
+		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+		svc_xprt_enqueue(&svsk->sk_xprt);
 	}
 	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
 		wake_up_interruptible(sk->sk_sleep);
 }
 
-static inline int svc_port_is_privileged(struct sockaddr *sin)
-{
-	switch (sin->sa_family) {
-	case AF_INET:
-		return ntohs(((struct sockaddr_in *)sin)->sin_port)
-			< PROT_SOCK;
-	case AF_INET6:
-		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
-			< PROT_SOCK;
-	default:
-		return 0;
-	}
-}
-
 /*
  * Accept a TCP connection
  */
-static void
-svc_tcp_accept(struct svc_sock *svsk)
+static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
 {
+	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
 	struct sockaddr_storage addr;
 	struct sockaddr	*sin = (struct sockaddr *) &addr;
-	struct svc_serv	*serv = svsk->sk_server;
+	struct svc_serv	*serv = svsk->sk_xprt.xpt_server;
 	struct socket	*sock = svsk->sk_sock;
 	struct socket	*newsock;
 	struct svc_sock	*newsvsk;
@@ -1031,9 +720,9 @@ svc_tcp_accept(struct svc_sock *svsk)
 
 	dprintk("svc: tcp_accept %p sock %p\n", svsk, sock);
 	if (!sock)
-		return;
+		return NULL;
 
-	clear_bit(SK_CONN, &svsk->sk_flags);
+	clear_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
 	err = kernel_accept(sock, &newsock, O_NONBLOCK);
 	if (err < 0) {
 		if (err == -ENOMEM)
@@ -1042,11 +731,9 @@ svc_tcp_accept(struct svc_sock *svsk)
 		else if (err != -EAGAIN && net_ratelimit())
 			printk(KERN_WARNING "%s: accept failed (err %d)!\n",
 				   serv->sv_name, -err);
-		return;
+		return NULL;
 	}
-
-	set_bit(SK_CONN, &svsk->sk_flags);
-	svc_sock_enqueue(svsk);
+	set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
 
 	err = kernel_getpeername(newsock, sin, &slen);
 	if (err < 0) {
@@ -1077,106 +764,42 @@ svc_tcp_accept(struct svc_sock *svsk)
 	if (!(newsvsk = svc_setup_socket(serv, newsock, &err,
 				 (SVC_SOCK_ANONYMOUS | SVC_SOCK_TEMPORARY))))
 		goto failed;
-	memcpy(&newsvsk->sk_remote, sin, slen);
-	newsvsk->sk_remotelen = slen;
+	svc_xprt_set_remote(&newsvsk->sk_xprt, sin, slen);
 	err = kernel_getsockname(newsock, sin, &slen);
 	if (unlikely(err < 0)) {
 		dprintk("svc_tcp_accept: kernel_getsockname error %d\n", -err);
 		slen = offsetof(struct sockaddr, sa_data);
 	}
-	memcpy(&newsvsk->sk_local, sin, slen);
-
-	svc_sock_received(newsvsk);
-
-	/* make sure that we don't have too many active connections.
-	 * If we have, something must be dropped.
-	 *
-	 * There's no point in trying to do random drop here for
-	 * DoS prevention. The NFS clients does 1 reconnect in 15
-	 * seconds. An attacker can easily beat that.
-	 *
-	 * The only somewhat efficient mechanism would be if drop
-	 * old connections from the same IP first. But right now
-	 * we don't even record the client IP in svc_sock.
-	 */
-	if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
-		struct svc_sock *svsk = NULL;
-		spin_lock_bh(&serv->sv_lock);
-		if (!list_empty(&serv->sv_tempsocks)) {
-			if (net_ratelimit()) {
-				/* Try to help the admin */
-				printk(KERN_NOTICE "%s: too many open TCP "
-					"sockets, consider increasing the "
-					"number of nfsd threads\n",
-						   serv->sv_name);
-				printk(KERN_NOTICE
-				       "%s: last TCP connect from %s\n",
-				       serv->sv_name, __svc_print_addr(sin,
-							buf, sizeof(buf)));
-			}
-			/*
-			 * Always select the oldest socket. It's not fair,
-			 * but so is life
-			 */
-			svsk = list_entry(serv->sv_tempsocks.prev,
-					  struct svc_sock,
-					  sk_list);
-			set_bit(SK_CLOSE, &svsk->sk_flags);
-			atomic_inc(&svsk->sk_inuse);
-		}
-		spin_unlock_bh(&serv->sv_lock);
-
-		if (svsk) {
-			svc_sock_enqueue(svsk);
-			svc_sock_put(svsk);
-		}
-
-	}
+	svc_xprt_set_local(&newsvsk->sk_xprt, sin, slen);
 
 	if (serv->sv_stats)
 		serv->sv_stats->nettcpconn++;
 
-	return;
+	return &newsvsk->sk_xprt;
 
 failed:
 	sock_release(newsock);
-	return;
+	return NULL;
 }
 
 /*
  * Receive data from a TCP socket.
  */
-static int
-svc_tcp_recvfrom(struct svc_rqst *rqstp)
+static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
 {
-	struct svc_sock	*svsk = rqstp->rq_sock;
-	struct svc_serv	*serv = svsk->sk_server;
+	struct svc_sock	*svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+	struct svc_serv	*serv = svsk->sk_xprt.xpt_server;
 	int		len;
 	struct kvec *vec;
 	int pnum, vlen;
 
 	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
-		svsk, test_bit(SK_DATA, &svsk->sk_flags),
-		test_bit(SK_CONN, &svsk->sk_flags),
-		test_bit(SK_CLOSE, &svsk->sk_flags));
+		svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
+		test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags),
+		test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
 
-	if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
-		svc_sock_received(svsk);
-		return svc_deferred_recv(rqstp);
-	}
-
-	if (test_bit(SK_CLOSE, &svsk->sk_flags)) {
-		svc_delete_socket(svsk);
-		return 0;
-	}
-
-	if (svsk->sk_sk->sk_state == TCP_LISTEN) {
-		svc_tcp_accept(svsk);
-		svc_sock_received(svsk);
-		return 0;
-	}
-
-	if (test_and_clear_bit(SK_CHNGBUF, &svsk->sk_flags))
+	if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
 		/* sndbuf needs to have room for one request
 		 * per thread, otherwise we can stall even when the
 		 * network isn't a bottleneck.
@@ -1193,7 +816,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 				    (serv->sv_nrthreads+3) * serv->sv_max_mesg,
 				    3 * serv->sv_max_mesg);
 
-	clear_bit(SK_DATA, &svsk->sk_flags);
+	clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 
 	/* Receive data. If we haven't got the record length yet, get
 	 * the next four bytes. Otherwise try to gobble up as much as
@@ -1212,7 +835,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 		if (len < want) {
 			dprintk("svc: short recvfrom while reading record length (%d of %lu)\n",
 				len, want);
-			svc_sock_received(svsk);
+			svc_xprt_received(&svsk->sk_xprt);
 			return -EAGAIN; /* record header not complete */
 		}
 
@@ -1248,11 +871,11 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 	if (len < svsk->sk_reclen) {
 		dprintk("svc: incomplete TCP record (%d of %d)\n",
 			len, svsk->sk_reclen);
-		svc_sock_received(svsk);
+		svc_xprt_received(&svsk->sk_xprt);
 		return -EAGAIN;	/* record not complete */
 	}
 	len = svsk->sk_reclen;
-	set_bit(SK_DATA, &svsk->sk_flags);
+	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 
 	vec = rqstp->rq_vec;
 	vec[0] = rqstp->rq_arg.head[0];
@@ -1281,30 +904,31 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 		rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
 	}
 
-	rqstp->rq_skbuff      = NULL;
+	rqstp->rq_xprt_ctxt   = NULL;
 	rqstp->rq_prot	      = IPPROTO_TCP;
 
 	/* Reset TCP read info */
 	svsk->sk_reclen = 0;
 	svsk->sk_tcplen = 0;
 
-	svc_sock_received(svsk);
+	svc_xprt_copy_addrs(rqstp, &svsk->sk_xprt);
+	svc_xprt_received(&svsk->sk_xprt);
 	if (serv->sv_stats)
 		serv->sv_stats->nettcpcnt++;
 
 	return len;
 
  err_delete:
-	svc_delete_socket(svsk);
+	set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
 	return -EAGAIN;
 
  error:
 	if (len == -EAGAIN) {
 		dprintk("RPC: TCP recvfrom got EAGAIN\n");
-		svc_sock_received(svsk);
+		svc_xprt_received(&svsk->sk_xprt);
 	} else {
 		printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
-					svsk->sk_server->sv_name, -len);
+		       svsk->sk_xprt.xpt_server->sv_name, -len);
 		goto err_delete;
 	}
 
@@ -1314,8 +938,7 @@ svc_tcp_recvfrom(struct svc_rqst *rqstp)
 /*
  * Send out data on TCP socket.
  */
-static int
-svc_tcp_sendto(struct svc_rqst *rqstp)
+static int svc_tcp_sendto(struct svc_rqst *rqstp)
 {
 	struct xdr_buf	*xbufp = &rqstp->rq_res;
 	int sent;
@@ -1328,35 +951,109 @@ svc_tcp_sendto(struct svc_rqst *rqstp)
 	reclen = htonl(0x80000000|((xbufp->len ) - 4));
 	memcpy(xbufp->head[0].iov_base, &reclen, 4);
 
-	if (test_bit(SK_DEAD, &rqstp->rq_sock->sk_flags))
+	if (test_bit(XPT_DEAD, &rqstp->rq_xprt->xpt_flags))
 		return -ENOTCONN;
 
 	sent = svc_sendto(rqstp, &rqstp->rq_res);
 	if (sent != xbufp->len) {
-		printk(KERN_NOTICE "rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
-		       rqstp->rq_sock->sk_server->sv_name,
+		printk(KERN_NOTICE
+		       "rpc-srv/tcp: %s: %s %d when sending %d bytes "
+		       "- shutting down socket\n",
+		       rqstp->rq_xprt->xpt_server->sv_name,
 		       (sent<0)?"got error":"sent only",
 		       sent, xbufp->len);
-		set_bit(SK_CLOSE, &rqstp->rq_sock->sk_flags);
-		svc_sock_enqueue(rqstp->rq_sock);
+		set_bit(XPT_CLOSE, &rqstp->rq_xprt->xpt_flags);
+		svc_xprt_enqueue(rqstp->rq_xprt);
 		sent = -EAGAIN;
 	}
 	return sent;
 }
 
-static void
-svc_tcp_init(struct svc_sock *svsk)
+/*
+ * Setup response header. TCP has a 4B record length field.
+ */
+static void svc_tcp_prep_reply_hdr(struct svc_rqst *rqstp)
+{
+	struct kvec *resv = &rqstp->rq_res.head[0];
+
+	/* tcp needs a space for the record length... */
+	svc_putnl(resv, 0);
+}
+
+static int svc_tcp_has_wspace(struct svc_xprt *xprt)
+{
+	struct svc_sock *svsk =	container_of(xprt, struct svc_sock, sk_xprt);
+	struct svc_serv	*serv = svsk->sk_xprt.xpt_server;
+	int required;
+	int wspace;
+
+	/*
+	 * Set the SOCK_NOSPACE flag before checking the available
+	 * sock space.
+	 */
+	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+	required = atomic_read(&svsk->sk_xprt.xpt_reserved) + serv->sv_max_mesg;
+	wspace = sk_stream_wspace(svsk->sk_sk);
+
+	if (wspace < sk_stream_min_wspace(svsk->sk_sk))
+		return 0;
+	if (required * 2 > wspace)
+		return 0;
+
+	clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
+	return 1;
+}
+
+static struct svc_xprt *svc_tcp_create(struct svc_serv *serv,
+				       struct sockaddr *sa, int salen,
+				       int flags)
+{
+	return svc_create_socket(serv, IPPROTO_TCP, sa, salen, flags);
+}
+
+static struct svc_xprt_ops svc_tcp_ops = {
+	.xpo_create = svc_tcp_create,
+	.xpo_recvfrom = svc_tcp_recvfrom,
+	.xpo_sendto = svc_tcp_sendto,
+	.xpo_release_rqst = svc_release_skb,
+	.xpo_detach = svc_sock_detach,
+	.xpo_free = svc_sock_free,
+	.xpo_prep_reply_hdr = svc_tcp_prep_reply_hdr,
+	.xpo_has_wspace = svc_tcp_has_wspace,
+	.xpo_accept = svc_tcp_accept,
+};
+
+static struct svc_xprt_class svc_tcp_class = {
+	.xcl_name = "tcp",
+	.xcl_owner = THIS_MODULE,
+	.xcl_ops = &svc_tcp_ops,
+	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
+};
+
+void svc_init_xprt_sock(void)
+{
+	svc_reg_xprt_class(&svc_tcp_class);
+	svc_reg_xprt_class(&svc_udp_class);
+}
+
+void svc_cleanup_xprt_sock(void)
+{
+	svc_unreg_xprt_class(&svc_tcp_class);
+	svc_unreg_xprt_class(&svc_udp_class);
+}
+
+static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
 {
 	struct sock	*sk = svsk->sk_sk;
 	struct tcp_sock *tp = tcp_sk(sk);
 
-	svsk->sk_recvfrom = svc_tcp_recvfrom;
-	svsk->sk_sendto = svc_tcp_sendto;
-
+	svc_xprt_init(&svc_tcp_class, &svsk->sk_xprt, serv);
+	set_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags);
 	if (sk->sk_state == TCP_LISTEN) {
 		dprintk("setting up TCP socket for listening\n");
+		set_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags);
 		sk->sk_data_ready = svc_tcp_listen_data_ready;
-		set_bit(SK_CONN, &svsk->sk_flags);
+		set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
 	} else {
 		dprintk("setting up TCP socket for reading\n");
 		sk->sk_state_change = svc_tcp_state_change;
@@ -1373,18 +1070,17 @@ svc_tcp_init(struct svc_sock *svsk)
 		 * svc_tcp_recvfrom will re-adjust if necessary
 		 */
 		svc_sock_setbufsize(svsk->sk_sock,
-				    3 * svsk->sk_server->sv_max_mesg,
-				    3 * svsk->sk_server->sv_max_mesg);
+				    3 * svsk->sk_xprt.xpt_server->sv_max_mesg,
+				    3 * svsk->sk_xprt.xpt_server->sv_max_mesg);
 
-		set_bit(SK_CHNGBUF, &svsk->sk_flags);
-		set_bit(SK_DATA, &svsk->sk_flags);
+		set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
+		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 		if (sk->sk_state != TCP_ESTABLISHED)
-			set_bit(SK_CLOSE, &svsk->sk_flags);
+			set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
 	}
 }
 
-void
-svc_sock_update_bufs(struct svc_serv *serv)
+void svc_sock_update_bufs(struct svc_serv *serv)
 {
 	/*
 	 * The number of server threads has changed. Update
@@ -1395,232 +1091,18 @@ svc_sock_update_bufs(struct svc_serv *serv)
 	spin_lock_bh(&serv->sv_lock);
 	list_for_each(le, &serv->sv_permsocks) {
 		struct svc_sock *svsk =
-			list_entry(le, struct svc_sock, sk_list);
-		set_bit(SK_CHNGBUF, &svsk->sk_flags);
+			list_entry(le, struct svc_sock, sk_xprt.xpt_list);
+		set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
 	}
 	list_for_each(le, &serv->sv_tempsocks) {
 		struct svc_sock *svsk =
-			list_entry(le, struct svc_sock, sk_list);
-		set_bit(SK_CHNGBUF, &svsk->sk_flags);
+			list_entry(le, struct svc_sock, sk_xprt.xpt_list);
+		set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
 	}
 	spin_unlock_bh(&serv->sv_lock);
 }
 
 /*
- * Receive the next request on any socket.  This code is carefully
- * organised not to touch any cachelines in the shared svc_serv
- * structure, only cachelines in the local svc_pool.
- */
-int
-svc_recv(struct svc_rqst *rqstp, long timeout)
-{
-	struct svc_sock		*svsk = NULL;
-	struct svc_serv		*serv = rqstp->rq_server;
-	struct svc_pool		*pool = rqstp->rq_pool;
-	int			len, i;
-	int 			pages;
-	struct xdr_buf		*arg;
-	DECLARE_WAITQUEUE(wait, current);
-
-	dprintk("svc: server %p waiting for data (to = %ld)\n",
-		rqstp, timeout);
-
-	if (rqstp->rq_sock)
-		printk(KERN_ERR
-			"svc_recv: service %p, socket not NULL!\n",
-			 rqstp);
-	if (waitqueue_active(&rqstp->rq_wait))
-		printk(KERN_ERR
-			"svc_recv: service %p, wait queue active!\n",
-			 rqstp);
-
-
-	/* now allocate needed pages.  If we get a failure, sleep briefly */
-	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
-	for (i=0; i < pages ; i++)
-		while (rqstp->rq_pages[i] == NULL) {
-			struct page *p = alloc_page(GFP_KERNEL);
-			if (!p)
-				schedule_timeout_uninterruptible(msecs_to_jiffies(500));
-			rqstp->rq_pages[i] = p;
-		}
-	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
-	BUG_ON(pages >= RPCSVC_MAXPAGES);
-
-	/* Make arg->head point to first page and arg->pages point to rest */
-	arg = &rqstp->rq_arg;
-	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
-	arg->head[0].iov_len = PAGE_SIZE;
-	arg->pages = rqstp->rq_pages + 1;
-	arg->page_base = 0;
-	/* save at least one page for response */
-	arg->page_len = (pages-2)*PAGE_SIZE;
-	arg->len = (pages-1)*PAGE_SIZE;
-	arg->tail[0].iov_len = 0;
-
-	try_to_freeze();
-	cond_resched();
-	if (signalled())
-		return -EINTR;
-
-	spin_lock_bh(&pool->sp_lock);
-	if ((svsk = svc_sock_dequeue(pool)) != NULL) {
-		rqstp->rq_sock = svsk;
-		atomic_inc(&svsk->sk_inuse);
-		rqstp->rq_reserved = serv->sv_max_mesg;
-		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
-	} else {
-		/* No data pending. Go to sleep */
-		svc_thread_enqueue(pool, rqstp);
-
-		/*
-		 * We have to be able to interrupt this wait
-		 * to bring down the daemons ...
-		 */
-		set_current_state(TASK_INTERRUPTIBLE);
-		add_wait_queue(&rqstp->rq_wait, &wait);
-		spin_unlock_bh(&pool->sp_lock);
-
-		schedule_timeout(timeout);
-
-		try_to_freeze();
-
-		spin_lock_bh(&pool->sp_lock);
-		remove_wait_queue(&rqstp->rq_wait, &wait);
-
-		if (!(svsk = rqstp->rq_sock)) {
-			svc_thread_dequeue(pool, rqstp);
-			spin_unlock_bh(&pool->sp_lock);
-			dprintk("svc: server %p, no data yet\n", rqstp);
-			return signalled()? -EINTR : -EAGAIN;
-		}
-	}
-	spin_unlock_bh(&pool->sp_lock);
-
-	dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n",
-		 rqstp, pool->sp_id, svsk, atomic_read(&svsk->sk_inuse));
-	len = svsk->sk_recvfrom(rqstp);
-	dprintk("svc: got len=%d\n", len);
-
-	/* No data, incomplete (TCP) read, or accept() */
-	if (len == 0 || len == -EAGAIN) {
-		rqstp->rq_res.len = 0;
-		svc_sock_release(rqstp);
-		return -EAGAIN;
-	}
-	svsk->sk_lastrecv = get_seconds();
-	clear_bit(SK_OLD, &svsk->sk_flags);
-
-	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
-	rqstp->rq_chandle.defer = svc_defer;
-
-	if (serv->sv_stats)
-		serv->sv_stats->netcnt++;
-	return len;
-}
-
-/*
- * Drop request
- */
-void
-svc_drop(struct svc_rqst *rqstp)
-{
-	dprintk("svc: socket %p dropped request\n", rqstp->rq_sock);
-	svc_sock_release(rqstp);
-}
-
-/*
- * Return reply to client.
- */
-int
-svc_send(struct svc_rqst *rqstp)
-{
-	struct svc_sock	*svsk;
-	int		len;
-	struct xdr_buf	*xb;
-
-	if ((svsk = rqstp->rq_sock) == NULL) {
-		printk(KERN_WARNING "NULL socket pointer in %s:%d\n",
-				__FILE__, __LINE__);
-		return -EFAULT;
-	}
-
-	/* release the receive skb before sending the reply */
-	svc_release_skb(rqstp);
-
-	/* calculate over-all length */
-	xb = & rqstp->rq_res;
-	xb->len = xb->head[0].iov_len +
-		xb->page_len +
-		xb->tail[0].iov_len;
-
-	/* Grab svsk->sk_mutex to serialize outgoing data. */
-	mutex_lock(&svsk->sk_mutex);
-	if (test_bit(SK_DEAD, &svsk->sk_flags))
-		len = -ENOTCONN;
-	else
-		len = svsk->sk_sendto(rqstp);
-	mutex_unlock(&svsk->sk_mutex);
-	svc_sock_release(rqstp);
-
-	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
-		return 0;
-	return len;
-}
-
-/*
- * Timer function to close old temporary sockets, using
- * a mark-and-sweep algorithm.
- */
-static void
-svc_age_temp_sockets(unsigned long closure)
-{
-	struct svc_serv *serv = (struct svc_serv *)closure;
-	struct svc_sock *svsk;
-	struct list_head *le, *next;
-	LIST_HEAD(to_be_aged);
-
-	dprintk("svc_age_temp_sockets\n");
-
-	if (!spin_trylock_bh(&serv->sv_lock)) {
-		/* busy, try again 1 sec later */
-		dprintk("svc_age_temp_sockets: busy\n");
-		mod_timer(&serv->sv_temptimer, jiffies + HZ);
-		return;
-	}
-
-	list_for_each_safe(le, next, &serv->sv_tempsocks) {
-		svsk = list_entry(le, struct svc_sock, sk_list);
-
-		if (!test_and_set_bit(SK_OLD, &svsk->sk_flags))
-			continue;
-		if (atomic_read(&svsk->sk_inuse) > 1 || test_bit(SK_BUSY, &svsk->sk_flags))
-			continue;
-		atomic_inc(&svsk->sk_inuse);
-		list_move(le, &to_be_aged);
-		set_bit(SK_CLOSE, &svsk->sk_flags);
-		set_bit(SK_DETACHED, &svsk->sk_flags);
-	}
-	spin_unlock_bh(&serv->sv_lock);
-
-	while (!list_empty(&to_be_aged)) {
-		le = to_be_aged.next;
-		/* fiddling the sk_list node is safe 'cos we're SK_DETACHED */
-		list_del_init(le);
-		svsk = list_entry(le, struct svc_sock, sk_list);
-
-		dprintk("queuing svsk %p for closing, %lu seconds old\n",
-			svsk, get_seconds() - svsk->sk_lastrecv);
-
-		/* a thread will dequeue and close it soon */
-		svc_sock_enqueue(svsk);
-		svc_sock_put(svsk);
-	}
-
-	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
-}
-
-/*
  * Initialize socket for RPC use and create svc_sock struct
  * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF.
  */
@@ -1631,7 +1113,6 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
 	struct svc_sock	*svsk;
 	struct sock	*inet;
 	int		pmap_register = !(flags & SVC_SOCK_ANONYMOUS);
-	int		is_temporary = flags & SVC_SOCK_TEMPORARY;
 
 	dprintk("svc: svc_setup_socket %p\n", sock);
 	if (!(svsk = kzalloc(sizeof(*svsk), GFP_KERNEL))) {
@@ -1651,44 +1132,18 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
 		return NULL;
 	}
 
-	set_bit(SK_BUSY, &svsk->sk_flags);
 	inet->sk_user_data = svsk;
 	svsk->sk_sock = sock;
 	svsk->sk_sk = inet;
 	svsk->sk_ostate = inet->sk_state_change;
 	svsk->sk_odata = inet->sk_data_ready;
 	svsk->sk_owspace = inet->sk_write_space;
-	svsk->sk_server = serv;
-	atomic_set(&svsk->sk_inuse, 1);
-	svsk->sk_lastrecv = get_seconds();
-	spin_lock_init(&svsk->sk_lock);
-	INIT_LIST_HEAD(&svsk->sk_deferred);
-	INIT_LIST_HEAD(&svsk->sk_ready);
-	mutex_init(&svsk->sk_mutex);
 
 	/* Initialize the socket */
 	if (sock->type == SOCK_DGRAM)
-		svc_udp_init(svsk);
+		svc_udp_init(svsk, serv);
 	else
-		svc_tcp_init(svsk);
-
-	spin_lock_bh(&serv->sv_lock);
-	if (is_temporary) {
-		set_bit(SK_TEMP, &svsk->sk_flags);
-		list_add(&svsk->sk_list, &serv->sv_tempsocks);
-		serv->sv_tmpcnt++;
-		if (serv->sv_temptimer.function == NULL) {
-			/* setup timer to age temp sockets */
-			setup_timer(&serv->sv_temptimer, svc_age_temp_sockets,
-					(unsigned long)serv);
-			mod_timer(&serv->sv_temptimer,
-					jiffies + svc_conn_age_period * HZ);
-		}
-	} else {
-		clear_bit(SK_TEMP, &svsk->sk_flags);
-		list_add(&svsk->sk_list, &serv->sv_permsocks);
-	}
-	spin_unlock_bh(&serv->sv_lock);
+		svc_tcp_init(svsk, serv);
 
 	dprintk("svc: svc_setup_socket created %p (inet %p)\n",
 				svsk, svsk->sk_sk);
@@ -1717,7 +1172,16 @@ int svc_addsock(struct svc_serv *serv,
 	else {
 		svsk = svc_setup_socket(serv, so, &err, SVC_SOCK_DEFAULTS);
 		if (svsk) {
-			svc_sock_received(svsk);
+			struct sockaddr_storage addr;
+			struct sockaddr *sin = (struct sockaddr *)&addr;
+			int salen;
+			if (kernel_getsockname(svsk->sk_sock, sin, &salen) == 0)
+				svc_xprt_set_local(&svsk->sk_xprt, sin, salen);
+			clear_bit(XPT_TEMP, &svsk->sk_xprt.xpt_flags);
+			spin_lock_bh(&serv->sv_lock);
+			list_add(&svsk->sk_xprt.xpt_list, &serv->sv_permsocks);
+			spin_unlock_bh(&serv->sv_lock);
+			svc_xprt_received(&svsk->sk_xprt);
 			err = 0;
 		}
 	}
@@ -1733,14 +1197,19 @@ EXPORT_SYMBOL_GPL(svc_addsock);
 /*
  * Create socket for RPC service.
  */
-static int svc_create_socket(struct svc_serv *serv, int protocol,
-				struct sockaddr *sin, int len, int flags)
+static struct svc_xprt *svc_create_socket(struct svc_serv *serv,
+					  int protocol,
+					  struct sockaddr *sin, int len,
+					  int flags)
 {
 	struct svc_sock	*svsk;
 	struct socket	*sock;
 	int		error;
 	int		type;
 	char		buf[RPC_MAX_ADDRBUFLEN];
+	struct sockaddr_storage addr;
+	struct sockaddr *newsin = (struct sockaddr *)&addr;
+	int		newlen;
 
 	dprintk("svc: svc_create_socket(%s, %d, %s)\n",
 			serv->sv_program->pg_name, protocol,
@@ -1749,13 +1218,13 @@ static int svc_create_socket(struct svc_serv *serv, int protocol,
 	if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
 		printk(KERN_WARNING "svc: only UDP and TCP "
 				"sockets supported\n");
-		return -EINVAL;
+		return ERR_PTR(-EINVAL);
 	}
 	type = (protocol == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
 
 	error = sock_create_kern(sin->sa_family, type, protocol, &sock);
 	if (error < 0)
-		return error;
+		return ERR_PTR(error);
 
 	svc_reclassify_socket(sock);
 
@@ -1765,203 +1234,55 @@ static int svc_create_socket(struct svc_serv *serv, int protocol,
 	if (error < 0)
 		goto bummer;
 
+	newlen = len;
+	error = kernel_getsockname(sock, newsin, &newlen);
+	if (error < 0)
+		goto bummer;
+
 	if (protocol == IPPROTO_TCP) {
 		if ((error = kernel_listen(sock, 64)) < 0)
 			goto bummer;
 	}
 
 	if ((svsk = svc_setup_socket(serv, sock, &error, flags)) != NULL) {
-		svc_sock_received(svsk);
-		return ntohs(inet_sk(svsk->sk_sk)->sport);
+		svc_xprt_set_local(&svsk->sk_xprt, newsin, newlen);
+		return (struct svc_xprt *)svsk;
 	}
 
 bummer:
 	dprintk("svc: svc_create_socket error = %d\n", -error);
 	sock_release(sock);
-	return error;
+	return ERR_PTR(error);
 }
 
 /*
- * Remove a dead socket
+ * Detach the svc_sock from the socket so that no
+ * more callbacks occur.
  */
-static void
-svc_delete_socket(struct svc_sock *svsk)
+static void svc_sock_detach(struct svc_xprt *xprt)
 {
-	struct svc_serv	*serv;
-	struct sock	*sk;
-
-	dprintk("svc: svc_delete_socket(%p)\n", svsk);
+	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+	struct sock *sk = svsk->sk_sk;
 
-	serv = svsk->sk_server;
-	sk = svsk->sk_sk;
+	dprintk("svc: svc_sock_detach(%p)\n", svsk);
 
+	/* put back the old socket callbacks */
 	sk->sk_state_change = svsk->sk_ostate;
 	sk->sk_data_ready = svsk->sk_odata;
 	sk->sk_write_space = svsk->sk_owspace;
-
-	spin_lock_bh(&serv->sv_lock);
-
-	if (!test_and_set_bit(SK_DETACHED, &svsk->sk_flags))
-		list_del_init(&svsk->sk_list);
-	/*
-	 * We used to delete the svc_sock from whichever list
-	 * it's sk_ready node was on, but we don't actually
-	 * need to.  This is because the only time we're called
-	 * while still attached to a queue, the queue itself
-	 * is about to be destroyed (in svc_destroy).
-	 */
-	if (!test_and_set_bit(SK_DEAD, &svsk->sk_flags)) {
-		BUG_ON(atomic_read(&svsk->sk_inuse)<2);
-		atomic_dec(&svsk->sk_inuse);
-		if (test_bit(SK_TEMP, &svsk->sk_flags))
-			serv->sv_tmpcnt--;
-	}
-
-	spin_unlock_bh(&serv->sv_lock);
-}
-
-static void svc_close_socket(struct svc_sock *svsk)
-{
-	set_bit(SK_CLOSE, &svsk->sk_flags);
-	if (test_and_set_bit(SK_BUSY, &svsk->sk_flags))
-		/* someone else will have to effect the close */
-		return;
-
-	atomic_inc(&svsk->sk_inuse);
-	svc_delete_socket(svsk);
-	clear_bit(SK_BUSY, &svsk->sk_flags);
-	svc_sock_put(svsk);
-}
-
-void svc_force_close_socket(struct svc_sock *svsk)
-{
-	set_bit(SK_CLOSE, &svsk->sk_flags);
-	if (test_bit(SK_BUSY, &svsk->sk_flags)) {
-		/* Waiting to be processed, but no threads left,
-		 * So just remove it from the waiting list
-		 */
-		list_del_init(&svsk->sk_ready);
-		clear_bit(SK_BUSY, &svsk->sk_flags);
-	}
-	svc_close_socket(svsk);
-}
-
-/**
- * svc_makesock - Make a socket for nfsd and lockd
- * @serv: RPC server structure
- * @protocol: transport protocol to use
- * @port: port to use
- * @flags: requested socket characteristics
- *
- */
-int svc_makesock(struct svc_serv *serv, int protocol, unsigned short port,
-			int flags)
-{
-	struct sockaddr_in sin = {
-		.sin_family		= AF_INET,
-		.sin_addr.s_addr	= INADDR_ANY,
-		.sin_port		= htons(port),
-	};
-
-	dprintk("svc: creating socket proto = %d\n", protocol);
-	return svc_create_socket(serv, protocol, (struct sockaddr *) &sin,
-							sizeof(sin), flags);
 }
 
 /*
- * Handle defer and revisit of requests
+ * Free the svc_sock's socket resources and the svc_sock itself.
  */
-
-static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
+static void svc_sock_free(struct svc_xprt *xprt)
 {
-	struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle);
-	struct svc_sock *svsk;
+	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
+	dprintk("svc: svc_sock_free(%p)\n", svsk);
 
-	if (too_many) {
-		svc_sock_put(dr->svsk);
-		kfree(dr);
-		return;
-	}
-	dprintk("revisit queued\n");
-	svsk = dr->svsk;
-	dr->svsk = NULL;
-	spin_lock(&svsk->sk_lock);
-	list_add(&dr->handle.recent, &svsk->sk_deferred);
-	spin_unlock(&svsk->sk_lock);
-	set_bit(SK_DEFERRED, &svsk->sk_flags);
-	svc_sock_enqueue(svsk);
-	svc_sock_put(svsk);
-}
-
-static struct cache_deferred_req *
-svc_defer(struct cache_req *req)
-{
-	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
-	int size = sizeof(struct svc_deferred_req) + (rqstp->rq_arg.len);
-	struct svc_deferred_req *dr;
-
-	if (rqstp->rq_arg.page_len)
-		return NULL; /* if more than a page, give up FIXME */
-	if (rqstp->rq_deferred) {
-		dr = rqstp->rq_deferred;
-		rqstp->rq_deferred = NULL;
-	} else {
-		int skip  = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
-		/* FIXME maybe discard if size too large */
-		dr = kmalloc(size, GFP_KERNEL);
-		if (dr == NULL)
-			return NULL;
-
-		dr->handle.owner = rqstp->rq_server;
-		dr->prot = rqstp->rq_prot;
-		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
-		dr->addrlen = rqstp->rq_addrlen;
-		dr->daddr = rqstp->rq_daddr;
-		dr->argslen = rqstp->rq_arg.len >> 2;
-		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2);
-	}
-	atomic_inc(&rqstp->rq_sock->sk_inuse);
-	dr->svsk = rqstp->rq_sock;
-
-	dr->handle.revisit = svc_revisit;
-	return &dr->handle;
-}
-
-/*
- * recv data from a deferred request into an active one
- */
-static int svc_deferred_recv(struct svc_rqst *rqstp)
-{
-	struct svc_deferred_req *dr = rqstp->rq_deferred;
-
-	rqstp->rq_arg.head[0].iov_base = dr->args;
-	rqstp->rq_arg.head[0].iov_len = dr->argslen<<2;
-	rqstp->rq_arg.page_len = 0;
-	rqstp->rq_arg.len = dr->argslen<<2;
-	rqstp->rq_prot        = dr->prot;
-	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
-	rqstp->rq_addrlen     = dr->addrlen;
-	rqstp->rq_daddr       = dr->daddr;
-	rqstp->rq_respages    = rqstp->rq_pages;
-	return dr->argslen<<2;
-}
-
-
-static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk)
-{
-	struct svc_deferred_req *dr = NULL;
-
-	if (!test_bit(SK_DEFERRED, &svsk->sk_flags))
-		return NULL;
-	spin_lock(&svsk->sk_lock);
-	clear_bit(SK_DEFERRED, &svsk->sk_flags);
-	if (!list_empty(&svsk->sk_deferred)) {
-		dr = list_entry(svsk->sk_deferred.next,
-				struct svc_deferred_req,
-				handle.recent);
-		list_del_init(&dr->handle.recent);
-		set_bit(SK_DEFERRED, &svsk->sk_flags);
-	}
-	spin_unlock(&svsk->sk_lock);
-	return dr;
+	if (svsk->sk_sock->file)
+		sockfd_put(svsk->sk_sock);
+	else
+		sock_release(svsk->sk_sock);
+	kfree(svsk);
 }
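
Note on the svcsock.c rewrite above: with the queueing, deferral, aging and close logic moved to the generic svc_xprt code, this file now describes UDP and TCP purely as svc_xprt_ops/svc_xprt_class instances registered from svc_init_xprt_sock(). Any additional transport, such as the svcrdma module added below, plugs into the same table. The fragment below is a schematic of such a provider; the "demo" identifiers and the payload limit are placeholders, the xcl_*/xpo_* member names are taken from the UDP and TCP definitions above, and the forward declarations stand in for real implementations with the same signatures as their xpo_* counterparts.

/*
 * Schematic of a third-party svc transport provider.  Everything named
 * "demo" is a placeholder; the ops/class layout mirrors svc_udp_class and
 * svc_tcp_class in the hunks above.
 */
#include <linux/module.h>
#include <linux/sunrpc/svc_xprt.h>

static struct svc_xprt *demo_xprt_create(struct svc_serv *serv,
					 struct sockaddr *sa, int salen,
					 int flags);
static int demo_xprt_recvfrom(struct svc_rqst *rqstp);
static int demo_xprt_sendto(struct svc_rqst *rqstp);
static void demo_xprt_release_rqst(struct svc_rqst *rqstp);
static void demo_xprt_detach(struct svc_xprt *xprt);
static void demo_xprt_free(struct svc_xprt *xprt);
static void demo_xprt_prep_reply_hdr(struct svc_rqst *rqstp);
static int demo_xprt_has_wspace(struct svc_xprt *xprt);
static struct svc_xprt *demo_xprt_accept(struct svc_xprt *xprt);

static struct svc_xprt_ops demo_xprt_ops = {
	.xpo_create = demo_xprt_create,
	.xpo_recvfrom = demo_xprt_recvfrom,
	.xpo_sendto = demo_xprt_sendto,
	.xpo_release_rqst = demo_xprt_release_rqst,
	.xpo_detach = demo_xprt_detach,
	.xpo_free = demo_xprt_free,
	.xpo_prep_reply_hdr = demo_xprt_prep_reply_hdr,
	.xpo_has_wspace = demo_xprt_has_wspace,
	.xpo_accept = demo_xprt_accept,
};

static struct svc_xprt_class demo_xprt_class = {
	.xcl_name = "demo",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &demo_xprt_ops,
	.xcl_max_payload = 1 << 16,	/* placeholder payload limit */
};

static int __init demo_xprt_init(void)
{
	svc_reg_xprt_class(&demo_xprt_class);
	return 0;
}

static void __exit demo_xprt_exit(void)
{
	svc_unreg_xprt_class(&demo_xprt_class);
}

module_init(demo_xprt_init);
module_exit(demo_xprt_exit);
MODULE_LICENSE("GPL");

As with svc_udp_ops above, a datagram-style provider that never accepts connections can point xpo_accept at a stub, while a connection-oriented one returns the newly created child transport from it.
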
diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c
index bada7de0c2fc..0f8c439b848a 100644
--- a/net/sunrpc/sysctl.c
+++ b/net/sunrpc/sysctl.c
@@ -18,6 +18,7 @@
 #include <linux/sunrpc/types.h>
 #include <linux/sunrpc/sched.h>
 #include <linux/sunrpc/stats.h>
+#include <linux/sunrpc/svc_xprt.h>
 
 /*
  * Declare the debug flags here
@@ -55,6 +56,30 @@ rpc_unregister_sysctl(void)
 	}
 }
 
+static int proc_do_xprt(ctl_table *table, int write, struct file *file,
+			void __user *buffer, size_t *lenp, loff_t *ppos)
+{
+	char tmpbuf[256];
+	int len;
+	if ((*ppos && !write) || !*lenp) {
+		*lenp = 0;
+		return 0;
+	}
+	if (write)
+		return -EINVAL;
+	else {
+		len = svc_print_xprts(tmpbuf, sizeof(tmpbuf));
+		if (!access_ok(VERIFY_WRITE, buffer, len))
+			return -EFAULT;
+
+		if (__copy_to_user(buffer, tmpbuf, len))
+			return -EFAULT;
+	}
+	*lenp -= len;
+	*ppos += len;
+	return 0;
+}
+
 static int
 proc_dodebug(ctl_table *table, int write, struct file *file,
 				void __user *buffer, size_t *lenp, loff_t *ppos)
@@ -147,6 +172,12 @@ static ctl_table debug_table[] = {
 		.mode		= 0644,
 		.proc_handler	= &proc_dodebug
 	},
+	{
+		.procname	= "transports",
+		.maxlen		= 256,
+		.mode		= 0444,
+		.proc_handler	= &proc_do_xprt,
+	},
 	{ .ctl_name = 0 }
 };
 
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 54264062ea69..995c3fdc16c2 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -96,11 +96,13 @@ xdr_encode_string(__be32 *p, const char *string)
 EXPORT_SYMBOL(xdr_encode_string);
 
 __be32 *
-xdr_decode_string_inplace(__be32 *p, char **sp, int *lenp, int maxlen)
+xdr_decode_string_inplace(__be32 *p, char **sp,
+			  unsigned int *lenp, unsigned int maxlen)
 {
-	unsigned int	len;
+	u32 len;
 
-	if ((len = ntohl(*p++)) > maxlen)
+	len = ntohl(*p++);
+	if (len > maxlen)
 		return NULL;
 	*lenp = len;
 	*sp = (char *) p;
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 264f0feeb513..5a8f268bdd30 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,3 +1,8 @@
 obj-$(CONFIG_SUNRPC_XPRT_RDMA) += xprtrdma.o
 
 xprtrdma-y := transport.o rpc_rdma.o verbs.o
+
+obj-$(CONFIG_SUNRPC_XPRT_RDMA) += svcrdma.o
+
+svcrdma-y := svc_rdma.o svc_rdma_transport.o \
+	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
new file mode 100644
index 000000000000..88c0ca20bb1e
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -0,0 +1,266 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/sysctl.h>
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/* RPC/RDMA parameters */
+unsigned int svcrdma_ord = RPCRDMA_ORD;
+static unsigned int min_ord = 1;
+static unsigned int max_ord = 4096;
+unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
+static unsigned int min_max_requests = 4;
+static unsigned int max_max_requests = 16384;
+unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
+static unsigned int min_max_inline = 4096;
+static unsigned int max_max_inline = 65536;
+
+atomic_t rdma_stat_recv;
+atomic_t rdma_stat_read;
+atomic_t rdma_stat_write;
+atomic_t rdma_stat_sq_starve;
+atomic_t rdma_stat_rq_starve;
+atomic_t rdma_stat_rq_poll;
+atomic_t rdma_stat_rq_prod;
+atomic_t rdma_stat_sq_poll;
+atomic_t rdma_stat_sq_prod;
+
+/*
+ * This function implements reading and resetting an atomic_t stat
+ * variable through read/write to a proc file. Any write to the file
+ * resets the associated statistic to zero. Any read returns its
+ * current value.
+ */
+static int read_reset_stat(ctl_table *table, int write,
+			   struct file *filp, void __user *buffer, size_t *lenp,
+			   loff_t *ppos)
+{
+	atomic_t *stat = (atomic_t *)table->data;
+
+	if (!stat)
+		return -EINVAL;
+
+	if (write)
+		atomic_set(stat, 0);
+	else {
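+		/* Report the current value; clamp the copy to the
+		 * caller's buffer and advance the file position. */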
+		char str_buf[32];
+		char *data;
+		int len = snprintf(str_buf, 32, "%d\n", atomic_read(stat));
+		if (len >= 32)
+			return -EFAULT;
+		len = strlen(str_buf);
+		if (*ppos > len) {
+			*lenp = 0;
+			return 0;
+		}
+		data = &str_buf[*ppos];
+		len -= *ppos;
+		if (len > *lenp)
+			len = *lenp;
+		if (len && copy_to_user(buffer, str_buf, len))
+			return -EFAULT;
+		*lenp = len;
+		*ppos += len;
+	}
+	return 0;
+}
+
+static struct ctl_table_header *svcrdma_table_header;
+static ctl_table svcrdma_parm_table[] = {
+	{
+		.procname	= "max_requests",
+		.data		= &svcrdma_max_requests,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_max_requests,
+		.extra2		= &max_max_requests
+	},
+	{
+		.procname	= "max_req_size",
+		.data		= &svcrdma_max_req_size,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_max_inline,
+		.extra2		= &max_max_inline
+	},
+	{
+		.procname	= "max_outbound_read_requests",
+		.data		= &svcrdma_ord,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec_minmax,
+		.strategy	= &sysctl_intvec,
+		.extra1		= &min_ord,
+		.extra2		= &max_ord,
+	},
+
+	{
+		.procname	= "rdma_stat_read",
+		.data		= &rdma_stat_read,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.procname	= "rdma_stat_recv",
+		.data		= &rdma_stat_recv,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.procname	= "rdma_stat_write",
+		.data		= &rdma_stat_write,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.procname	= "rdma_stat_sq_starve",
+		.data		= &rdma_stat_sq_starve,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.procname	= "rdma_stat_rq_starve",
+		.data		= &rdma_stat_rq_starve,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.procname	= "rdma_stat_rq_poll",
+		.data		= &rdma_stat_rq_poll,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.procname	= "rdma_stat_rq_prod",
+		.data		= &rdma_stat_rq_prod,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.procname	= "rdma_stat_sq_poll",
+		.data		= &rdma_stat_sq_poll,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.procname	= "rdma_stat_sq_prod",
+		.data		= &rdma_stat_sq_prod,
+		.maxlen		= sizeof(atomic_t),
+		.mode		= 0644,
+		.proc_handler	= &read_reset_stat,
+	},
+	{
+		.ctl_name = 0,
+	},
+};
+
+static ctl_table svcrdma_table[] = {
+	{
+		.procname	= "svc_rdma",
+		.mode		= 0555,
+		.child		= svcrdma_parm_table
+	},
+	{
+		.ctl_name = 0,
+	},
+};
+
+static ctl_table svcrdma_root_table[] = {
+	{
+		.ctl_name	= CTL_SUNRPC,
+		.procname	= "sunrpc",
+		.mode		= 0555,
+		.child		= svcrdma_table
+	},
+	{
+		.ctl_name = 0,
+	},
+};
+
+void svc_rdma_cleanup(void)
+{
+	dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
+	if (svcrdma_table_header) {
+		unregister_sysctl_table(svcrdma_table_header);
+		svcrdma_table_header = NULL;
+	}
+	svc_unreg_xprt_class(&svc_rdma_class);
+}
+
+int svc_rdma_init(void)
+{
+	dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
+	dprintk("\tsvcrdma_ord      : %d\n", svcrdma_ord);
+	dprintk("\tmax_requests     : %d\n", svcrdma_max_requests);
+	dprintk("\tsq_depth         : %d\n",
+		svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
+	dprintk("\tmax_inline       : %d\n", svcrdma_max_req_size);
+	if (!svcrdma_table_header)
+		svcrdma_table_header =
+			register_sysctl_table(svcrdma_root_table);
+
+	/* Register RDMA with the SVC transport switch */
+	svc_reg_xprt_class(&svc_rdma_class);
+	return 0;
+}
+MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
+MODULE_DESCRIPTION("SVC RDMA Transport");
+MODULE_LICENSE("Dual BSD/GPL");
+module_init(svc_rdma_init);
+module_exit(svc_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
new file mode 100644
index 000000000000..9530ef2d40dc
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -0,0 +1,412 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/debug.h>
+#include <asm/unaligned.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/*
+ * Decodes a read chunk list. The expected format is as follows:
+ *    discrim  : xdr_one
+ *    position : u32 offset into XDR stream
+ *    handle   : u32 RKEY
+ *    . . .
+ *  end-of-list: xdr_zero
+ */
+static u32 *decode_read_list(u32 *va, u32 *vaend)
+{
+	struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk *)va;
+
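+	/* Bounds-check each entry against the end of the header before
+	 * byte-swapping its fields in place. */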
+	while (ch->rc_discrim != xdr_zero) {
+		u64 ch_offset;
+
+		if (((unsigned long)ch + sizeof(struct rpcrdma_read_chunk)) >
+		    (unsigned long)vaend) {
+			dprintk("svcrdma: vaend=%p, ch=%p\n", vaend, ch);
+			return NULL;
+		}
+
+		ch->rc_discrim = ntohl(ch->rc_discrim);
+		ch->rc_position = ntohl(ch->rc_position);
+		ch->rc_target.rs_handle = ntohl(ch->rc_target.rs_handle);
+		ch->rc_target.rs_length = ntohl(ch->rc_target.rs_length);
+		va = (u32 *)&ch->rc_target.rs_offset;
+		xdr_decode_hyper(va, &ch_offset);
+		put_unaligned(ch_offset, (u64 *)va);
+		ch++;
+	}
+	return (u32 *)&ch->rc_position;
+}
+
+/*
+ * Determine number of chunks and total bytes in chunk list. The chunk
+ * list has already been verified to fit within the RPCRDMA header.
+ */
+void svc_rdma_rcl_chunk_counts(struct rpcrdma_read_chunk *ch,
+			       int *ch_count, int *byte_count)
+{
+	/* compute the number of bytes represented by read chunks */
+	*byte_count = 0;
+	*ch_count = 0;
+	for (; ch->rc_discrim != 0; ch++) {
+		*byte_count = *byte_count + ch->rc_target.rs_length;
+		*ch_count = *ch_count + 1;
+	}
+}
+
+/*
+ * Decodes a write chunk list. The expected format is as follows:
+ *    discrim  : xdr_one
+ *    nchunks  : <count>
+ *       handle   : u32 RKEY              ---+
+ *       length   : u32 <len of segment>     |
+ *       offset   : remote va                + <count>
+ *       . . .                               |
+ *                                        ---+
+ */
+static u32 *decode_write_list(u32 *va, u32 *vaend)
+{
+	int ch_no;
+	struct rpcrdma_write_array *ary =
+		(struct rpcrdma_write_array *)va;
+
+	/* Check for not write-array */
+	if (ary->wc_discrim == xdr_zero)
+		return (u32 *)&ary->wc_nchunks;
+
+	if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+	    (unsigned long)vaend) {
+		dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+		return NULL;
+	}
+	ary->wc_discrim = ntohl(ary->wc_discrim);
+	ary->wc_nchunks = ntohl(ary->wc_nchunks);
+	if (((unsigned long)&ary->wc_array[0] +
+	     (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+	    (unsigned long)vaend) {
+		dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+			ary, ary->wc_nchunks, vaend);
+		return NULL;
+	}
+	for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+		u64 ch_offset;
+
+		ary->wc_array[ch_no].wc_target.rs_handle =
+			ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+		ary->wc_array[ch_no].wc_target.rs_length =
+			ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+		va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset;
+		xdr_decode_hyper(va, &ch_offset);
+		put_unaligned(ch_offset, (u64 *)va);
+	}
+
+	/*
+	 * rs_length is the 2nd 4B field in wc_target and taking its
+	 * address skips the list terminator
+	 */
+	return (u32 *)&ary->wc_array[ch_no].wc_target.rs_length;
+}
+
+static u32 *decode_reply_array(u32 *va, u32 *vaend)
+{
+	int ch_no;
+	struct rpcrdma_write_array *ary =
+		(struct rpcrdma_write_array *)va;
+
+	/* Check for no reply-array */
+	if (ary->wc_discrim == xdr_zero)
+		return (u32 *)&ary->wc_nchunks;
+
+	if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+	    (unsigned long)vaend) {
+		dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+		return NULL;
+	}
+	ary->wc_discrim = ntohl(ary->wc_discrim);
+	ary->wc_nchunks = ntohl(ary->wc_nchunks);
+	if (((unsigned long)&ary->wc_array[0] +
+	     (sizeof(struct rpcrdma_write_chunk) * ary->wc_nchunks)) >
+	    (unsigned long)vaend) {
+		dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+			ary, ary->wc_nchunks, vaend);
+		return NULL;
+	}
+	for (ch_no = 0; ch_no < ary->wc_nchunks; ch_no++) {
+		u64 ch_offset;
+
+		ary->wc_array[ch_no].wc_target.rs_handle =
+			ntohl(ary->wc_array[ch_no].wc_target.rs_handle);
+		ary->wc_array[ch_no].wc_target.rs_length =
+			ntohl(ary->wc_array[ch_no].wc_target.rs_length);
+		va = (u32 *)&ary->wc_array[ch_no].wc_target.rs_offset;
+		xdr_decode_hyper(va, &ch_offset);
+		put_unaligned(ch_offset, (u64 *)va);
+	}
+
+	return (u32 *)&ary->wc_array[ch_no];
+}
+
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
+			    struct svc_rqst *rqstp)
+{
+	struct rpcrdma_msg *rmsgp = NULL;
+	u32 *va;
+	u32 *vaend;
+	u32 hdr_len;
+
+	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+	/* Verify that there are enough bytes for the header plus something */
+	if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+		dprintk("svcrdma: header too short = %d\n",
+			rqstp->rq_arg.len);
+		return -EINVAL;
+	}
+
+	/* Decode the header */
+	rmsgp->rm_xid = ntohl(rmsgp->rm_xid);
+	rmsgp->rm_vers = ntohl(rmsgp->rm_vers);
+	rmsgp->rm_credit = ntohl(rmsgp->rm_credit);
+	rmsgp->rm_type = ntohl(rmsgp->rm_type);
+
+	if (rmsgp->rm_vers != RPCRDMA_VERSION)
+		return -ENOSYS;
+
+	/* Pull in the extra for the padded case and bump our pointer */
+	if (rmsgp->rm_type == RDMA_MSGP) {
+		int hdrlen;
+		rmsgp->rm_body.rm_padded.rm_align =
+			ntohl(rmsgp->rm_body.rm_padded.rm_align);
+		rmsgp->rm_body.rm_padded.rm_thresh =
+			ntohl(rmsgp->rm_body.rm_padded.rm_thresh);
+
+		va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+		rqstp->rq_arg.head[0].iov_base = va;
+		hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+		rqstp->rq_arg.head[0].iov_len -= hdrlen;
+		if (hdrlen > rqstp->rq_arg.len)
+			return -EINVAL;
+		return hdrlen;
+	}
+
+	/* The chunk list may contain either a read chunk list or a write
+	 * chunk list and a reply chunk list.
+	 */
+	va = &rmsgp->rm_body.rm_chunks[0];
+	vaend = (u32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
+	va = decode_read_list(va, vaend);
+	if (!va)
+		return -EINVAL;
+	va = decode_write_list(va, vaend);
+	if (!va)
+		return -EINVAL;
+	va = decode_reply_array(va, vaend);
+	if (!va)
+		return -EINVAL;
+
+	rqstp->rq_arg.head[0].iov_base = va;
+	hdr_len = (unsigned long)va - (unsigned long)rmsgp;
+	rqstp->rq_arg.head[0].iov_len -= hdr_len;
+
+	*rdma_req = rmsgp;
+	return hdr_len;
+}
+
+int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *rqstp)
+{
+	struct rpcrdma_msg *rmsgp = NULL;
+	struct rpcrdma_read_chunk *ch;
+	struct rpcrdma_write_array *ary;
+	u32 *va;
+	u32 hdrlen;
+
+	dprintk("svcrdma: processing deferred RDMA header on rqstp=%p\n",
+		rqstp);
+	rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+	/* Pull in the extra for the padded case and bump our pointer */
+	if (rmsgp->rm_type == RDMA_MSGP) {
+		va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+		rqstp->rq_arg.head[0].iov_base = va;
+		hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+		rqstp->rq_arg.head[0].iov_len -= hdrlen;
+		return hdrlen;
+	}
+
+	/*
+	 * Skip all chunks to find the RPC msg; these were previously processed.
+	 */
+	va = &rmsgp->rm_body.rm_chunks[0];
+
+	/* Skip read-list */
+	for (ch = (struct rpcrdma_read_chunk *)va;
+	     ch->rc_discrim != xdr_zero; ch++);
+	va = (u32 *)&ch->rc_position;
+
+	/* Skip write-list */
+	ary = (struct rpcrdma_write_array *)va;
+	if (ary->wc_discrim == xdr_zero)
+		va = (u32 *)&ary->wc_nchunks;
+	else
+		/*
+		 * rs_length is the 2nd 4B field in wc_target and taking its
+		 * address skips the list terminator
+		 */
+		va = (u32 *)&ary->wc_array[ary->wc_nchunks].wc_target.rs_length;
+
+	/* Skip reply-array */
+	ary = (struct rpcrdma_write_array *)va;
+	if (ary->wc_discrim == xdr_zero)
+		va = (u32 *)&ary->wc_nchunks;
+	else
+		va = (u32 *)&ary->wc_array[ary->wc_nchunks];
+
+	rqstp->rq_arg.head[0].iov_base = va;
+	hdrlen = (unsigned long)va - (unsigned long)rmsgp;
+	rqstp->rq_arg.head[0].iov_len -= hdrlen;
+
+	return hdrlen;
+}
+
+int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
+			      struct rpcrdma_msg *rmsgp,
+			      enum rpcrdma_errcode err, u32 *va)
+{
+	u32 *startp = va;
+
+	*va++ = htonl(rmsgp->rm_xid);
+	*va++ = htonl(rmsgp->rm_vers);
+	*va++ = htonl(xprt->sc_max_requests);
+	*va++ = htonl(RDMA_ERROR);
+	*va++ = htonl(err);
+	if (err == ERR_VERS) {
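+		/* Only one protocol version is supported, so the low and
+		 * high bounds of the version range are both RPCRDMA_VERSION. */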
+		*va++ = htonl(RPCRDMA_VERSION);
+		*va++ = htonl(RPCRDMA_VERSION);
+	}
+
+	return (int)((unsigned long)va - (unsigned long)startp);
+}
+
+int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
+{
+	struct rpcrdma_write_array *wr_ary;
+
+	/* There is no read-list in a reply */
+
+	/* skip write list */
+	wr_ary = (struct rpcrdma_write_array *)
+		&rmsgp->rm_body.rm_chunks[1];
+	if (wr_ary->wc_discrim)
+		wr_ary = (struct rpcrdma_write_array *)
+			&wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)].
+			wc_target.rs_length;
+	else
+		wr_ary = (struct rpcrdma_write_array *)
+			&wr_ary->wc_nchunks;
+
+	/* skip reply array */
+	if (wr_ary->wc_discrim)
+		wr_ary = (struct rpcrdma_write_array *)
+			&wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)];
+	else
+		wr_ary = (struct rpcrdma_write_array *)
+			&wr_ary->wc_nchunks;
+
+	return (unsigned long) wr_ary - (unsigned long) rmsgp;
+}
+
+void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
+{
+	struct rpcrdma_write_array *ary;
+
+	/* no read-list */
+	rmsgp->rm_body.rm_chunks[0] = xdr_zero;
+
+	/* write-array discrim */
+	ary = (struct rpcrdma_write_array *)
+		&rmsgp->rm_body.rm_chunks[1];
+	ary->wc_discrim = xdr_one;
+	ary->wc_nchunks = htonl(chunks);
+
+	/* write-list terminator */
+	ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
+
+	/* reply-array discriminator */
+	ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
+}
+
+void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
+				 int chunks)
+{
+	ary->wc_discrim = xdr_one;
+	ary->wc_nchunks = htonl(chunks);
+}
+
+void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
+				     int chunk_no,
+				     u32 rs_handle, u64 rs_offset,
+				     u32 write_len)
+{
+	struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
+	seg->rs_handle = htonl(rs_handle);
+	seg->rs_length = htonl(write_len);
+	xdr_encode_hyper((u32 *) &seg->rs_offset, rs_offset);
+}
+
+void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
+				  struct rpcrdma_msg *rdma_argp,
+				  struct rpcrdma_msg *rdma_resp,
+				  enum rpcrdma_proc rdma_type)
+{
+	rdma_resp->rm_xid = htonl(rdma_argp->rm_xid);
+	rdma_resp->rm_vers = htonl(rdma_argp->rm_vers);
+	rdma_resp->rm_credit = htonl(xprt->sc_max_requests);
+	rdma_resp->rm_type = htonl(rdma_type);
+
+	/* Encode <nul> chunk lists */
+	rdma_resp->rm_body.rm_chunks[0] = xdr_zero;
+	rdma_resp->rm_body.rm_chunks[1] = xdr_zero;
+	rdma_resp->rm_body.rm_chunks[2] = xdr_zero;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
new file mode 100644
index 000000000000..ab54a736486e
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -0,0 +1,586 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/*
+ * Replace the pages in the rq_argpages array with the pages from the SGE in
+ * the RDMA_RECV completion. The SGL should contain full pages up until the
+ * last one.
+ */
+static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
+			       struct svc_rdma_op_ctxt *ctxt,
+			       u32 byte_count)
+{
+	struct page *page;
+	u32 bc;
+	int sge_no;
+
+	/* Swap the page in the SGE with the page in argpages */
+	page = ctxt->pages[0];
+	put_page(rqstp->rq_pages[0]);
+	rqstp->rq_pages[0] = page;
+
+	/* Set up the XDR head */
+	rqstp->rq_arg.head[0].iov_base = page_address(page);
+	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
+	rqstp->rq_arg.len = byte_count;
+	rqstp->rq_arg.buflen = byte_count;
+
+	/* Compute bytes past head in the SGL */
+	bc = byte_count - rqstp->rq_arg.head[0].iov_len;
+
+	/* If data remains, store it in the pagelist */
+	rqstp->rq_arg.page_len = bc;
+	rqstp->rq_arg.page_base = 0;
+	rqstp->rq_arg.pages = &rqstp->rq_pages[1];
+	sge_no = 1;
+	while (bc && sge_no < ctxt->count) {
+		page = ctxt->pages[sge_no];
+		put_page(rqstp->rq_pages[sge_no]);
+		rqstp->rq_pages[sge_no] = page;
+		bc -= min(bc, ctxt->sge[sge_no].length);
+		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
+		sge_no++;
+	}
+	rqstp->rq_respages = &rqstp->rq_pages[sge_no];
+
+	/* We should never run out of SGE because the limit is defined to
+	 * support the max allowed RPC data length
+	 */
+	BUG_ON(bc && (sge_no == ctxt->count));
+	BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
+	       != byte_count);
+	BUG_ON(rqstp->rq_arg.len != byte_count);
+
+	/* If not all pages were used from the SGL, free the remaining ones */
+	bc = sge_no;
+	while (sge_no < ctxt->count) {
+		page = ctxt->pages[sge_no++];
+		put_page(page);
+	}
+	ctxt->count = bc;
+
+	/* Set up tail */
+	rqstp->rq_arg.tail[0].iov_base = NULL;
+	rqstp->rq_arg.tail[0].iov_len = 0;
+}
+
+struct chunk_sge {
+	int start;		/* sge no for this chunk */
+	int count;		/* sge count for this chunk */
+};
+
+/* Encode a read-chunk-list as an array of IB SGE
+ *
+ * Assumptions:
+ * - chunk[0]->position points to pages[0] at an offset of 0
+ * - pages[] is not physically or virtually contiguous and consists of
+ *   PAGE_SIZE elements.
+ *
+ * Output:
+ * - sge array pointing into pages[] array.
+ * - chunk_sge array specifying sge index and count for each
+ *   chunk in the read list
+ *
+ */
+static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
+			   struct svc_rqst *rqstp,
+			   struct svc_rdma_op_ctxt *head,
+			   struct rpcrdma_msg *rmsgp,
+			   struct ib_sge *sge,
+			   struct chunk_sge *ch_sge_ary,
+			   int ch_count,
+			   int byte_count)
+{
+	int sge_no;
+	int sge_bytes;
+	int page_off;
+	int page_no;
+	int ch_bytes;
+	int ch_no;
+	struct rpcrdma_read_chunk *ch;
+
+	sge_no = 0;
+	page_no = 0;
+	page_off = 0;
+	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+	ch_no = 0;
+	ch_bytes = ch->rc_target.rs_length;
+	head->arg.head[0] = rqstp->rq_arg.head[0];
+	head->arg.tail[0] = rqstp->rq_arg.tail[0];
+	head->arg.pages = &head->pages[head->count];
+	head->sge[0].length = head->count; /* save count of hdr pages */
+	head->arg.page_base = 0;
+	head->arg.page_len = ch_bytes;
+	head->arg.len = rqstp->rq_arg.len + ch_bytes;
+	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
+	head->count++;
+	ch_sge_ary[0].start = 0;
+	while (byte_count) {
+		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
+		sge[sge_no].addr =
+			ib_dma_map_page(xprt->sc_cm_id->device,
+					rqstp->rq_arg.pages[page_no],
+					page_off, sge_bytes,
+					DMA_FROM_DEVICE);
+		sge[sge_no].length = sge_bytes;
+		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+		/*
+		 * Don't bump head->count here because the same page
+		 * may be used by multiple SGE.
+		 */
+		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
+		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
+
+		byte_count -= sge_bytes;
+		ch_bytes -= sge_bytes;
+		sge_no++;
+		/*
+		 * If all bytes for this chunk have been mapped to an
+		 * SGE, move to the next SGE
+		 */
+		if (ch_bytes == 0) {
+			ch_sge_ary[ch_no].count =
+				sge_no - ch_sge_ary[ch_no].start;
+			ch_no++;
+			ch++;
+			ch_sge_ary[ch_no].start = sge_no;
+			ch_bytes = ch->rc_target.rs_length;
+			/* If bytes remain, account for the next chunk */
+			if (byte_count) {
+				head->arg.page_len += ch_bytes;
+				head->arg.len += ch_bytes;
+				head->arg.buflen += ch_bytes;
+			}
+		}
+		/*
+		 * If this SGE consumed all of the page, move to the
+		 * next page
+		 */
+		if ((sge_bytes + page_off) == PAGE_SIZE) {
+			page_no++;
+			page_off = 0;
+			/*
+			 * If there are still bytes left to map, bump
+			 * the page count
+			 */
+			if (byte_count)
+				head->count++;
+		} else
+			page_off += sge_bytes;
+	}
+	BUG_ON(byte_count != 0);
+	return sge_no;
+}
+
+static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
+			      struct ib_sge *sge,
+			      u64 *sgl_offset,
+			      int count)
+{
+	int i;
+
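+	/* Copy the mapped SGE into the context and advance the running
+	 * SGL offset by the bytes they cover. */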
+	ctxt->count = count;
+	for (i = 0; i < count; i++) {
+		ctxt->sge[i].addr = sge[i].addr;
+		ctxt->sge[i].length = sge[i].length;
+		*sgl_offset = *sgl_offset + sge[i].length;
+	}
+}
+
+static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
+{
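+	/* iWARP devices allow only a single SGE per RDMA_READ work
+	 * request, so clamp to one there; otherwise honor sc_max_sge. */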
+#ifdef RDMA_TRANSPORT_IWARP
+	if ((RDMA_TRANSPORT_IWARP ==
+	     rdma_node_get_transport(xprt->sc_cm_id->
+				     device->node_type))
+	    && sge_count > 1)
+		return 1;
+	else
+#endif
+		return min_t(int, sge_count, xprt->sc_max_sge);
+}
+
+/*
+ * Use RDMA_READ to read data from the advertised client buffer into the
+ * XDR stream starting at rq_arg.head[0].iov_base. Each chunk in the
+ * array contains the following fields:
+ * discrim      - '1'; this is not used for data placement
+ * position     - The xdr stream offset (the same for every chunk)
+ * handle       - RMR for client memory region
+ * length       - data transfer length
+ * offset       - 64 bit tagged offset in remote memory region
+ *
+ * On our side, we need to read into a pagelist. The first page immediately
+ * follows the RPC header.
+ *
+ * This function returns 1 to indicate success. The data is not yet in
+ * the pagelist and therefore the RPC request must be deferred. The
+ * I/O completion will enqueue the transport again and
+ * svc_rdma_recvfrom will complete the request.
+ *
+ * NOTE: The ctxt must not be touched after the last WR has been posted
+ * because the I/O completion processing may occur on another
+ * processor and free or modify the context. Do not touch it!
+ */
+static int rdma_read_xdr(struct svcxprt_rdma *xprt,
+			 struct rpcrdma_msg *rmsgp,
+			 struct svc_rqst *rqstp,
+			 struct svc_rdma_op_ctxt *hdr_ctxt)
+{
+	struct ib_send_wr read_wr;
+	int err = 0;
+	int ch_no;
+	struct ib_sge *sge;
+	int ch_count;
+	int byte_count;
+	int sge_count;
+	u64 sgl_offset;
+	struct rpcrdma_read_chunk *ch;
+	struct svc_rdma_op_ctxt *ctxt = NULL;
+	struct svc_rdma_op_ctxt *head;
+	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+	struct svc_rdma_op_ctxt *tmp_ch_ctxt;
+	struct chunk_sge *ch_sge_ary;
+
+	/* If no read list is present, return 0 */
+	ch = svc_rdma_get_read_chunk(rmsgp);
+	if (!ch)
+		return 0;
+
+	/* Allocate temporary contexts to keep SGE */
+	BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
+	tmp_sge_ctxt = svc_rdma_get_context(xprt);
+	sge = tmp_sge_ctxt->sge;
+	tmp_ch_ctxt = svc_rdma_get_context(xprt);
+	ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
+
+	svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
+	sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
+				    sge, ch_sge_ary,
+				    ch_count, byte_count);
+	head = svc_rdma_get_context(xprt);
+	sgl_offset = 0;
+	ch_no = 0;
+
+	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+	     ch->rc_discrim != 0; ch++, ch_no++) {
+next_sge:
+		if (!ctxt)
+			ctxt = head;
+		else {
+			ctxt->next = svc_rdma_get_context(xprt);
+			ctxt = ctxt->next;
+		}
+		ctxt->next = NULL;
+		ctxt->direction = DMA_FROM_DEVICE;
+		clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+		if ((ch+1)->rc_discrim == 0) {
+			/*
+			 * Checked in sq_cq_reap to see if we need to
+			 * be enqueued
+			 */
+			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+			ctxt->next = hdr_ctxt;
+			hdr_ctxt->next = head;
+		}
+
+		/* Prepare READ WR */
+		memset(&read_wr, 0, sizeof read_wr);
+		ctxt->wr_op = IB_WR_RDMA_READ;
+		read_wr.wr_id = (unsigned long)ctxt;
+		read_wr.opcode = IB_WR_RDMA_READ;
+		read_wr.send_flags = IB_SEND_SIGNALED;
+		read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
+		read_wr.wr.rdma.remote_addr =
+			get_unaligned(&(ch->rc_target.rs_offset)) +
+			sgl_offset;
+		read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
+		read_wr.num_sge =
+			rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
+		rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
+				  &sgl_offset,
+				  read_wr.num_sge);
+
+		/* Post the read */
+		err = svc_rdma_send(xprt, &read_wr);
+		if (err) {
+			printk(KERN_ERR "svcrdma: Error posting send = %d\n",
+			       err);
+			/*
+			 * Break the circular list so free knows when
+			 * to stop if the error happened to occur on
+			 * the last read
+			 */
+			ctxt->next = NULL;
+			goto out;
+		}
+		atomic_inc(&rdma_stat_read);
+
+		if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
+			ch_sge_ary[ch_no].count -= read_wr.num_sge;
+			ch_sge_ary[ch_no].start += read_wr.num_sge;
+			goto next_sge;
+		}
+		sgl_offset = 0;
+		err = 0;
+	}
+
+ out:
+	svc_rdma_put_context(tmp_sge_ctxt, 0);
+	svc_rdma_put_context(tmp_ch_ctxt, 0);
+
+	/* Detach arg pages. svc_recv will replenish them */
+	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
+		rqstp->rq_pages[ch_no] = NULL;
+
+	/*
+	 * Detach res pages. svc_release must see an rq_resused count of
+	 * zero or it will attempt to put them.
+	 */
+	while (rqstp->rq_resused)
+		rqstp->rq_respages[--rqstp->rq_resused] = NULL;
+
+	if (err) {
+		printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
+		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+		/* Free the linked list of read contexts */
+		while (head != NULL) {
+			ctxt = head->next;
+			svc_rdma_put_context(head, 1);
+			head = ctxt;
+		}
+		return 0;
+	}
+
+	return 1;
+}
+
+static int rdma_read_complete(struct svc_rqst *rqstp,
+			      struct svc_rdma_op_ctxt *data)
+{
+	struct svc_rdma_op_ctxt *head = data->next;
+	int page_no;
+	int ret;
+
+	BUG_ON(!head);
+
+	/* Copy RPC pages */
+	for (page_no = 0; page_no < head->count; page_no++) {
+		put_page(rqstp->rq_pages[page_no]);
+		rqstp->rq_pages[page_no] = head->pages[page_no];
+	}
+	/* Point rq_arg.pages past header */
+	rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
+	rqstp->rq_arg.page_len = head->arg.page_len;
+	rqstp->rq_arg.page_base = head->arg.page_base;
+
+	/* rq_respages starts after the last arg page */
+	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
+	rqstp->rq_resused = 0;
+
+	/* Rebuild rq_arg head and tail. */
+	rqstp->rq_arg.head[0] = head->arg.head[0];
+	rqstp->rq_arg.tail[0] = head->arg.tail[0];
+	rqstp->rq_arg.len = head->arg.len;
+	rqstp->rq_arg.buflen = head->arg.buflen;
+
+	/* XXX: What should this be? */
+	rqstp->rq_prot = IPPROTO_MAX;
+
+	/*
+	 * Free the contexts we used to build the RDMA_READ. We have
+	 * to be careful here because the context list uses the same
+	 * next pointer used to chain the contexts associated with the
+	 * RDMA_READ
+	 */
+	data->next = NULL;	/* terminate circular list */
+	do {
+		data = head->next;
+		svc_rdma_put_context(head, 0);
+		head = data;
+	} while (head != NULL);
+
+	ret = rqstp->rq_arg.head[0].iov_len
+		+ rqstp->rq_arg.page_len
+		+ rqstp->rq_arg.tail[0].iov_len;
+	dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
+		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+		ret, rqstp->rq_arg.len,	rqstp->rq_arg.head[0].iov_base,
+		rqstp->rq_arg.head[0].iov_len);
+
+	/* Indicate that we've consumed an RQ credit */
+	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+	svc_xprt_received(rqstp->rq_xprt);
+	return ret;
+}
+
+/*
+ * Set up the rqstp thread context to point to the RQ buffer. If
+ * necessary, pull additional data from the client with an RDMA_READ
+ * request.
+ */
+int svc_rdma_recvfrom(struct svc_rqst *rqstp)
+{
+	struct svc_xprt *xprt = rqstp->rq_xprt;
+	struct svcxprt_rdma *rdma_xprt =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	struct svc_rdma_op_ctxt *ctxt = NULL;
+	struct rpcrdma_msg *rmsgp;
+	int ret = 0;
+	int len;
+
+	dprintk("svcrdma: rqstp=%p\n", rqstp);
+
+	/*
+	 * The rq_xprt_ctxt indicates if we've consumed an RQ credit
+	 * or not. It is used in the rdma xpo_release_rqst function to
+	 * determine whether or not to return an RQ WQE to the RQ.
+	 */
+	rqstp->rq_xprt_ctxt = NULL;
+
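+	/* A completed RDMA_READ takes priority: it carries a request
+	 * that was deferred while the read-list data was pulled in. */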
+	spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
+	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
+		ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
+				  struct svc_rdma_op_ctxt,
+				  dto_q);
+		list_del_init(&ctxt->dto_q);
+	}
+	spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
+	if (ctxt)
+		return rdma_read_complete(rqstp, ctxt);
+
+	spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
+	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
+				  struct svc_rdma_op_ctxt,
+				  dto_q);
+		list_del_init(&ctxt->dto_q);
+	} else {
+		atomic_inc(&rdma_stat_rq_starve);
+		clear_bit(XPT_DATA, &xprt->xpt_flags);
+		ctxt = NULL;
+	}
+	spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+	if (!ctxt) {
+		/* This is the EAGAIN path. The svc_recv routine will
+		 * return -EAGAIN, the nfsd thread will go to call into
+		 * svc_recv again and we shouldn't be on the active
+		 * transport list
+		 */
+		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
+			goto close_out;
+
+		BUG_ON(ret);
+		goto out;
+	}
+	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
+		ctxt, rdma_xprt, rqstp, ctxt->wc_status);
+	BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
+	atomic_inc(&rdma_stat_recv);
+
+	/* Build up the XDR from the receive buffers. */
+	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
+
+	/* Decode the RDMA header. */
+	len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+	rqstp->rq_xprt_hlen = len;
+
+	/* If the request is invalid, reply with an error */
+	if (len < 0) {
+		if (len == -ENOSYS)
+			(void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+		goto close_out;
+	}
+
+	/* Read read-list data. If we would need to wait, defer
+	 * it. Note that in this case, we don't return the RQ credit
+	 * until after the read completes.
+	 */
+	if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
+		svc_xprt_received(xprt);
+		return 0;
+	}
+
+	/* Indicate we've consumed an RQ credit */
+	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+
+	ret = rqstp->rq_arg.head[0].iov_len
+		+ rqstp->rq_arg.page_len
+		+ rqstp->rq_arg.tail[0].iov_len;
+	svc_rdma_put_context(ctxt, 0);
+ out:
+	dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
+		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
+		ret, rqstp->rq_arg.len,
+		rqstp->rq_arg.head[0].iov_base,
+		rqstp->rq_arg.head[0].iov_len);
+	rqstp->rq_prot = IPPROTO_MAX;
+	svc_xprt_copy_addrs(rqstp, xprt);
+	svc_xprt_received(xprt);
+	return ret;
+
+ close_out:
+	if (ctxt) {
+		svc_rdma_put_context(ctxt, 1);
+		/* Indicate we've consumed an RQ credit */
+		rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+	}
+	dprintk("svcrdma: transport %p is closing\n", xprt);
+	/*
+	 * Set the close bit and enqueue it. svc_recv will see the
+	 * close bit and call svc_xprt_delete
+	 */
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+	svc_xprt_received(xprt);
+	return 0;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
new file mode 100644
index 000000000000..3e321949e1dc
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -0,0 +1,520 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/* Encode an XDR as an array of IB SGE
+ *
+ * Assumptions:
+ * - head[0] is physically contiguous.
+ * - tail[0] is physically contiguous.
+ * - pages[] is not physically or virtually contiguous and consists of
+ *   PAGE_SIZE elements.
+ *
+ * Output:
+ * SGE[0]              reserved for RPCRDMA header
+ * SGE[1]              data from xdr->head[]
+ * SGE[2..sge_count-2] data from xdr->pages[]
+ * SGE[sge_count-1]    data from xdr->tail.
+ *
+ */
+static struct ib_sge *xdr_to_sge(struct svcxprt_rdma *xprt,
+				 struct xdr_buf *xdr,
+				 struct ib_sge *sge,
+				 int *sge_count)
+{
+	/* Max we need is the length of the XDR / pagesize + one for
+	 * head + one for tail + one for RPCRDMA header
+	 */
+	int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
+	int sge_no;
+	u32 byte_count = xdr->len;
+	u32 sge_bytes;
+	u32 page_bytes;
+	int page_off;
+	int page_no;
+
+	/* Skip the first sge, this is for the RPCRDMA header */
+	sge_no = 1;
+
+	/* Head SGE */
+	sge[sge_no].addr = ib_dma_map_single(xprt->sc_cm_id->device,
+					     xdr->head[0].iov_base,
+					     xdr->head[0].iov_len,
+					     DMA_TO_DEVICE);
+	sge_bytes = min_t(u32, byte_count, xdr->head[0].iov_len);
+	byte_count -= sge_bytes;
+	sge[sge_no].length = sge_bytes;
+	sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+	sge_no++;
+
+	/* pages SGE */
+	page_no = 0;
+	page_bytes = xdr->page_len;
+	page_off = xdr->page_base;
+	while (byte_count && page_bytes) {
+		sge_bytes = min_t(u32, byte_count, (PAGE_SIZE-page_off));
+		sge[sge_no].addr =
+			ib_dma_map_page(xprt->sc_cm_id->device,
+					xdr->pages[page_no], page_off,
+					sge_bytes, DMA_TO_DEVICE);
+		sge_bytes = min(sge_bytes, page_bytes);
+		byte_count -= sge_bytes;
+		page_bytes -= sge_bytes;
+		sge[sge_no].length = sge_bytes;
+		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+
+		sge_no++;
+		page_no++;
+		page_off = 0; /* reset for next time through loop */
+	}
+
+	/* Tail SGE */
+	if (byte_count && xdr->tail[0].iov_len) {
+		sge[sge_no].addr =
+			ib_dma_map_single(xprt->sc_cm_id->device,
+					  xdr->tail[0].iov_base,
+					  xdr->tail[0].iov_len,
+					  DMA_TO_DEVICE);
+		sge_bytes = min_t(u32, byte_count, xdr->tail[0].iov_len);
+		byte_count -= sge_bytes;
+		sge[sge_no].length = sge_bytes;
+		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+		sge_no++;
+	}
+
+	BUG_ON(sge_no > sge_max);
+	BUG_ON(byte_count != 0);
+
+	*sge_count = sge_no;
+	return sge;
+}
+
+
+/* Assumptions:
+ * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
+ */
+static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
+		      u32 rmr, u64 to,
+		      u32 xdr_off, int write_len,
+		      struct ib_sge *xdr_sge, int sge_count)
+{
+	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
+	struct ib_send_wr write_wr;
+	struct ib_sge *sge;
+	int xdr_sge_no;
+	int sge_no;
+	int sge_bytes;
+	int sge_off;
+	int bc;
+	struct svc_rdma_op_ctxt *ctxt;
+	int ret = 0;
+
+	BUG_ON(sge_count >= 32);
+	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
+		"write_len=%d, xdr_sge=%p, sge_count=%d\n",
+		rmr, to, xdr_off, write_len, xdr_sge, sge_count);
+
+	ctxt = svc_rdma_get_context(xprt);
+	ctxt->count = 0;
+	tmp_sge_ctxt = svc_rdma_get_context(xprt);
+	sge = tmp_sge_ctxt->sge;
+
+	/* Find the SGE associated with xdr_off */
+	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < sge_count;
+	     xdr_sge_no++) {
+		if (xdr_sge[xdr_sge_no].length > bc)
+			break;
+		bc -= xdr_sge[xdr_sge_no].length;
+	}
+
+	sge_off = bc;
+	bc = write_len;
+	sge_no = 0;
+
+	/* Copy the remaining SGE */
+	while (bc != 0 && xdr_sge_no < sge_count) {
+		sge[sge_no].addr = xdr_sge[xdr_sge_no].addr + sge_off;
+		sge[sge_no].lkey = xdr_sge[xdr_sge_no].lkey;
+		sge_bytes = min((size_t)bc,
+				(size_t)(xdr_sge[xdr_sge_no].length-sge_off));
+		sge[sge_no].length = sge_bytes;
+
+		sge_off = 0;
+		sge_no++;
+		xdr_sge_no++;
+		bc -= sge_bytes;
+	}
+
+	BUG_ON(bc != 0);
+	BUG_ON(xdr_sge_no > sge_count);
+
+	/* Prepare WRITE WR */
+	memset(&write_wr, 0, sizeof write_wr);
+	ctxt->wr_op = IB_WR_RDMA_WRITE;
+	write_wr.wr_id = (unsigned long)ctxt;
+	write_wr.sg_list = &sge[0];
+	write_wr.num_sge = sge_no;
+	write_wr.opcode = IB_WR_RDMA_WRITE;
+	write_wr.send_flags = IB_SEND_SIGNALED;
+	write_wr.wr.rdma.rkey = rmr;
+	write_wr.wr.rdma.remote_addr = to;
+
+	/* Post It */
+	atomic_inc(&rdma_stat_write);
+	if (svc_rdma_send(xprt, &write_wr)) {
+		svc_rdma_put_context(ctxt, 1);
+		/* Fatal error, close transport */
+		ret = -EIO;
+	}
+	svc_rdma_put_context(tmp_sge_ctxt, 0);
+	return ret;
+}
+
+static int send_write_chunks(struct svcxprt_rdma *xprt,
+			     struct rpcrdma_msg *rdma_argp,
+			     struct rpcrdma_msg *rdma_resp,
+			     struct svc_rqst *rqstp,
+			     struct ib_sge *sge,
+			     int sge_count)
+{
+	u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+	int write_len;
+	int max_write;
+	u32 xdr_off;
+	int chunk_off;
+	int chunk_no;
+	struct rpcrdma_write_array *arg_ary;
+	struct rpcrdma_write_array *res_ary;
+	int ret;
+
+	arg_ary = svc_rdma_get_write_array(rdma_argp);
+	if (!arg_ary)
+		return 0;
+	res_ary = (struct rpcrdma_write_array *)
+		&rdma_resp->rm_body.rm_chunks[1];
+
+	max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+	/* Write chunks start at the pagelist */
+	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
+	     xfer_len && chunk_no < arg_ary->wc_nchunks;
+	     chunk_no++) {
+		struct rpcrdma_segment *arg_ch;
+		u64 rs_offset;
+
+		arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+		write_len = min(xfer_len, arg_ch->rs_length);
+
+		/* Prepare the response chunk given the length actually
+		 * written */
+		rs_offset = get_unaligned(&(arg_ch->rs_offset));
+		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+					    arg_ch->rs_handle,
+					    rs_offset,
+					    write_len);
+		chunk_off = 0;
+		while (write_len) {
+			int this_write;
+			this_write = min(write_len, max_write);
+			ret = send_write(xprt, rqstp,
+					 arg_ch->rs_handle,
+					 rs_offset + chunk_off,
+					 xdr_off,
+					 this_write,
+					 sge,
+					 sge_count);
+			if (ret) {
+				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+					ret);
+				return -EIO;
+			}
+			chunk_off += this_write;
+			xdr_off += this_write;
+			xfer_len -= this_write;
+			write_len -= this_write;
+		}
+	}
+	/* Update the req with the number of chunks actually used */
+	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
+
+	return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+}
+
+static int send_reply_chunks(struct svcxprt_rdma *xprt,
+			     struct rpcrdma_msg *rdma_argp,
+			     struct rpcrdma_msg *rdma_resp,
+			     struct svc_rqst *rqstp,
+			     struct ib_sge *sge,
+			     int sge_count)
+{
+	u32 xfer_len = rqstp->rq_res.len;
+	int write_len;
+	int max_write;
+	u32 xdr_off;
+	int chunk_no;
+	int chunk_off;
+	struct rpcrdma_segment *ch;
+	struct rpcrdma_write_array *arg_ary;
+	struct rpcrdma_write_array *res_ary;
+	int ret;
+
+	arg_ary = svc_rdma_get_reply_array(rdma_argp);
+	if (!arg_ary)
+		return 0;
+	/* XXX: need to fix when reply lists occur with read-list and/or
+	 * write-list */
+	res_ary = (struct rpcrdma_write_array *)
+		&rdma_resp->rm_body.rm_chunks[2];
+
+	max_write = xprt->sc_max_sge * PAGE_SIZE;
+
+	/* xdr offset starts at RPC message */
+	for (xdr_off = 0, chunk_no = 0;
+	     xfer_len && chunk_no < arg_ary->wc_nchunks;
+	     chunk_no++) {
+		u64 rs_offset;
+		ch = &arg_ary->wc_array[chunk_no].wc_target;
+		write_len = min(xfer_len, ch->rs_length);
+
+
+		/* Prepare the reply chunk given the length actually
+		 * written */
+		rs_offset = get_unaligned(&(ch->rs_offset));
+		svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+					    ch->rs_handle, rs_offset,
+					    write_len);
+		chunk_off = 0;
+		while (write_len) {
+			int this_write;
+
+			this_write = min(write_len, max_write);
+			ret = send_write(xprt, rqstp,
+					 ch->rs_handle,
+					 rs_offset + chunk_off,
+					 xdr_off,
+					 this_write,
+					 sge,
+					 sge_count);
+			if (ret) {
+				dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+					ret);
+				return -EIO;
+			}
+			chunk_off += this_write;
+			xdr_off += this_write;
+			xfer_len -= this_write;
+			write_len -= this_write;
+		}
+	}
+	/* Update the req with the number of chunks actually used */
+	svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+
+	return rqstp->rq_res.len;
+}
+
+/* This function prepares the portion of the RPCRDMA message to be
+ * sent in the RDMA_SEND. This function is called after data sent via
+ * RDMA has already been transmitted. There are three cases:
+ * - The RPCRDMA header, RPC header, and payload are all sent in a
+ *   single RDMA_SEND. This is the "inline" case.
+ * - The RPCRDMA header and some portion of the RPC header and data
+ *   are sent via this RDMA_SEND and another portion of the data is
+ *   sent via RDMA.
+ * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
+ *   header and data are all transmitted via RDMA.
+ * In all three cases, this function prepares the RPCRDMA header in
+ * sge[0], the 'type' parameter indicates the type to place in the
+ * RPCRDMA header, and the 'byte_count' field indicates how much of
+ * the XDR to include in this RDMA_SEND.
+ */
+static int send_reply(struct svcxprt_rdma *rdma,
+		      struct svc_rqst *rqstp,
+		      struct page *page,
+		      struct rpcrdma_msg *rdma_resp,
+		      struct svc_rdma_op_ctxt *ctxt,
+		      int sge_count,
+		      int byte_count)
+{
+	struct ib_send_wr send_wr;
+	int sge_no;
+	int sge_bytes;
+	int page_no;
+	int ret;
+
+	/* Prepare the context */
+	ctxt->pages[0] = page;
+	ctxt->count = 1;
+
+	/* Prepare the SGE for the RPCRDMA Header */
+	ctxt->sge[0].addr =
+		ib_dma_map_page(rdma->sc_cm_id->device,
+				page, 0, PAGE_SIZE, DMA_TO_DEVICE);
+	ctxt->direction = DMA_TO_DEVICE;
+	ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
+	ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
+
+	/* Determine how many of our SGE are to be transmitted */
+	for (sge_no = 1; byte_count && sge_no < sge_count; sge_no++) {
+		sge_bytes = min((size_t)ctxt->sge[sge_no].length,
+				(size_t)byte_count);
+		byte_count -= sge_bytes;
+	}
+	BUG_ON(byte_count != 0);
+
+	/* Save all respages in the ctxt and remove them from the
+	 * respages array. They are our pages until the I/O
+	 * completes.
+	 */
+	for (page_no = 0; page_no < rqstp->rq_resused; page_no++) {
+		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
+		ctxt->count++;
+		rqstp->rq_respages[page_no] = NULL;
+	}
+
+	BUG_ON(sge_no > rdma->sc_max_sge);
+	memset(&send_wr, 0, sizeof send_wr);
+	ctxt->wr_op = IB_WR_SEND;
+	send_wr.wr_id = (unsigned long)ctxt;
+	send_wr.sg_list = ctxt->sge;
+	send_wr.num_sge = sge_no;
+	send_wr.opcode = IB_WR_SEND;
+	send_wr.send_flags =  IB_SEND_SIGNALED;
+
+	ret = svc_rdma_send(rdma, &send_wr);
+	if (ret)
+		svc_rdma_put_context(ctxt, 1);
+
+	return ret;
+}
+
+void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
+{
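+	/* Nothing to do: the RPCRDMA reply header is built later in
+	 * svc_rdma_sendto(). */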
+}
+
+/*
+ * Return the start of an xdr buffer.
+ */
+static void *xdr_start(struct xdr_buf *xdr)
+{
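+	/* head[0].iov_base was advanced past the RPCRDMA header on
+	 * receive; back up by the bytes consumed to recover the start. */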
+	return xdr->head[0].iov_base -
+		(xdr->len -
+		 xdr->page_len -
+		 xdr->tail[0].iov_len -
+		 xdr->head[0].iov_len);
+}
+
+int svc_rdma_sendto(struct svc_rqst *rqstp)
+{
+	struct svc_xprt *xprt = rqstp->rq_xprt;
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	struct rpcrdma_msg *rdma_argp;
+	struct rpcrdma_msg *rdma_resp;
+	struct rpcrdma_write_array *reply_ary;
+	enum rpcrdma_proc reply_type;
+	int ret;
+	int inline_bytes;
+	struct ib_sge *sge;
+	int sge_count = 0;
+	struct page *res_page;
+	struct svc_rdma_op_ctxt *ctxt;
+
+	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+
+	/* Get the RDMA request header. */
+	rdma_argp = xdr_start(&rqstp->rq_arg);
+
+	/* Build an SGE for the XDR */
+	ctxt = svc_rdma_get_context(rdma);
+	ctxt->direction = DMA_TO_DEVICE;
+	sge = xdr_to_sge(rdma, &rqstp->rq_res, ctxt->sge, &sge_count);
+
+	inline_bytes = rqstp->rq_res.len;
+
+	/* Create the RDMA response header */
+	res_page = svc_rdma_get_page();
+	rdma_resp = page_address(res_page);
+	reply_ary = svc_rdma_get_reply_array(rdma_argp);
+	if (reply_ary)
+		reply_type = RDMA_NOMSG;
+	else
+		reply_type = RDMA_MSG;
+	svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
+					 rdma_resp, reply_type);
+
+	/* Send any write-chunk data and build resp write-list */
+	ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
+				rqstp, sge, sge_count);
+	if (ret < 0) {
+		printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
+		       ret);
+		goto error;
+	}
+	inline_bytes -= ret;
+
+	/* Send any reply-list data and update resp reply-list */
+	ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
+				rqstp, sge, sge_count);
+	if (ret < 0) {
+		printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
+		       ret);
+		goto error;
+	}
+	inline_bytes -= ret;
+
+	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, sge_count,
+			 inline_bytes);
+	dprintk("svcrdma: send_reply returns %d\n", ret);
+	return ret;
+ error:
+	svc_rdma_put_context(ctxt, 0);
+	put_page(res_page);
+	return ret;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
new file mode 100644
index 000000000000..f09444c451bc
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -0,0 +1,1080 @@
+/*
+ * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ *      Redistributions of source code must retain the above copyright
+ *      notice, this list of conditions and the following disclaimer.
+ *
+ *      Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials provided
+ *      with the distribution.
+ *
+ *      Neither the name of the Network Appliance, Inc. nor the names of
+ *      its contributors may be used to endorse or promote products
+ *      derived from this software without specific prior written
+ *      permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/svc_xprt.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+					struct sockaddr *sa, int salen,
+					int flags);
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
+static void svc_rdma_release_rqst(struct svc_rqst *);
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt);
+static void dto_tasklet_func(unsigned long data);
+static void svc_rdma_detach(struct svc_xprt *xprt);
+static void svc_rdma_free(struct svc_xprt *xprt);
+static int svc_rdma_has_wspace(struct svc_xprt *xprt);
+static void rq_cq_reap(struct svcxprt_rdma *xprt);
+static void sq_cq_reap(struct svcxprt_rdma *xprt);
+
+DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
+static DEFINE_SPINLOCK(dto_lock);
+static LIST_HEAD(dto_xprt_q);
+
+static struct svc_xprt_ops svc_rdma_ops = {
+	.xpo_create = svc_rdma_create,
+	.xpo_recvfrom = svc_rdma_recvfrom,
+	.xpo_sendto = svc_rdma_sendto,
+	.xpo_release_rqst = svc_rdma_release_rqst,
+	.xpo_detach = svc_rdma_detach,
+	.xpo_free = svc_rdma_free,
+	.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+	.xpo_has_wspace = svc_rdma_has_wspace,
+	.xpo_accept = svc_rdma_accept,
+};
+
+struct svc_xprt_class svc_rdma_class = {
+	.xcl_name = "rdma",
+	.xcl_owner = THIS_MODULE,
+	.xcl_ops = &svc_rdma_ops,
+	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
+};
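+/*
+ * Illustrative sketch (not part of this patch): the class above becomes
+ * usable once the module init code registers it with the generic
+ * transport switch, roughly:
+ *
+ *	int svc_rdma_init(void)
+ *	{
+ *		return svc_reg_xprt_class(&svc_rdma_class);
+ *	}
+ *
+ * after which a listener can be created via svc_rdma_create() below, e.g.
+ * by writing "rdma <port>" to the portlist file.
+ */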
+
+static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
+{
+	int target;
+	int at_least_one = 0;
+	struct svc_rdma_op_ctxt *ctxt;
+
+	target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
+		     xprt->sc_ctxt_max);
+
+	spin_lock_bh(&xprt->sc_ctxt_lock);
+	while (xprt->sc_ctxt_cnt < target) {
+		xprt->sc_ctxt_cnt++;
+		spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+
+		spin_lock_bh(&xprt->sc_ctxt_lock);
+		if (ctxt) {
+			at_least_one = 1;
+			ctxt->next = xprt->sc_ctxt_head;
+			xprt->sc_ctxt_head = ctxt;
+		} else {
+			/* kmalloc failed...give up for now */
+			xprt->sc_ctxt_cnt--;
+			break;
+		}
+	}
+	spin_unlock_bh(&xprt->sc_ctxt_lock);
+	dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n",
+		xprt->sc_ctxt_max, xprt->sc_ctxt_cnt);
+	return at_least_one;
+}
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+	struct svc_rdma_op_ctxt *ctxt;
+
+	while (1) {
+		spin_lock_bh(&xprt->sc_ctxt_lock);
+		if (unlikely(xprt->sc_ctxt_head == NULL)) {
+			/* Try to bump my cache. */
+			spin_unlock_bh(&xprt->sc_ctxt_lock);
+
+			if (rdma_bump_context_cache(xprt))
+				continue;
+
+			printk(KERN_INFO "svcrdma: sleeping waiting for "
+			       "context memory on xprt=%p\n",
+			       xprt);
+			schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+			continue;
+		}
+		ctxt = xprt->sc_ctxt_head;
+		xprt->sc_ctxt_head = ctxt->next;
+		spin_unlock_bh(&xprt->sc_ctxt_lock);
+		ctxt->xprt = xprt;
+		INIT_LIST_HEAD(&ctxt->dto_q);
+		ctxt->count = 0;
+		break;
+	}
+	return ctxt;
+}
+
+void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
+{
+	struct svcxprt_rdma *xprt;
+	int i;
+
+	BUG_ON(!ctxt);
+	xprt = ctxt->xprt;
+	if (free_pages)
+		for (i = 0; i < ctxt->count; i++)
+			put_page(ctxt->pages[i]);
+
+	for (i = 0; i < ctxt->count; i++)
+		dma_unmap_single(xprt->sc_cm_id->device->dma_device,
+				 ctxt->sge[i].addr,
+				 ctxt->sge[i].length,
+				 ctxt->direction);
+	spin_lock_bh(&xprt->sc_ctxt_lock);
+	ctxt->next = xprt->sc_ctxt_head;
+	xprt->sc_ctxt_head = ctxt;
+	spin_unlock_bh(&xprt->sc_ctxt_lock);
+}
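+/*
+ * Typical context lifecycle (illustrative sketch, not taken from this
+ * patch):
+ *
+ *	ctxt = svc_rdma_get_context(xprt);
+ *	ctxt->direction = DMA_TO_DEVICE;
+ *	... map pages into ctxt->sge[], set ctxt->count, and post a WR
+ *	    with wr_id = (unsigned long)ctxt ...
+ *	svc_rdma_put_context(ctxt, 1);	(1 => also put the pages)
+ *
+ * svc_rdma_put_context() unmaps every SGE using ctxt->direction, so the
+ * direction must be set before the pages are mapped.
+ */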
+
+/* ib_cq event handler */
+static void cq_event_handler(struct ib_event *event, void *context)
+{
+	struct svc_xprt *xprt = context;
+	dprintk("svcrdma: received CQ event id=%d, context=%p\n",
+		event->event, context);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+}
+
+/* QP event handler */
+static void qp_event_handler(struct ib_event *event, void *context)
+{
+	struct svc_xprt *xprt = context;
+
+	switch (event->event) {
+	/* These are considered benign events */
+	case IB_EVENT_PATH_MIG:
+	case IB_EVENT_COMM_EST:
+	case IB_EVENT_SQ_DRAINED:
+	case IB_EVENT_QP_LAST_WQE_REACHED:
+		dprintk("svcrdma: QP event %d received for QP=%p\n",
+			event->event, event->element.qp);
+		break;
+	/* These are considered fatal events */
+	case IB_EVENT_PATH_MIG_ERR:
+	case IB_EVENT_QP_FATAL:
+	case IB_EVENT_QP_REQ_ERR:
+	case IB_EVENT_QP_ACCESS_ERR:
+	case IB_EVENT_DEVICE_FATAL:
+	default:
+		dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
+			"closing transport\n",
+			event->event, event->element.qp);
+		set_bit(XPT_CLOSE, &xprt->xpt_flags);
+		break;
+	}
+}
+
+/*
+ * Data Transfer Operation Tasklet
+ *
+ * Walks the list of transports with pending I/O, removing each transport
+ * as its completions are reaped and, where appropriate, enqueuing it for
+ * the server threads to process. Two bits in sc_flags indicate whether
+ * the SQ, the RQ, or both have I/O pending. The dto_lock is an irqsave
+ * spinlock that serializes access to the transport list with the RQ and
+ * SQ interrupt handlers.
+ */
+static void dto_tasklet_func(unsigned long data)
+{
+	struct svcxprt_rdma *xprt;
+	unsigned long flags;
+
+	spin_lock_irqsave(&dto_lock, flags);
+	while (!list_empty(&dto_xprt_q)) {
+		xprt = list_entry(dto_xprt_q.next,
+				  struct svcxprt_rdma, sc_dto_q);
+		list_del_init(&xprt->sc_dto_q);
+		spin_unlock_irqrestore(&dto_lock, flags);
+
+		if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
+			ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+			rq_cq_reap(xprt);
+			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+			/*
+			 * If data arrived before established event,
+			 * don't enqueue. This defers RPC I/O until the
+			 * RDMA connection is complete.
+			 */
+			if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+				svc_xprt_enqueue(&xprt->sc_xprt);
+		}
+
+		if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
+			ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+			sq_cq_reap(xprt);
+		}
+
+		spin_lock_irqsave(&dto_lock, flags);
+	}
+	spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+/*
+ * Receive Queue Completion Handler
+ *
+ * Since the RQ completion handler is called in interrupt context, we
+ * need to defer the handling of the I/O to a tasklet.
+ */
+static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+	struct svcxprt_rdma *xprt = cq_context;
+	unsigned long flags;
+
+	/*
+	 * Set the bit regardless of whether or not it's on the list
+	 * because it may be on the list already due to an SQ
+	 * completion.
+	 */
+	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+
+	/*
+	 * If this transport is not already on the DTO transport queue,
+	 * add it
+	 */
+	spin_lock_irqsave(&dto_lock, flags);
+	if (list_empty(&xprt->sc_dto_q))
+		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+	spin_unlock_irqrestore(&dto_lock, flags);
+
+	/* Tasklet does all the work to avoid irqsave locks. */
+	tasklet_schedule(&dto_tasklet);
+}
+
+/*
+ * rq_cq_reap - Process the RQ CQ.
+ *
+ * Take all completed WCs off the CQ and enqueue the associated DTO
+ * context on the transport's sc_rq_dto_q.
+ */
+static void rq_cq_reap(struct svcxprt_rdma *xprt)
+{
+	int ret;
+	struct ib_wc wc;
+	struct svc_rdma_op_ctxt *ctxt = NULL;
+
+	atomic_inc(&rdma_stat_rq_poll);
+
+	spin_lock_bh(&xprt->sc_rq_dto_lock);
+	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
+		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+		ctxt->wc_status = wc.status;
+		ctxt->byte_len = wc.byte_len;
+		if (wc.status != IB_WC_SUCCESS) {
+			/* Close the transport */
+			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+			svc_rdma_put_context(ctxt, 1);
+			continue;
+		}
+		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+	}
+	spin_unlock_bh(&xprt->sc_rq_dto_lock);
+
+	if (ctxt)
+		atomic_inc(&rdma_stat_rq_prod);
+}
+
+/*
+ * Send Queue Completion Handler - potentially called in interrupt context.
+ */
+static void sq_cq_reap(struct svcxprt_rdma *xprt)
+{
+	struct svc_rdma_op_ctxt *ctxt = NULL;
+	struct ib_wc wc;
+	struct ib_cq *cq = xprt->sc_sq_cq;
+	int ret;
+
+	atomic_inc(&rdma_stat_sq_poll);
+	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
+		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+		xprt = ctxt->xprt;
+
+		if (wc.status != IB_WC_SUCCESS)
+			/* Close the transport */
+			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+
+		/* Decrement used SQ WR count */
+		atomic_dec(&xprt->sc_sq_count);
+		wake_up(&xprt->sc_send_wait);
+
+		switch (ctxt->wr_op) {
+		case IB_WR_SEND:
+		case IB_WR_RDMA_WRITE:
+			svc_rdma_put_context(ctxt, 1);
+			break;
+
+		case IB_WR_RDMA_READ:
+			if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+				set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+				set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
+				spin_lock_bh(&xprt->sc_read_complete_lock);
+				list_add_tail(&ctxt->dto_q,
+					      &xprt->sc_read_complete_q);
+				spin_unlock_bh(&xprt->sc_read_complete_lock);
+				svc_xprt_enqueue(&xprt->sc_xprt);
+			}
+			break;
+
+		default:
+			printk(KERN_ERR "svcrdma: unexpected completion type, "
+			       "opcode=%d, status=%d\n",
+			       wc.opcode, wc.status);
+			break;
+		}
+	}
+
+	if (ctxt)
+		atomic_inc(&rdma_stat_sq_prod);
+}
+
+static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+	struct svcxprt_rdma *xprt = cq_context;
+	unsigned long flags;
+
+	/*
+	 * Set the bit regardless of whether or not it's on the list
+	 * because it may be on the list already due to an RQ
+	 * completion.
+	 */
+	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
+
+	/*
+	 * If this transport is not already on the DTO transport queue,
+	 * add it
+	 */
+	spin_lock_irqsave(&dto_lock, flags);
+	if (list_empty(&xprt->sc_dto_q))
+		list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+	spin_unlock_irqrestore(&dto_lock, flags);
+
+	/* Tasklet does all the work to avoid irqsave locks. */
+	tasklet_schedule(&dto_tasklet);
+}
+
+static void create_context_cache(struct svcxprt_rdma *xprt,
+				 int ctxt_count, int ctxt_bump, int ctxt_max)
+{
+	struct svc_rdma_op_ctxt *ctxt;
+	int i;
+
+	xprt->sc_ctxt_max = ctxt_max;
+	xprt->sc_ctxt_bump = ctxt_bump;
+	xprt->sc_ctxt_cnt = 0;
+	xprt->sc_ctxt_head = NULL;
+	for (i = 0; i < ctxt_count; i++) {
+		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+		if (ctxt) {
+			ctxt->next = xprt->sc_ctxt_head;
+			xprt->sc_ctxt_head = ctxt;
+			xprt->sc_ctxt_cnt++;
+		}
+	}
+}
+
+static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+{
+	struct svc_rdma_op_ctxt *next;
+	if (!ctxt)
+		return;
+
+	do {
+		next = ctxt->next;
+		kfree(ctxt);
+		ctxt = next;
+	} while (next);
+}
+
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
+					     int listener)
+{
+	struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
+
+	if (!cma_xprt)
+		return NULL;
+	svc_xprt_init(&svc_rdma_class, &cma_xprt->sc_xprt, serv);
+	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
+	INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
+	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
+	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
+	init_waitqueue_head(&cma_xprt->sc_send_wait);
+
+	spin_lock_init(&cma_xprt->sc_lock);
+	spin_lock_init(&cma_xprt->sc_read_complete_lock);
+	spin_lock_init(&cma_xprt->sc_ctxt_lock);
+	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
+
+	cma_xprt->sc_ord = svcrdma_ord;
+
+	cma_xprt->sc_max_req_size = svcrdma_max_req_size;
+	cma_xprt->sc_max_requests = svcrdma_max_requests;
+	cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
+	atomic_set(&cma_xprt->sc_sq_count, 0);
+
+	if (!listener) {
+		int reqs = cma_xprt->sc_max_requests;
+		create_context_cache(cma_xprt,
+				     reqs << 1, /* starting size */
+				     reqs,	/* bump amount */
+				     reqs +
+				     cma_xprt->sc_sq_depth +
+				     RPCRDMA_MAX_THREADS + 1); /* max */
+		if (!cma_xprt->sc_ctxt_head) {
+			kfree(cma_xprt);
+			return NULL;
+		}
+		clear_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+	} else
+		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+
+	return cma_xprt;
+}
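+/*
+ * A note on the context cache sizing above: the cache starts at twice
+ * sc_max_requests (the initially posted receives plus headroom), grows by
+ * sc_max_requests at a time, and is capped at one context per possible
+ * RQ WR, plus one per SQ WR, plus one per server thread, plus one spare.
+ */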
+
+struct page *svc_rdma_get_page(void)
+{
+	struct page *page;
+
+	while ((page = alloc_page(GFP_KERNEL)) == NULL) {
+		/* If we can't get memory, wait a bit and try again */
+		printk(KERN_INFO
+		       "svcrdma: out of memory...retrying in 1 second\n");
+		schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
+	}
+	return page;
+}
+
+int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
+{
+	struct ib_recv_wr recv_wr, *bad_recv_wr;
+	struct svc_rdma_op_ctxt *ctxt;
+	struct page *page;
+	unsigned long pa;
+	int sge_no;
+	int buflen;
+	int ret;
+
+	ctxt = svc_rdma_get_context(xprt);
+	buflen = 0;
+	ctxt->direction = DMA_FROM_DEVICE;
+	for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
+		BUG_ON(sge_no >= xprt->sc_max_sge);
+		page = svc_rdma_get_page();
+		ctxt->pages[sge_no] = page;
+		pa = ib_dma_map_page(xprt->sc_cm_id->device,
+				     page, 0, PAGE_SIZE,
+				     DMA_FROM_DEVICE);
+		ctxt->sge[sge_no].addr = pa;
+		ctxt->sge[sge_no].length = PAGE_SIZE;
+		ctxt->sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+		buflen += PAGE_SIZE;
+	}
+	ctxt->count = sge_no;
+	recv_wr.next = NULL;
+	recv_wr.sg_list = &ctxt->sge[0];
+	recv_wr.num_sge = ctxt->count;
+	recv_wr.wr_id = (u64)(unsigned long)ctxt;
+
+	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+	return ret;
+}
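+/*
+ * Illustrative sizing (assumed values, not defaults from this patch):
+ * with 4KB pages and an sc_max_req_size of 16KB, the loop above maps four
+ * pages per posted receive, i.e. DIV_ROUND_UP(sc_max_req_size, PAGE_SIZE)
+ * SGEs, which must not exceed sc_max_sge or the BUG_ON() fires.
+ */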
+
+/*
+ * This function handles the CONNECT_REQUEST event on a listening
+ * endpoint. It is passed the cma_id for the _new_ connection. The context in
+ * this cma_id is inherited from the listening cma_id and is the
+ * svcxprt_rdma structure for the listening endpoint.
+ *
+ * This function creates a new xprt for the new connection and enqueues it on
+ * the accept queue for the listening xprt. When a server thread is kicked,
+ * it will call the accept method on the listening xprt, which completes
+ * the new connection.
+ */
+static void handle_connect_req(struct rdma_cm_id *new_cma_id)
+{
+	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
+	struct svcxprt_rdma *newxprt;
+
+	/* Create a new transport */
+	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
+	if (!newxprt) {
+		dprintk("svcrdma: failed to create new transport\n");
+		return;
+	}
+	newxprt->sc_cm_id = new_cma_id;
+	new_cma_id->context = newxprt;
+	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
+		newxprt, newxprt->sc_cm_id, listen_xprt);
+
+	/*
+	 * Enqueue the new transport on the accept queue of the listening
+	 * transport
+	 */
+	spin_lock_bh(&listen_xprt->sc_lock);
+	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
+	spin_unlock_bh(&listen_xprt->sc_lock);
+
+	/*
+	 * Can't use svc_xprt_received here because we are not on a
+	 * rqstp thread
+	 */
+	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
+	svc_xprt_enqueue(&listen_xprt->sc_xprt);
+}
+
+/*
+ * Handles events generated on the listening endpoint. These events will
+ * either be incoming connect requests or adapter removal events.
+ */
+static int rdma_listen_handler(struct rdma_cm_id *cma_id,
+			       struct rdma_cm_event *event)
+{
+	struct svcxprt_rdma *xprt = cma_id->context;
+	int ret = 0;
+
+	switch (event->event) {
+	case RDMA_CM_EVENT_CONNECT_REQUEST:
+		dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
+			"event=%d\n", cma_id, cma_id->context, event->event);
+		handle_connect_req(cma_id);
+		break;
+
+	case RDMA_CM_EVENT_ESTABLISHED:
+		/* Accept complete */
+		dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
+			"cm_id=%p\n", xprt, cma_id);
+		break;
+
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+		dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
+			xprt, cma_id);
+		if (xprt)
+			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+		break;
+
+	default:
+		dprintk("svcrdma: Unexpected event on listening endpoint %p, "
+			"event=%d\n", cma_id, event->event);
+		break;
+	}
+
+	return ret;
+}
+
+static int rdma_cma_handler(struct rdma_cm_id *cma_id,
+			    struct rdma_cm_event *event)
+{
+	struct svc_xprt *xprt = cma_id->context;
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	switch (event->event) {
+	case RDMA_CM_EVENT_ESTABLISHED:
+		/* Accept complete */
+		dprintk("svcrdma: Connection completed on DTO xprt=%p, "
+			"cm_id=%p\n", xprt, cma_id);
+		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
+		svc_xprt_enqueue(xprt);
+		break;
+	case RDMA_CM_EVENT_DISCONNECTED:
+		dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
+			xprt, cma_id);
+		if (xprt) {
+			set_bit(XPT_CLOSE, &xprt->xpt_flags);
+			svc_xprt_enqueue(xprt);
+		}
+		break;
+	case RDMA_CM_EVENT_DEVICE_REMOVAL:
+		dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
+			"event=%d\n", cma_id, xprt, event->event);
+		if (xprt) {
+			set_bit(XPT_CLOSE, &xprt->xpt_flags);
+			svc_xprt_enqueue(xprt);
+		}
+		break;
+	default:
+		dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
+			"event=%d\n", cma_id, event->event);
+		break;
+	}
+	return 0;
+}
+
+/*
+ * Create a listening RDMA service endpoint.
+ */
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+					struct sockaddr *sa, int salen,
+					int flags)
+{
+	struct rdma_cm_id *listen_id;
+	struct svcxprt_rdma *cma_xprt;
+	struct svc_xprt *xprt;
+	int ret;
+
+	dprintk("svcrdma: Creating RDMA socket\n");
+
+	cma_xprt = rdma_create_xprt(serv, 1);
+	if (!cma_xprt)
+		return ERR_PTR(-ENOMEM);
+	xprt = &cma_xprt->sc_xprt;
+
+	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
+	if (IS_ERR(listen_id)) {
+		rdma_destroy_xprt(cma_xprt);
+		dprintk("svcrdma: rdma_create_id failed = %ld\n",
+			PTR_ERR(listen_id));
+		return ERR_PTR(PTR_ERR(listen_id));
+	}
+	ret = rdma_bind_addr(listen_id, sa);
+	if (ret) {
+		rdma_destroy_xprt(cma_xprt);
+		rdma_destroy_id(listen_id);
+		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
+		return ERR_PTR(ret);
+	}
+	cma_xprt->sc_cm_id = listen_id;
+
+	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
+	if (ret) {
+		rdma_destroy_id(listen_id);
+		rdma_destroy_xprt(cma_xprt);
+		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+	}
+
+	/*
+	 * We need to use the address from the cm_id in case the
+	 * caller specified 0 for the port number.
+	 */
+	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
+	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
+
+	return &cma_xprt->sc_xprt;
+}
+
+/*
+ * This is the xpo_accept function for listening endpoints. Its
+ * purpose is to accept incoming connections. The CMA callback handler
+ * has already created a new transport and attached it to the new CMA
+ * ID.
+ *
+ * There is a queue of pending connections hung on the listening
+ * transport; it holds the newly created svcxprt_rdma structures. This
+ * function takes them off the accept_q and completes the connection
+ * setup (QP creation, receive buffer posting and rdma_accept).
+ */
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *listen_rdma;
+	struct svcxprt_rdma *newxprt = NULL;
+	struct rdma_conn_param conn_param;
+	struct ib_qp_init_attr qp_attr;
+	struct ib_device_attr devattr;
+	struct sockaddr *sa;
+	int ret;
+	int i;
+
+	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	clear_bit(XPT_CONN, &xprt->xpt_flags);
+	/* Get the next entry off the accept list */
+	spin_lock_bh(&listen_rdma->sc_lock);
+	if (!list_empty(&listen_rdma->sc_accept_q)) {
+		newxprt = list_entry(listen_rdma->sc_accept_q.next,
+				     struct svcxprt_rdma, sc_accept_q);
+		list_del_init(&newxprt->sc_accept_q);
+	}
+	if (!list_empty(&listen_rdma->sc_accept_q))
+		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
+	spin_unlock_bh(&listen_rdma->sc_lock);
+	if (!newxprt)
+		return NULL;
+
+	dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
+		newxprt, newxprt->sc_cm_id);
+
+	ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
+	if (ret) {
+		dprintk("svcrdma: could not query device attributes on "
+			"device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
+		goto errout;
+	}
+
+	/*
+	 * Qualify the transport resource defaults with the
+	 * capabilities of this particular device.
+	 */
+	newxprt->sc_max_sge = min((size_t)devattr.max_sge,
+				  (size_t)RPCSVC_MAXPAGES);
+	newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
+				   (size_t)svcrdma_max_requests);
+	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+
+	newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom,
+			       (size_t)svcrdma_ord);
+
+	newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
+	if (IS_ERR(newxprt->sc_pd)) {
+		dprintk("svcrdma: error creating PD for connect request\n");
+		goto errout;
+	}
+	newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+					 sq_comp_handler,
+					 cq_event_handler,
+					 newxprt,
+					 newxprt->sc_sq_depth,
+					 0);
+	if (IS_ERR(newxprt->sc_sq_cq)) {
+		dprintk("svcrdma: error creating SQ CQ for connect request\n");
+		goto errout;
+	}
+	newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+					 rq_comp_handler,
+					 cq_event_handler,
+					 newxprt,
+					 newxprt->sc_max_requests,
+					 0);
+	if (IS_ERR(newxprt->sc_rq_cq)) {
+		dprintk("svcrdma: error creating RQ CQ for connect request\n");
+		goto errout;
+	}
+
+	memset(&qp_attr, 0, sizeof qp_attr);
+	qp_attr.event_handler = qp_event_handler;
+	qp_attr.qp_context = &newxprt->sc_xprt;
+	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
+	qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+	qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
+	qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
+	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+	qp_attr.qp_type = IB_QPT_RC;
+	qp_attr.send_cq = newxprt->sc_sq_cq;
+	qp_attr.recv_cq = newxprt->sc_rq_cq;
+	dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
+		"    cm_id->device=%p, sc_pd->device=%p\n"
+		"    cap.max_send_wr = %d\n"
+		"    cap.max_recv_wr = %d\n"
+		"    cap.max_send_sge = %d\n"
+		"    cap.max_recv_sge = %d\n",
+		newxprt->sc_cm_id, newxprt->sc_pd,
+		newxprt->sc_cm_id->device, newxprt->sc_pd->device,
+		qp_attr.cap.max_send_wr,
+		qp_attr.cap.max_recv_wr,
+		qp_attr.cap.max_send_sge,
+		qp_attr.cap.max_recv_sge);
+
+	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+	if (ret) {
+		/*
+		 * XXX: This is a hack. We need a xx_request_qp interface
+		 * that will adjust the qp_attr's with a best-effort
+		 * number
+		 */
+		qp_attr.cap.max_send_sge -= 2;
+		qp_attr.cap.max_recv_sge -= 2;
+		ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd,
+				     &qp_attr);
+		if (ret) {
+			dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
+			goto errout;
+		}
+		/* sc_max_sge must fit both the send and receive queues */
+		newxprt->sc_max_sge = min(qp_attr.cap.max_send_sge,
+					  qp_attr.cap.max_recv_sge);
+		newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
+		newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
+	}
+	newxprt->sc_qp = newxprt->sc_cm_id->qp;
+
+	/* Register all of physical memory */
+	newxprt->sc_phys_mr = ib_get_dma_mr(newxprt->sc_pd,
+					    IB_ACCESS_LOCAL_WRITE |
+					    IB_ACCESS_REMOTE_WRITE);
+	if (IS_ERR(newxprt->sc_phys_mr)) {
+		ret = PTR_ERR(newxprt->sc_phys_mr);
+		dprintk("svcrdma: Failed to create DMA MR ret=%d\n", ret);
+		goto errout;
+	}
+
+	/* Post receive buffers */
+	for (i = 0; i < newxprt->sc_max_requests; i++) {
+		ret = svc_rdma_post_recv(newxprt);
+		if (ret) {
+			dprintk("svcrdma: failure posting receive buffers\n");
+			goto errout;
+		}
+	}
+
+	/* Swap out the handler */
+	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+
+	/* Accept Connection */
+	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
+	memset(&conn_param, 0, sizeof conn_param);
+	conn_param.responder_resources = 0;
+	conn_param.initiator_depth = newxprt->sc_ord;
+	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
+	if (ret) {
+		dprintk("svcrdma: failed to accept new connection, ret=%d\n",
+		       ret);
+		goto errout;
+	}
+
+	dprintk("svcrdma: new connection %p accepted with the following "
+		"attributes:\n"
+		"    local_ip        : %d.%d.%d.%d\n"
+		"    local_port	     : %d\n"
+		"    remote_ip       : %d.%d.%d.%d\n"
+		"    remote_port     : %d\n"
+		"    max_sge         : %d\n"
+		"    sq_depth        : %d\n"
+		"    max_requests    : %d\n"
+		"    ord             : %d\n",
+		newxprt,
+		NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
+			 route.addr.src_addr)->sin_addr.s_addr),
+		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+		       route.addr.src_addr)->sin_port),
+		NIPQUAD(((struct sockaddr_in *)&newxprt->sc_cm_id->
+			 route.addr.dst_addr)->sin_addr.s_addr),
+		ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+		       route.addr.dst_addr)->sin_port),
+		newxprt->sc_max_sge,
+		newxprt->sc_sq_depth,
+		newxprt->sc_max_requests,
+		newxprt->sc_ord);
+
+	/* Set the local and remote addresses in the transport */
+	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
+	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
+	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+
+	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+	return &newxprt->sc_xprt;
+
+ errout:
+	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
+	rdma_destroy_id(newxprt->sc_cm_id);
+	rdma_destroy_xprt(newxprt);
+	return NULL;
+}
+
+/*
+ * Post an RQ WQE to the RQ when the rqst is being released. This
+ * effectively returns an RQ credit to the client. The rq_xprt_ctxt
+ * will be NULL if the request was deferred due to an RDMA_READ or if the
+ * transport had no data ready (EAGAIN). Note that an RPC deferred in
+ * svc_process still returns the credit, because its data has already been
+ * copied and no longer consumes a WQE/WC.
+ */
+static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
+{
+	int err;
+	struct svcxprt_rdma *rdma =
+		container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
+	if (rqstp->rq_xprt_ctxt) {
+		BUG_ON(rqstp->rq_xprt_ctxt != rdma);
+		err = svc_rdma_post_recv(rdma);
+		if (err)
+			dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
+				err);
+	}
+	rqstp->rq_xprt_ctxt = NULL;
+}
+
+/* Shut down the connection and stop further events from the provider */
+static void svc_rdma_detach(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	unsigned long flags;
+
+	dprintk("svc: svc_rdma_detach(%p)\n", xprt);
+	/*
+	 * Shutdown the connection. This will ensure we don't get any
+	 * more events from the provider.
+	 */
+	rdma_disconnect(rdma->sc_cm_id);
+	rdma_destroy_id(rdma->sc_cm_id);
+
+	/* We may already be on the DTO list */
+	spin_lock_irqsave(&dto_lock, flags);
+	if (!list_empty(&rdma->sc_dto_q))
+		list_del_init(&rdma->sc_dto_q);
+	spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+	dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+	rdma_destroy_xprt(rdma);
+	kfree(rdma);
+}
+
+static void rdma_destroy_xprt(struct svcxprt_rdma *xprt)
+{
+	if (xprt->sc_qp && !IS_ERR(xprt->sc_qp))
+		ib_destroy_qp(xprt->sc_qp);
+
+	if (xprt->sc_sq_cq && !IS_ERR(xprt->sc_sq_cq))
+		ib_destroy_cq(xprt->sc_sq_cq);
+
+	if (xprt->sc_rq_cq && !IS_ERR(xprt->sc_rq_cq))
+		ib_destroy_cq(xprt->sc_rq_cq);
+
+	if (xprt->sc_phys_mr && !IS_ERR(xprt->sc_phys_mr))
+		ib_dereg_mr(xprt->sc_phys_mr);
+
+	if (xprt->sc_pd && !IS_ERR(xprt->sc_pd))
+		ib_dealloc_pd(xprt->sc_pd);
+
+	destroy_context_cache(xprt->sc_ctxt_head);
+}
+
+static int svc_rdma_has_wspace(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+	/*
+	 * If there are fewer SQ WRs available than are required to send a
+	 * simple response, return false.
+	 */
+	if (rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3)
+		return 0;
+
+	/*
+	 * ...or there are already waiters on the SQ,
+	 * return false.
+	 */
+	if (waitqueue_active(&rdma->sc_send_wait))
+		return 0;
+
+	/* Otherwise return true. */
+	return 1;
+}
+
+int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
+{
+	struct ib_send_wr *bad_wr;
+	int ret;
+
+	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
+		return 0;
+
+	BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
+	BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
+		wr->opcode);
+	/* If the SQ is full, wait until an SQ entry is available */
+	while (1) {
+		spin_lock_bh(&xprt->sc_lock);
+		if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
+			spin_unlock_bh(&xprt->sc_lock);
+			atomic_inc(&rdma_stat_sq_starve);
+			/* See if we can reap some SQ WR */
+			sq_cq_reap(xprt);
+
+			/* Wait until SQ WR available if SQ still full */
+			wait_event(xprt->sc_send_wait,
+				   atomic_read(&xprt->sc_sq_count) <
+				   xprt->sc_sq_depth);
+			continue;
+		}
+		/* Bump the used SQ WR count and post */
+		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
+		if (!ret)
+			atomic_inc(&xprt->sc_sq_count);
+		else
+			dprintk("svcrdma: failed to post SQ WR rc=%d, "
+			       "sc_sq_count=%d, sc_sq_depth=%d\n",
+			       ret, atomic_read(&xprt->sc_sq_count),
+			       xprt->sc_sq_depth);
+		spin_unlock_bh(&xprt->sc_lock);
+		break;
+	}
+	return ret;
+}
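+/*
+ * Illustrative caller sketch (mirrors send_reply() and
+ * svc_rdma_send_error(), not new code): post a signaled WR whose wr_id
+ * points at a context whose wr_op matches the opcode:
+ *
+ *	ctxt->wr_op = IB_WR_SEND;
+ *	memset(&wr, 0, sizeof(wr));
+ *	wr.wr_id = (unsigned long)ctxt;
+ *	wr.sg_list = ctxt->sge;
+ *	wr.num_sge = sge_count;
+ *	wr.opcode = IB_WR_SEND;
+ *	wr.send_flags = IB_SEND_SIGNALED;
+ *	ret = svc_rdma_send(rdma, &wr);
+ *	if (ret)
+ *		svc_rdma_put_context(ctxt, 1);
+ *
+ * sq_cq_reap() uses wr_id to find and release the context when the WR
+ * completes.
+ */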
+
+int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+			enum rpcrdma_errcode err)
+{
+	struct ib_send_wr err_wr;
+	struct ib_sge sge;
+	struct page *p;
+	struct svc_rdma_op_ctxt *ctxt;
+	u32 *va;
+	int length;
+	int ret;
+
+	p = svc_rdma_get_page();
+	va = page_address(p);
+
+	/* XDR encode error */
+	length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+	/* Prepare SGE for local address */
+	sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
+				   p, 0, PAGE_SIZE, DMA_TO_DEVICE);
+	sge.lkey = xprt->sc_phys_mr->lkey;
+	sge.length = length;
+
+	ctxt = svc_rdma_get_context(xprt);
+	ctxt->count = 1;
+	ctxt->pages[0] = p;
+
+	/* Prepare SEND WR */
+	memset(&err_wr, 0, sizeof err_wr);
+	ctxt->wr_op = IB_WR_SEND;
+	err_wr.wr_id = (unsigned long)ctxt;
+	err_wr.sg_list = &sge;
+	err_wr.num_sge = 1;
+	err_wr.opcode = IB_WR_SEND;
+	err_wr.send_flags = IB_SEND_SIGNALED;
+
+	/* Post It */
+	ret = svc_rdma_send(xprt, &err_wr);
+	if (ret) {
+		dprintk("svcrdma: Error posting send = %d\n", ret);
+		svc_rdma_put_context(ctxt, 1);
+	}
+
+	return ret;
+}
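+/*
+ * Illustrative use (sketch only): the receive path is expected to call
+ * this when it cannot parse or honor a transport header, e.g.
+ *
+ *	svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+ *
+ * The error reply consumes one SQ WR and one page; both are released by
+ * sq_cq_reap() and svc_rdma_put_context() once the send completes.
+ */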