author     Linus Torvalds <torvalds@g5.osdl.org>  2006-09-23 16:58:40 -0700
committer  Linus Torvalds <torvalds@g5.osdl.org>  2006-09-23 16:58:40 -0700
commit     9f261e011340bcd22c1dd48b465153bd78caa8c8 (patch)
tree       b1c266ea746a0e8591e6af781aef22854e652ff9 /net
parent     a4c12d6c5dde48c69464baf7c703e425ee511433 (diff)
parent     026ed5c9185dcc4b2df92e98c3d61a01cea19cbf (diff)
download   linux-9f261e011340bcd22c1dd48b465153bd78caa8c8.tar.gz
Merge git://git.linux-nfs.org/pub/linux/nfs-2.6
* git://git.linux-nfs.org/pub/linux/nfs-2.6: (74 commits)
  NFS: unmark NFS direct I/O as experimental
  NFS: add comments clarifying the use of nfs_post_op_update()
  NFSv4: rpc_mkpipe creating socket inodes w/out sk buffers
  NFS: Use SEEK_END instead of hardcoded value
  NFSv4: When mounting with a port=0 argument, substitute port=2049
  NFSv4: Poll more aggressively when handling NFS4ERR_DELAY
  NFSv4: Handle the condition NFS4ERR_FILE_OPEN
  NFSv4: Retry lease recovery if it failed during a synchronous operation.
  NFS: Don't invalidate the symlink we just stuffed into the cache
  NFS: Make read() return an ESTALE if the file has been deleted
  NFSv4: It's perfectly legal for clp to be NULL here....
  NFS: nfs_lookup - don't hash dentry when optimising away the lookup
  SUNRPC: Fix Oops in pmap_getport_done
  SUNRPC: Add refcounting to the struct rpc_xprt
  SUNRPC: Clean up soft task error handling
  SUNRPC: Handle ENETUNREACH, EHOSTUNREACH and EHOSTDOWN socket errors
  SUNRPC: rpc_delay() should not clobber the rpc_task->tk_status
  Fix a referral error Oops
  NFS: NFS_ROOT should use the new rpc_create API
  NFS: Fix up compiler warnings on 64-bit platforms in client.c
  ...

Manually resolved conflict in net/sunrpc/xprtsock.c
Diffstat (limited to 'net')
-rw-r--r--  net/sunrpc/auth_gss/auth_gss.c  |   7
-rw-r--r--  net/sunrpc/clnt.c               | 194
-rw-r--r--  net/sunrpc/pmap_clnt.c          | 266
-rw-r--r--  net/sunrpc/rpc_pipe.c           |  46
-rw-r--r--  net/sunrpc/sched.c              |  99
-rw-r--r--  net/sunrpc/sunrpc_syms.c        |   3
-rw-r--r--  net/sunrpc/timer.c              |   2
-rw-r--r--  net/sunrpc/xprt.c               |  86
-rw-r--r--  net/sunrpc/xprtsock.c           | 108
9 files changed, 460 insertions, 351 deletions
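
The central interface change in this merge is that callers no longer pair xprt_create_proto() with rpc_create_client(); they fill in a struct rpc_create_args and call rpc_create(), which builds the transport and the client together (see the clnt.c and pmap_clnt.c hunks below). What follows is a hedged sketch of that calling convention, modeled on the pmap_create() conversion further down in this diff; the wrapper function and its parameters are illustrative, not code from the merge.

#include <linux/in.h>
#include <linux/sunrpc/clnt.h>

/*
 * Hedged sketch only: the rpc_create_args fields and flags are taken
 * from the diff below, the surrounding function is hypothetical.
 */
static struct rpc_clnt *example_create_client(struct sockaddr_in *srvaddr,
					      char *servername,
					      struct rpc_program *program,
					      u32 version)
{
	struct rpc_create_args args = {
		.protocol	= IPPROTO_UDP,
		.address	= (struct sockaddr *)srvaddr,
		.addrsize	= sizeof(*srvaddr),
		.servername	= servername,
		.program	= program,
		.version	= version,
		.authflavor	= RPC_AUTH_UNIX,
		/* skip the initial NULL ping so async contexts may call this */
		.flags		= RPC_CLNT_CREATE_NOPING,
	};

	/* rpc_create() sets up both the rpc_xprt and the rpc_clnt */
	return rpc_create(&args);
}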
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index ef1cf5b476c8..6eed3e166ba3 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -88,7 +88,6 @@ struct gss_auth {
 	struct list_head upcalls;
 	struct rpc_clnt *client;
 	struct dentry *dentry;
-	char path[48];
 	spinlock_t lock;
 };
 
@@ -690,10 +689,8 @@ gss_create(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
 	if (err)
 		goto err_put_mech;
 
-	snprintf(gss_auth->path, sizeof(gss_auth->path), "%s/%s",
-			clnt->cl_pathname,
-			gss_auth->mech->gm_name);
-	gss_auth->dentry = rpc_mkpipe(gss_auth->path, clnt, &gss_upcall_ops, RPC_PIPE_WAIT_FOR_OPEN);
+	gss_auth->dentry = rpc_mkpipe(clnt->cl_dentry, gss_auth->mech->gm_name,
+			clnt, &gss_upcall_ops, RPC_PIPE_WAIT_FOR_OPEN);
 	if (IS_ERR(gss_auth->dentry)) {
 		err = PTR_ERR(gss_auth->dentry);
 		goto err_put_mech;
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 3e19d321067a..084a0ad5c64e 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -97,17 +97,7 @@ rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
 	}
 }
 
-/*
- * Create an RPC client
- * FIXME: This should also take a flags argument (as in task->tk_flags).
- * It's called (among others) from pmap_create_client, which may in
- * turn be called by an async task. In this case, rpciod should not be
- * made to sleep too long.
- */
-struct rpc_clnt *
-rpc_new_client(struct rpc_xprt *xprt, char *servname,
-		  struct rpc_program *program, u32 vers,
-		  rpc_authflavor_t flavor)
+static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor)
 {
 	struct rpc_version	*version;
 	struct rpc_clnt		*clnt = NULL;
@@ -147,16 +137,12 @@ rpc_new_client(struct rpc_xprt *xprt, char *servname,
 	clnt->cl_procinfo = version->procs;
 	clnt->cl_maxproc  = version->nrprocs;
 	clnt->cl_protname = program->name;
-	clnt->cl_pmap	  = &clnt->cl_pmap_default;
-	clnt->cl_port     = xprt->addr.sin_port;
 	clnt->cl_prog     = program->number;
 	clnt->cl_vers     = version->number;
-	clnt->cl_prot     = xprt->prot;
 	clnt->cl_stats    = program->stats;
 	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
-	rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
 
-	if (!clnt->cl_port)
+	if (!xprt_bound(clnt->cl_xprt))
 		clnt->cl_autobind = 1;
 
 	clnt->cl_rtt = &clnt->cl_rtt_default;
@@ -191,40 +177,71 @@ out_no_path:
 		kfree(clnt->cl_server);
 	kfree(clnt);
 out_err:
-	xprt_destroy(xprt);
+	xprt_put(xprt);
 out_no_xprt:
 	return ERR_PTR(err);
 }
 
-/**
- * Create an RPC client
- * @xprt - pointer to xprt struct
- * @servname - name of server
- * @info - rpc_program
- * @version - rpc_program version
- * @authflavor - rpc_auth flavour to use
+/*
+ * rpc_create - create an RPC client and transport with one call
+ * @args: rpc_clnt create argument structure
  *
- * Creates an RPC client structure, then pings the server in order to
- * determine if it is up, and if it supports this program and version.
+ * Creates and initializes an RPC transport and an RPC client.
  *
- * This function should never be called by asynchronous tasks such as
- * the portmapper.
+ * It can ping the server in order to determine if it is up, and to see if
+ * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
+ * this behavior so asynchronous tasks can also use rpc_create.
  */
-struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname,
-		struct rpc_program *info, u32 version, rpc_authflavor_t authflavor)
+struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 {
+	struct rpc_xprt *xprt;
 	struct rpc_clnt *clnt;
-	int err;
-	
-	clnt = rpc_new_client(xprt, servname, info, version, authflavor);
+
+	xprt = xprt_create_transport(args->protocol, args->address,
+					args->addrsize, args->timeout);
+	if (IS_ERR(xprt))
+		return (struct rpc_clnt *)xprt;
+
+	/*
+	 * By default, kernel RPC client connects from a reserved port.
+	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
+	 * but it is always enabled for rpciod, which handles the connect
+	 * operation.
+	 */
+	xprt->resvport = 1;
+	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
+		xprt->resvport = 0;
+
+	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
+		args->program->name, args->servername, xprt);
+
+	clnt = rpc_new_client(xprt, args->servername, args->program,
+				args->version, args->authflavor);
 	if (IS_ERR(clnt))
 		return clnt;
-	err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
-	if (err == 0)
-		return clnt;
-	rpc_shutdown_client(clnt);
-	return ERR_PTR(err);
+
+	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
+		int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
+		if (err != 0) {
+			rpc_shutdown_client(clnt);
+			return ERR_PTR(err);
+		}
+	}
+
+	clnt->cl_softrtry = 1;
+	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
+		clnt->cl_softrtry = 0;
+
+	if (args->flags & RPC_CLNT_CREATE_INTR)
+		clnt->cl_intr = 1;
+	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
+		clnt->cl_autobind = 1;
+	if (args->flags & RPC_CLNT_CREATE_ONESHOT)
+		clnt->cl_oneshot = 1;
+
+	return clnt;
 }
+EXPORT_SYMBOL_GPL(rpc_create);
 
 /*
  * This function clones the RPC client structure. It allows us to share the
@@ -244,8 +261,7 @@ rpc_clone_client(struct rpc_clnt *clnt)
 	atomic_set(&new->cl_users, 0);
 	new->cl_parent = clnt;
 	atomic_inc(&clnt->cl_count);
-	/* Duplicate portmapper */
-	rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
+	new->cl_xprt = xprt_get(clnt->cl_xprt);
 	/* Turn off autobind on clones */
 	new->cl_autobind = 0;
 	new->cl_oneshot = 0;
@@ -255,8 +271,7 @@ rpc_clone_client(struct rpc_clnt *clnt)
 	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
 	if (new->cl_auth)
 		atomic_inc(&new->cl_auth->au_count);
-	new->cl_pmap		= &new->cl_pmap_default;
-	new->cl_metrics         = rpc_alloc_iostats(clnt);
+	new->cl_metrics = rpc_alloc_iostats(clnt);
 	return new;
 out_no_clnt:
 	printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
@@ -323,15 +338,12 @@ rpc_destroy_client(struct rpc_clnt *clnt)
 		rpc_rmdir(clnt->cl_dentry);
 		rpc_put_mount();
 	}
-	if (clnt->cl_xprt) {
-		xprt_destroy(clnt->cl_xprt);
-		clnt->cl_xprt = NULL;
-	}
 	if (clnt->cl_server != clnt->cl_inline_name)
 		kfree(clnt->cl_server);
 out_free:
 	rpc_free_iostats(clnt->cl_metrics);
 	clnt->cl_metrics = NULL;
+	xprt_put(clnt->cl_xprt);
 	kfree(clnt);
 	return 0;
 }
@@ -540,6 +552,40 @@ rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
 		task->tk_action = rpc_exit_task;
 }
 
+/**
+ * rpc_peeraddr - extract remote peer address from clnt's xprt
+ * @clnt: RPC client structure
+ * @buf: target buffer
+ * @bufsize: length of target buffer
+ *
+ * Returns the number of bytes that are actually in the stored address.
+ */
+size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
+{
+	size_t bytes;
+	struct rpc_xprt *xprt = clnt->cl_xprt;
+
+	bytes = sizeof(xprt->addr);
+	if (bytes > bufsize)
+		bytes = bufsize;
+	memcpy(buf, &clnt->cl_xprt->addr, bytes);
+	return xprt->addrlen;
+}
+EXPORT_SYMBOL_GPL(rpc_peeraddr);
+
+/**
+ * rpc_peeraddr2str - return remote peer address in printable format
+ * @clnt: RPC client structure
+ * @format: address format
+ *
+ */
+char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
+{
+	struct rpc_xprt *xprt = clnt->cl_xprt;
+	return xprt->ops->print_addr(xprt, format);
+}
+EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
+
 void
 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
 {
@@ -560,7 +606,7 @@ size_t rpc_max_payload(struct rpc_clnt *clnt)
 {
 	return clnt->cl_xprt->max_payload;
 }
-EXPORT_SYMBOL(rpc_max_payload);
+EXPORT_SYMBOL_GPL(rpc_max_payload);
 
 /**
  * rpc_force_rebind - force transport to check that remote port is unchanged
@@ -570,9 +616,9 @@ EXPORT_SYMBOL(rpc_max_payload);
 void rpc_force_rebind(struct rpc_clnt *clnt)
 {
 	if (clnt->cl_autobind)
-		clnt->cl_port = 0;
+		xprt_clear_bound(clnt->cl_xprt);
 }
-EXPORT_SYMBOL(rpc_force_rebind);
+EXPORT_SYMBOL_GPL(rpc_force_rebind);
 
 /*
  * Restart an (async) RPC call. Usually called from within the
@@ -781,16 +827,16 @@ call_encode(struct rpc_task *task)
 static void
 call_bind(struct rpc_task *task)
 {
-	struct rpc_clnt	*clnt = task->tk_client;
+	struct rpc_xprt *xprt = task->tk_xprt;
 
 	dprintk("RPC: %4d call_bind (status %d)\n",
 				task->tk_pid, task->tk_status);
 
 	task->tk_action = call_connect;
-	if (!clnt->cl_port) {
+	if (!xprt_bound(xprt)) {
 		task->tk_action = call_bind_status;
-		task->tk_timeout = task->tk_xprt->bind_timeout;
-		rpc_getport(task, clnt);
+		task->tk_timeout = xprt->bind_timeout;
+		xprt->ops->rpcbind(task);
 	}
 }
 
@@ -815,15 +861,11 @@ call_bind_status(struct rpc_task *task)
 		dprintk("RPC: %4d remote rpcbind: RPC program/version unavailable\n",
 				task->tk_pid);
 		rpc_delay(task, 3*HZ);
-		goto retry_bind;
+		goto retry_timeout;
 	case -ETIMEDOUT:
 		dprintk("RPC: %4d rpcbind request timed out\n",
 				task->tk_pid);
-		if (RPC_IS_SOFT(task)) {
-			status = -EIO;
-			break;
-		}
-		goto retry_bind;
+		goto retry_timeout;
 	case -EPFNOSUPPORT:
 		dprintk("RPC: %4d remote rpcbind service unavailable\n",
 				task->tk_pid);
@@ -836,16 +878,13 @@ call_bind_status(struct rpc_task *task)
 		dprintk("RPC: %4d unrecognized rpcbind error (%d)\n",
 				task->tk_pid, -task->tk_status);
 		status = -EIO;
-		break;
 	}
 
 	rpc_exit(task, status);
 	return;
 
-retry_bind:
-	task->tk_status = 0;
-	task->tk_action = call_bind;
-	return;
+retry_timeout:
+	task->tk_action = call_timeout;
 }
 
 /*
@@ -893,14 +932,16 @@ call_connect_status(struct rpc_task *task)
 
 	switch (status) {
 	case -ENOTCONN:
-	case -ETIMEDOUT:
 	case -EAGAIN:
 		task->tk_action = call_bind;
-		break;
-	default:
-		rpc_exit(task, -EIO);
-		break;
+		if (!RPC_IS_SOFT(task))
+			return;
+		/* if soft mounted, test if we've timed out */
+	case -ETIMEDOUT:
+		task->tk_action = call_timeout;
+		return;
 	}
+	rpc_exit(task, -EIO);
 }
 
 /*
@@ -982,6 +1023,14 @@ call_status(struct rpc_task *task)
 
 	task->tk_status = 0;
 	switch(status) {
+	case -EHOSTDOWN:
+	case -EHOSTUNREACH:
+	case -ENETUNREACH:
+		/*
+		 * Delay any retries for 3 seconds, then handle as if it
+		 * were a timeout.
+		 */
+		rpc_delay(task, 3*HZ);
 	case -ETIMEDOUT:
 		task->tk_action = call_timeout;
 		break;
@@ -1001,7 +1050,6 @@ call_status(struct rpc_task *task)
 		printk("%s: RPC call returned error %d\n",
 			       clnt->cl_protname, -status);
 		rpc_exit(task, status);
-		break;
 	}
 }
 
@@ -1069,10 +1117,10 @@ call_decode(struct rpc_task *task)
 			clnt->cl_stats->rpcretrans++;
 			goto out_retry;
 		}
-		printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
+		dprintk("%s: too small RPC reply size (%d bytes)\n",
 			clnt->cl_protname, task->tk_status);
-		rpc_exit(task, -EIO);
-		return;
+		task->tk_action = call_timeout;
+		goto out_retry;
 	}
 
 	/*
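
The rpc_peeraddr() and rpc_peeraddr2str() helpers added above let upper layers query the remote address without reaching into the transport's sockaddr. A hedged usage sketch follows; only the two helpers and the RPC_DISPLAY_* formats come from this diff, the caller and its output are illustrative.

#include <linux/kernel.h>
#include <linux/socket.h>
#include <linux/sunrpc/clnt.h>

static void example_show_peer(struct rpc_clnt *clnt)
{
	struct sockaddr_storage peer;
	size_t len;

	/* copy out as much of the peer address as fits in the buffer */
	len = rpc_peeraddr(clnt, (struct sockaddr *)&peer, sizeof(peer));

	/* let the transport render its own printable form */
	printk(KERN_DEBUG "peer %s, port %s, proto %s (%zu address bytes)\n",
			rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR),
			rpc_peeraddr2str(clnt, RPC_DISPLAY_PORT),
			rpc_peeraddr2str(clnt, RPC_DISPLAY_PROTO),
			len);
}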
diff --git a/net/sunrpc/pmap_clnt.c b/net/sunrpc/pmap_clnt.c
index 623180f224c9..c04609d3476a 100644
--- a/net/sunrpc/pmap_clnt.c
+++ b/net/sunrpc/pmap_clnt.c
@@ -1,7 +1,9 @@
 /*
- * linux/net/sunrpc/pmap.c
+ * linux/net/sunrpc/pmap_clnt.c
  *
- * Portmapper client.
+ * In-kernel RPC portmapper client.
+ *
+ * Portmapper supports version 2 of the rpcbind protocol (RFC 1833).
  *
  * Copyright (C) 1996, Olaf Kirch <okir@monad.swb.de>
  */
@@ -13,7 +15,6 @@
 #include <linux/uio.h>
 #include <linux/in.h>
 #include <linux/sunrpc/clnt.h>
-#include <linux/sunrpc/xprt.h>
 #include <linux/sunrpc/sched.h>
 
 #ifdef RPC_DEBUG
@@ -24,80 +25,141 @@
 #define PMAP_UNSET		2
 #define PMAP_GETPORT		3
 
+struct portmap_args {
+	u32			pm_prog;
+	u32			pm_vers;
+	u32			pm_prot;
+	unsigned short		pm_port;
+	struct rpc_xprt *	pm_xprt;
+};
+
 static struct rpc_procinfo	pmap_procedures[];
 static struct rpc_clnt *	pmap_create(char *, struct sockaddr_in *, int, int);
-static void			pmap_getport_done(struct rpc_task *);
+static void			pmap_getport_done(struct rpc_task *, void *);
 static struct rpc_program	pmap_program;
-static DEFINE_SPINLOCK(pmap_lock);
 
-/*
- * Obtain the port for a given RPC service on a given host. This one can
- * be called for an ongoing RPC request.
- */
-void
-rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
+static void pmap_getport_prepare(struct rpc_task *task, void *calldata)
 {
-	struct rpc_portmap *map = clnt->cl_pmap;
-	struct sockaddr_in *sap = &clnt->cl_xprt->addr;
+	struct portmap_args *map = calldata;
 	struct rpc_message msg = {
 		.rpc_proc	= &pmap_procedures[PMAP_GETPORT],
 		.rpc_argp	= map,
-		.rpc_resp	= &clnt->cl_port,
-		.rpc_cred	= NULL
+		.rpc_resp	= &map->pm_port,
 	};
+
+	rpc_call_setup(task, &msg, 0);
+}
+
+static inline struct portmap_args *pmap_map_alloc(void)
+{
+	return kmalloc(sizeof(struct portmap_args), GFP_NOFS);
+}
+
+static inline void pmap_map_free(struct portmap_args *map)
+{
+	kfree(map);
+}
+
+static void pmap_map_release(void *data)
+{
+	pmap_map_free(data);
+}
+
+static const struct rpc_call_ops pmap_getport_ops = {
+	.rpc_call_prepare	= pmap_getport_prepare,
+	.rpc_call_done		= pmap_getport_done,
+	.rpc_release		= pmap_map_release,
+};
+
+static inline void pmap_wake_portmap_waiters(struct rpc_xprt *xprt, int status)
+{
+	xprt_clear_binding(xprt);
+	rpc_wake_up_status(&xprt->binding, status);
+}
+
+/**
+ * rpc_getport - obtain the port for a given RPC service on a given host
+ * @task: task that is waiting for portmapper request
+ *
+ * This one can be called for an ongoing RPC request, and can be used in
+ * an async (rpciod) context.
+ */
+void rpc_getport(struct rpc_task *task)
+{
+	struct rpc_clnt *clnt = task->tk_client;
+	struct rpc_xprt *xprt = task->tk_xprt;
+	struct sockaddr_in addr;
+	struct portmap_args *map;
 	struct rpc_clnt	*pmap_clnt;
-	struct rpc_task	*child;
+	struct rpc_task *child;
+	int status;
 
-	dprintk("RPC: %4d rpc_getport(%s, %d, %d, %d)\n",
+	dprintk("RPC: %4d rpc_getport(%s, %u, %u, %d)\n",
 			task->tk_pid, clnt->cl_server,
-			map->pm_prog, map->pm_vers, map->pm_prot);
+			clnt->cl_prog, clnt->cl_vers, xprt->prot);
 
 	/* Autobind on cloned rpc clients is discouraged */
 	BUG_ON(clnt->cl_parent != clnt);
 
-	spin_lock(&pmap_lock);
-	if (map->pm_binding) {
-		rpc_sleep_on(&map->pm_bindwait, task, NULL, NULL);
-		spin_unlock(&pmap_lock);
+	if (xprt_test_and_set_binding(xprt)) {
+		task->tk_status = -EACCES;	/* tell caller to check again */
+		rpc_sleep_on(&xprt->binding, task, NULL, NULL);
 		return;
 	}
-	map->pm_binding = 1;
-	spin_unlock(&pmap_lock);
 
-	pmap_clnt = pmap_create(clnt->cl_server, sap, map->pm_prot, 0);
-	if (IS_ERR(pmap_clnt)) {
-		task->tk_status = PTR_ERR(pmap_clnt);
+	/* Someone else may have bound if we slept */
+	status = 0;
+	if (xprt_bound(xprt))
+		goto bailout_nofree;
+
+	status = -ENOMEM;
+	map = pmap_map_alloc();
+	if (!map)
+		goto bailout_nofree;
+	map->pm_prog = clnt->cl_prog;
+	map->pm_vers = clnt->cl_vers;
+	map->pm_prot = xprt->prot;
+	map->pm_port = 0;
+	map->pm_xprt = xprt_get(xprt);
+
+	rpc_peeraddr(clnt, (struct sockaddr *) &addr, sizeof(addr));
+	pmap_clnt = pmap_create(clnt->cl_server, &addr, map->pm_prot, 0);
+	status = PTR_ERR(pmap_clnt);
+	if (IS_ERR(pmap_clnt))
 		goto bailout;
-	}
-	task->tk_status = 0;
 
-	/*
-	 * Note: rpc_new_child will release client after a failure.
-	 */
-	if (!(child = rpc_new_child(pmap_clnt, task)))
+	status = -EIO;
+	child = rpc_run_task(pmap_clnt, RPC_TASK_ASYNC, &pmap_getport_ops, map);
+	if (IS_ERR(child))
 		goto bailout;
+	rpc_release_task(child);
 
-	/* Setup the call info struct */
-	rpc_call_setup(child, &msg, 0);
+	rpc_sleep_on(&xprt->binding, task, NULL, NULL);
 
-	/* ... and run the child task */
 	task->tk_xprt->stat.bind_count++;
-	rpc_run_child(task, child, pmap_getport_done);
 	return;
 
 bailout:
-	spin_lock(&pmap_lock);
-	map->pm_binding = 0;
-	rpc_wake_up(&map->pm_bindwait);
-	spin_unlock(&pmap_lock);
-	rpc_exit(task, -EIO);
+	pmap_map_free(map);
+	xprt_put(xprt);
+bailout_nofree:
+	task->tk_status = status;
+	pmap_wake_portmap_waiters(xprt, status);
 }
 
 #ifdef CONFIG_ROOT_NFS
-int
-rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot)
+/**
+ * rpc_getport_external - obtain the port for a given RPC service on a given host
+ * @sin: address of remote peer
+ * @prog: RPC program number to bind
+ * @vers: RPC version number to bind
+ * @prot: transport protocol to use to make this request
+ *
+ * This one is called from outside the RPC client in a synchronous task context.
+ */
+int rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot)
 {
-	struct rpc_portmap map = {
+	struct portmap_args map = {
 		.pm_prog	= prog,
 		.pm_vers	= vers,
 		.pm_prot	= prot,
@@ -112,7 +174,7 @@ rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot)
 	char		hostname[32];
 	int		status;
 
-	dprintk("RPC:      rpc_getport_external(%u.%u.%u.%u, %d, %d, %d)\n",
+	dprintk("RPC:      rpc_getport_external(%u.%u.%u.%u, %u, %u, %d)\n",
 			NIPQUAD(sin->sin_addr.s_addr), prog, vers, prot);
 
 	sprintf(hostname, "%u.%u.%u.%u", NIPQUAD(sin->sin_addr.s_addr));
@@ -132,45 +194,53 @@ rpc_getport_external(struct sockaddr_in *sin, __u32 prog, __u32 vers, int prot)
 }
 #endif
 
-static void
-pmap_getport_done(struct rpc_task *task)
+/*
+ * Portmapper child task invokes this callback via tk_exit.
+ */
+static void pmap_getport_done(struct rpc_task *child, void *data)
 {
-	struct rpc_clnt	*clnt = task->tk_client;
-	struct rpc_xprt *xprt = task->tk_xprt;
-	struct rpc_portmap *map = clnt->cl_pmap;
-
-	dprintk("RPC: %4d pmap_getport_done(status %d, port %d)\n",
-			task->tk_pid, task->tk_status, clnt->cl_port);
-
-	xprt->ops->set_port(xprt, 0);
-	if (task->tk_status < 0) {
-		/* Make the calling task exit with an error */
-		task->tk_action = rpc_exit_task;
-	} else if (clnt->cl_port == 0) {
-		/* Program not registered */
-		rpc_exit(task, -EACCES);
+	struct portmap_args *map = data;
+	struct rpc_xprt *xprt = map->pm_xprt;
+	int status = child->tk_status;
+
+	if (status < 0) {
+		/* Portmapper not available */
+		xprt->ops->set_port(xprt, 0);
+	} else if (map->pm_port == 0) {
+		/* Requested RPC service wasn't registered */
+		xprt->ops->set_port(xprt, 0);
+		status = -EACCES;
 	} else {
-		xprt->ops->set_port(xprt, clnt->cl_port);
-		clnt->cl_port = htons(clnt->cl_port);
+		/* Succeeded */
+		xprt->ops->set_port(xprt, map->pm_port);
+		xprt_set_bound(xprt);
+		status = 0;
 	}
-	spin_lock(&pmap_lock);
-	map->pm_binding = 0;
-	rpc_wake_up(&map->pm_bindwait);
-	spin_unlock(&pmap_lock);
+
+	dprintk("RPC: %4d pmap_getport_done(status %d, port %u)\n",
+			child->tk_pid, status, map->pm_port);
+
+	pmap_wake_portmap_waiters(xprt, status);
+	xprt_put(xprt);
 }
 
-/*
- * Set or unset a port registration with the local portmapper.
+/**
+ * rpc_register - set or unset a port registration with the local portmapper
+ * @prog: RPC program number to bind
+ * @vers: RPC version number to bind
+ * @prot: transport protocol to use to make this request
+ * @port: port value to register
+ * @okay: result code
+ *
  * port == 0 means unregister, port != 0 means register.
  */
-int
-rpc_register(u32 prog, u32 vers, int prot, unsigned short port, int *okay)
+int rpc_register(u32 prog, u32 vers, int prot, unsigned short port, int *okay)
 {
 	struct sockaddr_in	sin = {
 		.sin_family	= AF_INET,
 		.sin_addr.s_addr = htonl(INADDR_LOOPBACK),
 	};
-	struct rpc_portmap	map = {
+	struct portmap_args	map = {
 		.pm_prog	= prog,
 		.pm_vers	= vers,
 		.pm_prot	= prot,
@@ -184,7 +254,7 @@ rpc_register(u32 prog, u32 vers, int prot, unsigned short port, int *okay)
 	struct rpc_clnt		*pmap_clnt;
 	int error = 0;
 
-	dprintk("RPC: registering (%d, %d, %d, %d) with portmapper.\n",
+	dprintk("RPC: registering (%u, %u, %d, %u) with portmapper.\n",
 			prog, vers, prot, port);
 
 	pmap_clnt = pmap_create("localhost", &sin, IPPROTO_UDP, 1);
@@ -207,38 +277,32 @@ rpc_register(u32 prog, u32 vers, int prot, unsigned short port, int *okay)
 	return error;
 }
 
-static struct rpc_clnt *
-pmap_create(char *hostname, struct sockaddr_in *srvaddr, int proto, int privileged)
+static struct rpc_clnt *pmap_create(char *hostname, struct sockaddr_in *srvaddr, int proto, int privileged)
 {
-	struct rpc_xprt	*xprt;
-	struct rpc_clnt	*clnt;
-
-	/* printk("pmap: create xprt\n"); */
-	xprt = xprt_create_proto(proto, srvaddr, NULL);
-	if (IS_ERR(xprt))
-		return (struct rpc_clnt *)xprt;
-	xprt->ops->set_port(xprt, RPC_PMAP_PORT);
+	struct rpc_create_args args = {
+		.protocol	= proto,
+		.address	= (struct sockaddr *)srvaddr,
+		.addrsize	= sizeof(*srvaddr),
+		.servername	= hostname,
+		.program	= &pmap_program,
+		.version	= RPC_PMAP_VERSION,
+		.authflavor	= RPC_AUTH_UNIX,
+		.flags		= (RPC_CLNT_CREATE_ONESHOT |
+				   RPC_CLNT_CREATE_NOPING),
+	};
+
+	srvaddr->sin_port = htons(RPC_PMAP_PORT);
 	if (!privileged)
-		xprt->resvport = 0;
-
-	/* printk("pmap: create clnt\n"); */
-	clnt = rpc_new_client(xprt, hostname,
-				&pmap_program, RPC_PMAP_VERSION,
-				RPC_AUTH_UNIX);
-	if (!IS_ERR(clnt)) {
-		clnt->cl_softrtry = 1;
-		clnt->cl_oneshot  = 1;
-	}
-	return clnt;
+		args.flags |= RPC_CLNT_CREATE_NONPRIVPORT;
+	return rpc_create(&args);
 }
 
 /*
  * XDR encode/decode functions for PMAP
  */
-static int
-xdr_encode_mapping(struct rpc_rqst *req, u32 *p, struct rpc_portmap *map)
+static int xdr_encode_mapping(struct rpc_rqst *req, u32 *p, struct portmap_args *map)
 {
-	dprintk("RPC: xdr_encode_mapping(%d, %d, %d, %d)\n",
+	dprintk("RPC: xdr_encode_mapping(%u, %u, %u, %u)\n",
 		map->pm_prog, map->pm_vers, map->pm_prot, map->pm_port);
 	*p++ = htonl(map->pm_prog);
 	*p++ = htonl(map->pm_vers);
@@ -249,15 +313,13 @@ xdr_encode_mapping(struct rpc_rqst *req, u32 *p, struct rpc_portmap *map)
 	return 0;
 }
 
-static int
-xdr_decode_port(struct rpc_rqst *req, u32 *p, unsigned short *portp)
+static int xdr_decode_port(struct rpc_rqst *req, u32 *p, unsigned short *portp)
 {
 	*portp = (unsigned short) ntohl(*p++);
 	return 0;
 }
 
-static int
-xdr_decode_bool(struct rpc_rqst *req, u32 *p, unsigned int *boolp)
+static int xdr_decode_bool(struct rpc_rqst *req, u32 *p, unsigned int *boolp)
 {
 	*boolp = (unsigned int) ntohl(*p++);
 	return 0;
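
rpc_register() keeps its old calling convention (port != 0 registers, port == 0 unregisters with the local portmapper) but now carries kerneldoc and uses struct portmap_args internally. A hedged sketch of a caller follows; the program and version numbers are placeholders, only the helper itself comes from this file.

#include <linux/errno.h>
#include <linux/in.h>
#include <linux/sunrpc/clnt.h>

#define EXAMPLE_PROG	400042		/* hypothetical program number */
#define EXAMPLE_VERS	1

static int example_advertise(unsigned short port)
{
	int okay = 0;
	int error;

	/* a non-zero port registers the service with the local portmapper */
	error = rpc_register(EXAMPLE_PROG, EXAMPLE_VERS, IPPROTO_UDP,
			     port, &okay);
	if (error < 0)
		return error;
	return okay ? 0 : -EACCES;
}

static void example_withdraw(void)
{
	int okay;

	/* a zero port unregisters it again */
	rpc_register(EXAMPLE_PROG, EXAMPLE_VERS, IPPROTO_UDP, 0, &okay);
}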
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 0b1a1ac8a4bc..dfa504fe383f 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -327,10 +327,8 @@ rpc_show_info(struct seq_file *m, void *v)
 	seq_printf(m, "RPC server: %s\n", clnt->cl_server);
 	seq_printf(m, "service: %s (%d) version %d\n", clnt->cl_protname,
 			clnt->cl_prog, clnt->cl_vers);
-	seq_printf(m, "address: %u.%u.%u.%u\n",
-			NIPQUAD(clnt->cl_xprt->addr.sin_addr.s_addr));
-	seq_printf(m, "protocol: %s\n",
-			clnt->cl_xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
+	seq_printf(m, "address: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
+	seq_printf(m, "protocol: %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_PROTO));
 	return 0;
 }
 
@@ -623,17 +621,13 @@ __rpc_rmdir(struct inode *dir, struct dentry *dentry)
 }
 
 static struct dentry *
-rpc_lookup_negative(char *path, struct nameidata *nd)
+rpc_lookup_create(struct dentry *parent, const char *name, int len)
 {
+	struct inode *dir = parent->d_inode;
 	struct dentry *dentry;
-	struct inode *dir;
-	int error;
 
-	if ((error = rpc_lookup_parent(path, nd)) != 0)
-		return ERR_PTR(error);
-	dir = nd->dentry->d_inode;
 	mutex_lock_nested(&dir->i_mutex, I_MUTEX_PARENT);
-	dentry = lookup_one_len(nd->last.name, nd->dentry, nd->last.len);
+	dentry = lookup_one_len(name, parent, len);
 	if (IS_ERR(dentry))
 		goto out_err;
 	if (dentry->d_inode) {
@@ -644,7 +638,20 @@ rpc_lookup_negative(char *path, struct nameidata *nd)
 	return dentry;
 out_err:
 	mutex_unlock(&dir->i_mutex);
-	rpc_release_path(nd);
+	return dentry;
+}
+
+static struct dentry *
+rpc_lookup_negative(char *path, struct nameidata *nd)
+{
+	struct dentry *dentry;
+	int error;
+
+	if ((error = rpc_lookup_parent(path, nd)) != 0)
+		return ERR_PTR(error);
+	dentry = rpc_lookup_create(nd->dentry, nd->last.name, nd->last.len);
+	if (IS_ERR(dentry))
+		rpc_release_path(nd);
 	return dentry;
 }
 
@@ -703,18 +710,17 @@ rpc_rmdir(struct dentry *dentry)
 }
 
 struct dentry *
-rpc_mkpipe(char *path, void *private, struct rpc_pipe_ops *ops, int flags)
+rpc_mkpipe(struct dentry *parent, const char *name, void *private, struct rpc_pipe_ops *ops, int flags)
 {
-	struct nameidata nd;
 	struct dentry *dentry;
 	struct inode *dir, *inode;
 	struct rpc_inode *rpci;
 
-	dentry = rpc_lookup_negative(path, &nd);
+	dentry = rpc_lookup_create(parent, name, strlen(name));
 	if (IS_ERR(dentry))
 		return dentry;
-	dir = nd.dentry->d_inode;
-	inode = rpc_get_inode(dir->i_sb, S_IFSOCK | S_IRUSR | S_IWUSR);
+	dir = parent->d_inode;
+	inode = rpc_get_inode(dir->i_sb, S_IFIFO | S_IRUSR | S_IWUSR);
 	if (!inode)
 		goto err_dput;
 	inode->i_ino = iunique(dir->i_sb, 100);
@@ -728,13 +734,13 @@ rpc_mkpipe(char *path, void *private, struct rpc_pipe_ops *ops, int flags)
 	dget(dentry);
 out:
 	mutex_unlock(&dir->i_mutex);
-	rpc_release_path(&nd);
 	return dentry;
 err_dput:
 	dput(dentry);
 	dentry = ERR_PTR(-ENOMEM);
-	printk(KERN_WARNING "%s: %s() failed to create pipe %s (errno = %d)\n",
-			__FILE__, __FUNCTION__, path, -ENOMEM);
+	printk(KERN_WARNING "%s: %s() failed to create pipe %s/%s (errno = %d)\n",
+			__FILE__, __FUNCTION__, parent->d_name.name, name,
+			-ENOMEM);
 	goto out;
 }
 
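rpc_mkpipe() now takes a parent dentry plus a name instead of a path string, which is what allowed gss_create() (in the auth_gss.c hunk at the top of this diff) to drop its 48-byte path buffer. Below is a hedged sketch of the new calling convention; the pipe name and the empty ops structure are placeholders.

#include <linux/fs.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/rpc_pipe_fs.h>

static struct rpc_pipe_ops example_pipe_ops;	/* upcall/downcall handlers */

static struct dentry *example_make_pipe(struct rpc_clnt *clnt, void *private)
{
	/*
	 * The pipe is created directly under the client's rpc_pipefs
	 * directory; no path string is assembled any more.
	 */
	return rpc_mkpipe(clnt->cl_dentry, "example", private,
			  &example_pipe_ops, RPC_PIPE_WAIT_FOR_OPEN);
}
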
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 5c3eee768504..6390461a9756 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -21,7 +21,6 @@
 #include <linux/mutex.h>
 
 #include <linux/sunrpc/clnt.h>
-#include <linux/sunrpc/xprt.h>
 
 #ifdef RPC_DEBUG
 #define RPCDBG_FACILITY		RPCDBG_SCHED
@@ -45,12 +44,6 @@ static void			rpciod_killall(void);
 static void			rpc_async_schedule(void *);
 
 /*
- * RPC tasks that create another task (e.g. for contacting the portmapper)
- * will wait on this queue for their child's completion
- */
-static RPC_WAITQ(childq, "childq");
-
-/*
  * RPC tasks sit here while waiting for conditions to improve.
  */
 static RPC_WAITQ(delay_queue, "delayq");
@@ -324,16 +317,6 @@ static void rpc_make_runnable(struct rpc_task *task)
 }
 
 /*
- * Place a newly initialized task on the workqueue.
- */
-static inline void
-rpc_schedule_run(struct rpc_task *task)
-{
-	rpc_set_active(task);
-	rpc_make_runnable(task);
-}
-
-/*
  * Prepare for sleeping on a wait queue.
  * By always appending tasks to the list we ensure FIFO behavior.
  * NB: An RPC task will only receive interrupt-driven events as long
@@ -559,24 +542,20 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 	spin_unlock_bh(&queue->lock);
 }
 
+static void __rpc_atrun(struct rpc_task *task)
+{
+	rpc_wake_up_task(task);
+}
+
 /*
  * Run a task at a later time
  */
-static void	__rpc_atrun(struct rpc_task *);
-void
-rpc_delay(struct rpc_task *task, unsigned long delay)
+void rpc_delay(struct rpc_task *task, unsigned long delay)
 {
 	task->tk_timeout = delay;
 	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
 }
 
-static void
-__rpc_atrun(struct rpc_task *task)
-{
-	task->tk_status = 0;
-	rpc_wake_up_task(task);
-}
-
 /*
  * Helper to call task->tk_ops->rpc_call_prepare
  */
@@ -933,72 +912,6 @@ struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
 }
 EXPORT_SYMBOL(rpc_run_task);
 
-/**
- * rpc_find_parent - find the parent of a child task.
- * @child: child task
- * @parent: parent task
- *
- * Checks that the parent task is still sleeping on the
- * queue 'childq'. If so returns a pointer to the parent.
- * Upon failure returns NULL.
- *
- * Caller must hold childq.lock
- */
-static inline struct rpc_task *rpc_find_parent(struct rpc_task *child, struct rpc_task *parent)
-{
-	struct rpc_task	*task;
-	struct list_head *le;
-
-	task_for_each(task, le, &childq.tasks[0])
-		if (task == parent)
-			return parent;
-
-	return NULL;
-}
-
-static void rpc_child_exit(struct rpc_task *child, void *calldata)
-{
-	struct rpc_task	*parent;
-
-	spin_lock_bh(&childq.lock);
-	if ((parent = rpc_find_parent(child, calldata)) != NULL) {
-		parent->tk_status = child->tk_status;
-		__rpc_wake_up_task(parent);
-	}
-	spin_unlock_bh(&childq.lock);
-}
-
-static const struct rpc_call_ops rpc_child_ops = {
-	.rpc_call_done = rpc_child_exit,
-};
-
-/*
- * Note: rpc_new_task releases the client after a failure.
- */
-struct rpc_task *
-rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
-{
-	struct rpc_task	*task;
-
-	task = rpc_new_task(clnt, RPC_TASK_ASYNC | RPC_TASK_CHILD, &rpc_child_ops, parent);
-	if (!task)
-		goto fail;
-	return task;
-
-fail:
-	parent->tk_status = -ENOMEM;
-	return NULL;
-}
-
-void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
-{
-	spin_lock_bh(&childq.lock);
-	/* N.B. Is it possible for the child to have already finished? */
-	__rpc_sleep_on(&childq, task, func, NULL);
-	rpc_schedule_run(child);
-	spin_unlock_bh(&childq.lock);
-}
-
 /*
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
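
The child-task machinery removed above (childq, rpc_new_child(), rpc_run_child()) is replaced by the generic async pattern that rpc_getport() now uses in pmap_clnt.c: start a detached task with rpc_run_task() and a struct rpc_call_ops, and let the callbacks report back. Below is a hedged sketch of that shape; the calldata type and callback bodies are illustrative, and a real caller would also supply .rpc_call_prepare to build the rpc_message, as pmap_getport_prepare() does.

#include <linux/err.h>
#include <linux/slab.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>

struct example_calldata {
	int result;
};

static void example_done(struct rpc_task *child, void *data)
{
	struct example_calldata *d = data;

	/* record the outcome; a real caller would wake a waiter here */
	d->result = child->tk_status;
}

static void example_release(void *data)
{
	kfree(data);
}

static const struct rpc_call_ops example_ops = {
	.rpc_call_done	= example_done,
	.rpc_release	= example_release,
};

static int example_start_child(struct rpc_clnt *clnt)
{
	struct example_calldata *d;
	struct rpc_task *child;

	d = kmalloc(sizeof(*d), GFP_NOFS);
	if (!d)
		return -ENOMEM;

	child = rpc_run_task(clnt, RPC_TASK_ASYNC, &example_ops, d);
	if (IS_ERR(child)) {
		kfree(d);
		return PTR_ERR(child);
	}
	/* we only needed to fire the task off, so drop our reference */
	rpc_release_task(child);
	return 0;
}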
diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c
index f38f939ce95f..26c0531d7e25 100644
--- a/net/sunrpc/sunrpc_syms.c
+++ b/net/sunrpc/sunrpc_syms.c
@@ -36,8 +36,6 @@ EXPORT_SYMBOL(rpc_wake_up_status);
 EXPORT_SYMBOL(rpc_release_task);
 
 /* RPC client functions */
-EXPORT_SYMBOL(rpc_create_client);
-EXPORT_SYMBOL(rpc_new_client);
 EXPORT_SYMBOL(rpc_clone_client);
 EXPORT_SYMBOL(rpc_bind_new_program);
 EXPORT_SYMBOL(rpc_destroy_client);
@@ -57,7 +55,6 @@ EXPORT_SYMBOL(rpc_queue_upcall);
 EXPORT_SYMBOL(rpc_mkpipe);
 
 /* Client transport */
-EXPORT_SYMBOL(xprt_create_proto);
 EXPORT_SYMBOL(xprt_set_timeout);
 
 /* Client credential cache */
diff --git a/net/sunrpc/timer.c b/net/sunrpc/timer.c
index bcbdf6430d5c..8142fdb8a930 100644
--- a/net/sunrpc/timer.c
+++ b/net/sunrpc/timer.c
@@ -19,8 +19,6 @@
 #include <linux/unistd.h>
 
 #include <linux/sunrpc/clnt.h>
-#include <linux/sunrpc/xprt.h>
-#include <linux/sunrpc/timer.h>
 
 #define RPC_RTO_MAX (60*HZ)
 #define RPC_RTO_INIT (HZ/5)
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index e8c2bc4977f3..1f786f68729d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -534,7 +534,7 @@ void xprt_connect(struct rpc_task *task)
 	dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
 			xprt, (xprt_connected(xprt) ? "is" : "is not"));
 
-	if (!xprt->addr.sin_port) {
+	if (!xprt_bound(xprt)) {
 		task->tk_status = -EIO;
 		return;
 	}
@@ -585,13 +585,6 @@ static void xprt_connect_status(struct rpc_task *task)
 				task->tk_pid, -task->tk_status, task->tk_client->cl_server);
 		xprt_release_write(xprt, task);
 		task->tk_status = -EIO;
-		return;
-	}
-
-	/* if soft mounted, just cause this RPC to fail */
-	if (RPC_IS_SOFT(task)) {
-		xprt_release_write(xprt, task);
-		task->tk_status = -EIO;
 	}
 }
 
@@ -829,6 +822,7 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
 	req->rq_bufsize = 0;
 	req->rq_xid     = xprt_alloc_xid(xprt);
 	req->rq_release_snd_buf = NULL;
+	xprt_reset_majortimeo(req);
 	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
 			req, ntohl(req->rq_xid));
 }
@@ -887,16 +881,32 @@ void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long i
 	to->to_exponential = 0;
 }
 
-static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
+/**
+ * xprt_create_transport - create an RPC transport
+ * @proto: requested transport protocol
+ * @ap: remote peer address
+ * @size: length of address
+ * @to: timeout parameters
+ *
+ */
+struct rpc_xprt *xprt_create_transport(int proto, struct sockaddr *ap, size_t size, struct rpc_timeout *to)
 {
 	int result;
 	struct rpc_xprt	*xprt;
 	struct rpc_rqst	*req;
 
-	if ((xprt = kzalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
+	if ((xprt = kzalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) {
+		dprintk("RPC:      xprt_create_transport: no memory\n");
 		return ERR_PTR(-ENOMEM);
-
-	xprt->addr = *ap;
+	}
+	if (size <= sizeof(xprt->addr)) {
+		memcpy(&xprt->addr, ap, size);
+		xprt->addrlen = size;
+	} else {
+		kfree(xprt);
+		dprintk("RPC:      xprt_create_transport: address too large\n");
+		return ERR_PTR(-EBADF);
+	}
 
 	switch (proto) {
 	case IPPROTO_UDP:
@@ -908,14 +918,15 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
 	default:
 		printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
 				proto);
-		result = -EIO;
-		break;
+		return ERR_PTR(-EIO);
 	}
 	if (result) {
 		kfree(xprt);
+		dprintk("RPC:      xprt_create_transport: failed, %d\n", result);
 		return ERR_PTR(result);
 	}
 
+	kref_init(&xprt->kref);
 	spin_lock_init(&xprt->transport_lock);
 	spin_lock_init(&xprt->reserve_lock);
 
@@ -928,6 +939,7 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
 	xprt->last_used = jiffies;
 	xprt->cwnd = RPC_INITCWND;
 
+	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
 	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
 	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
 	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
@@ -941,41 +953,43 @@ static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc
 
 	dprintk("RPC:      created transport %p with %u slots\n", xprt,
 			xprt->max_reqs);
-	
-	return xprt;
-}
 
-/**
- * xprt_create_proto - create an RPC client transport
- * @proto: requested transport protocol
- * @sap: remote peer's address
- * @to: timeout parameters for new transport
- *
- */
-struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
-{
-	struct rpc_xprt	*xprt;
-
-	xprt = xprt_setup(proto, sap, to);
-	if (IS_ERR(xprt))
-		dprintk("RPC:      xprt_create_proto failed\n");
-	else
-		dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);
 	return xprt;
 }
 
 /**
  * xprt_destroy - destroy an RPC transport, killing off all requests.
- * @xprt: transport to destroy
+ * @kref: kref for the transport to destroy
  *
  */
-int xprt_destroy(struct rpc_xprt *xprt)
+static void xprt_destroy(struct kref *kref)
 {
+	struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);
+
 	dprintk("RPC:      destroying transport %p\n", xprt);
 	xprt->shutdown = 1;
 	del_timer_sync(&xprt->timer);
 	xprt->ops->destroy(xprt);
 	kfree(xprt);
+}
 
-	return 0;
+/**
+ * xprt_put - release a reference to an RPC transport.
+ * @xprt: pointer to the transport
+ *
+ */
+void xprt_put(struct rpc_xprt *xprt)
+{
+	kref_put(&xprt->kref, xprt_destroy);
+}
+
+/**
+ * xprt_get - return a reference to an RPC transport.
+ * @xprt: pointer to the transport
+ *
+ */
+struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
+{
+	kref_get(&xprt->kref);
+	return xprt;
 }
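
xprt_destroy() is no longer called directly by clients; the transport is reference counted through the kref added above, and the last xprt_put() tears it down. Below is a hedged sketch of the take/release pattern that rpc_clone_client() and rpc_destroy_client() now follow; the wrapper structure is illustrative.

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

struct example_user {
	struct rpc_xprt *xprt;
};

static void example_take_transport(struct example_user *u, struct rpc_xprt *xprt)
{
	/* each long-lived user pins the transport with its own reference */
	u->xprt = xprt_get(xprt);
}

static void example_drop_transport(struct example_user *u)
{
	/* the final xprt_put() invokes xprt_destroy() through the kref */
	xprt_put(u->xprt);
	u->xprt = NULL;
}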
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 897bdd982315..9b62923a9c06 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -125,6 +125,47 @@ static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
 }
 #endif
 
+static void xs_format_peer_addresses(struct rpc_xprt *xprt)
+{
+	struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
+	char *buf;
+
+	buf = kzalloc(20, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 20, "%u.%u.%u.%u",
+				NIPQUAD(addr->sin_addr.s_addr));
+	}
+	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
+
+	buf = kzalloc(8, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 8, "%u",
+				ntohs(addr->sin_port));
+	}
+	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
+
+	if (xprt->prot == IPPROTO_UDP)
+		xprt->address_strings[RPC_DISPLAY_PROTO] = "udp";
+	else
+		xprt->address_strings[RPC_DISPLAY_PROTO] = "tcp";
+
+	buf = kzalloc(48, GFP_KERNEL);
+	if (buf) {
+		snprintf(buf, 48, "addr=%u.%u.%u.%u port=%u proto=%s",
+			NIPQUAD(addr->sin_addr.s_addr),
+			ntohs(addr->sin_port),
+			xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
+	}
+	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
+}
+
+static void xs_free_peer_addresses(struct rpc_xprt *xprt)
+{
+	kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
+	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
+	kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
+}
+
 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
 
 static inline int xs_send_head(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, unsigned int len)
@@ -295,7 +336,7 @@ static int xs_udp_send_request(struct rpc_task *task)
 
 	req->rq_xtime = jiffies;
 	status = xs_sendpages(xprt->sock, (struct sockaddr *) &xprt->addr,
-				sizeof(xprt->addr), xdr, req->rq_bytes_sent);
+				xprt->addrlen, xdr, req->rq_bytes_sent);
 
 	dprintk("RPC:      xs_udp_send_request(%u) = %d\n",
 			xdr->len - req->rq_bytes_sent, status);
@@ -485,6 +526,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
 
 	xprt_disconnect(xprt);
 	xs_close(xprt);
+	xs_free_peer_addresses(xprt);
 	kfree(xprt->slot);
 }
 
@@ -960,6 +1002,19 @@ static unsigned short xs_get_random_port(void)
 }
 
 /**
+ * xs_print_peer_address - format an IPv4 address for printing
+ * @xprt: generic transport
+ * @format: flags field indicating which parts of the address to render
+ */
+static char *xs_print_peer_address(struct rpc_xprt *xprt, enum rpc_display_format_t format)
+{
+	if (xprt->address_strings[format] != NULL)
+		return xprt->address_strings[format];
+	else
+		return "unprintable";
+}
+
+/**
  * xs_set_port - reset the port number in the remote endpoint address
  * @xprt: generic transport
  * @port: new port number
@@ -967,8 +1022,11 @@ static unsigned short xs_get_random_port(void)
  */
 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
 {
+	struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr;
+
 	dprintk("RPC:      setting port for xprt %p to %u\n", xprt, port);
-	xprt->addr.sin_port = htons(port);
+
+	sap->sin_port = htons(port);
 }
 
 static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
@@ -1011,11 +1069,9 @@ static void xs_udp_connect_worker(void *args)
 	struct socket *sock = xprt->sock;
 	int err, status = -EIO;
 
-	if (xprt->shutdown || xprt->addr.sin_port == 0)
+	if (xprt->shutdown || !xprt_bound(xprt))
 		goto out;
 
-	dprintk("RPC:      xs_udp_connect_worker for xprt %p\n", xprt);
-
 	/* Start by resetting any existing state */
 	xs_close(xprt);
 
@@ -1029,6 +1085,9 @@ static void xs_udp_connect_worker(void *args)
 		goto out;
 	}
 
+	dprintk("RPC:      worker connecting xprt %p to address: %s\n",
+			xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
+
 	if (!xprt->inet) {
 		struct sock *sk = sock->sk;
 
@@ -1094,11 +1153,9 @@ static void xs_tcp_connect_worker(void *args)
 	struct socket *sock = xprt->sock;
 	int err, status = -EIO;
 
-	if (xprt->shutdown || xprt->addr.sin_port == 0)
+	if (xprt->shutdown || !xprt_bound(xprt))
 		goto out;
 
-	dprintk("RPC:      xs_tcp_connect_worker for xprt %p\n", xprt);
-
 	if (!xprt->sock) {
 		/* start from scratch */
 		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
@@ -1114,6 +1171,9 @@ static void xs_tcp_connect_worker(void *args)
 		/* "close" the socket, preserving the local port */
 		xs_tcp_reuse_connection(xprt);
 
+	dprintk("RPC:      worker connecting xprt %p to address: %s\n",
+			xprt, xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
+
 	if (!xprt->inet) {
 		struct sock *sk = sock->sk;
 
@@ -1147,7 +1207,7 @@ static void xs_tcp_connect_worker(void *args)
 	xprt->stat.connect_count++;
 	xprt->stat.connect_start = jiffies;
 	status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
-			sizeof(xprt->addr), O_NONBLOCK);
+			xprt->addrlen, O_NONBLOCK);
 	dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
 			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
 	if (status < 0) {
@@ -1255,8 +1315,10 @@ static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 
 static struct rpc_xprt_ops xs_udp_ops = {
 	.set_buffer_size	= xs_udp_set_buffer_size,
+	.print_addr		= xs_print_peer_address,
 	.reserve_xprt		= xprt_reserve_xprt_cong,
 	.release_xprt		= xprt_release_xprt_cong,
+	.rpcbind		= rpc_getport,
 	.set_port		= xs_set_port,
 	.connect		= xs_connect,
 	.buf_alloc		= rpc_malloc,
@@ -1271,8 +1333,10 @@ static struct rpc_xprt_ops xs_udp_ops = {
 };
 
 static struct rpc_xprt_ops xs_tcp_ops = {
+	.print_addr		= xs_print_peer_address,
 	.reserve_xprt		= xprt_reserve_xprt,
 	.release_xprt		= xs_tcp_release_xprt,
+	.rpcbind		= rpc_getport,
 	.set_port		= xs_set_port,
 	.connect		= xs_connect,
 	.buf_alloc		= rpc_malloc,
@@ -1293,8 +1357,7 @@ static struct rpc_xprt_ops xs_tcp_ops = {
 int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 {
 	size_t slot_table_size;
-
-	dprintk("RPC:      setting up udp-ipv4 transport...\n");
+	struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
 
 	xprt->max_reqs = xprt_udp_slot_table_entries;
 	slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]);
@@ -1302,10 +1365,12 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 	if (xprt->slot == NULL)
 		return -ENOMEM;
 
-	xprt->prot = IPPROTO_UDP;
+	if (ntohs(addr->sin_port) != 0)
+		xprt_set_bound(xprt);
 	xprt->port = xs_get_random_port();
+
+	xprt->prot = IPPROTO_UDP;
 	xprt->tsh_size = 0;
-	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
 	/* XXX: header size can vary due to auth type, IPv6, etc. */
 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
@@ -1322,6 +1387,10 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 	else
 		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
 
+	xs_format_peer_addresses(xprt);
+	dprintk("RPC:      set up transport to address %s\n",
+			xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
+
 	return 0;
 }
 
@@ -1334,8 +1403,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 {
 	size_t slot_table_size;
-
-	dprintk("RPC:      setting up tcp-ipv4 transport...\n");
+	struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
 
 	xprt->max_reqs = xprt_tcp_slot_table_entries;
 	slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]);
@@ -1343,10 +1411,12 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 	if (xprt->slot == NULL)
 		return -ENOMEM;
 
-	xprt->prot = IPPROTO_TCP;
+	if (ntohs(addr->sin_port) != 0)
+		xprt_set_bound(xprt);
 	xprt->port = xs_get_random_port();
+
+	xprt->prot = IPPROTO_TCP;
 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
-	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
 
 	INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
@@ -1362,5 +1432,9 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
 	else
 		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
 
+	xs_format_peer_addresses(xprt);
+	dprintk("RPC:      set up transport to address %s\n",
+			xs_print_peer_address(xprt, RPC_DISPLAY_ALL));
+
 	return 0;
 }
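
Throughout the transport code above, the old "is the port zero?" tests on xprt->addr.sin_port are replaced by the xprt_bound()/xprt_set_bound()/xprt_clear_bound() helpers, and the connect workers now refuse to run until the transport is bound. Below is a hedged sketch of how that state is driven from the client side; only rpc_force_rebind() and xprt_bound() come from this merge, the wrapper is illustrative.

#include <linux/kernel.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

static void example_server_may_have_restarted(struct rpc_clnt *clnt)
{
	/*
	 * Drop the cached port (honoured only when the client was created
	 * with autobind); the next call_bind() sees !xprt_bound() and asks
	 * xprt->ops->rpcbind() to rediscover it.
	 */
	rpc_force_rebind(clnt);

	if (!xprt_bound(clnt->cl_xprt))
		printk(KERN_DEBUG "RPC: transport needs rebinding\n");
}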