summary refs log tree commit diff
path: root/net
diff options
context:
space:
mode:
authorTrond Myklebust <Trond.Myklebust@netapp.com>2008-10-15 15:54:56 -0400
committerTrond Myklebust <Trond.Myklebust@netapp.com>2008-10-15 15:54:56 -0400
commit6925bac120097b823fc990c143b9789c21cc60b5 (patch)
tree0f92e1b22122fba623aeff4c271c16df673244eb /net
parent04ab591808565f968d4406f6435090ad671ebdab (diff)
parent011935a0a710c20bb7ae63523b78856848db1926 (diff)
downloadlinux-6925bac120097b823fc990c143b9789c21cc60b5.tar.gz
Merge branch 'next'
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/clnt.c4
-rw-r--r--net/sunrpc/rpcb_clnt.c40
-rw-r--r--net/sunrpc/xprt.c12
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c29
-rw-r--r--net/sunrpc/xprtrdma/transport.c41
-rw-r--r--net/sunrpc/xprtrdma/verbs.c741
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h17
7 files changed, 616 insertions, 268 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index da0789fa1b88..4895c341e46d 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -213,10 +213,10 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, stru
 	}
 
 	/* save the nodename */
-	clnt->cl_nodelen = strlen(utsname()->nodename);
+	clnt->cl_nodelen = strlen(init_utsname()->nodename);
 	if (clnt->cl_nodelen > UNX_MAXNODENAME)
 		clnt->cl_nodelen = UNX_MAXNODENAME;
-	memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
+	memcpy(clnt->cl_nodename, init_utsname()->nodename, clnt->cl_nodelen);
 	rpc_register_client(clnt);
 	return clnt;
 
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 34abc91058d8..41013dd66ac3 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -460,6 +460,28 @@ static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbi
 	return rpc_run_task(&task_setup_data);
 }
 
+/*
+ * In the case where rpc clients have been cloned, we want to make
+ * sure that we use the program number/version etc of the actual
+ * owner of the xprt. To do so, we walk back up the tree of parents
+ * to find whoever created the transport and/or whoever has the
+ * autobind flag set.
+ */
+static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt)
+{
+	struct rpc_clnt *parent = clnt->cl_parent;
+
+	while (parent != clnt) {
+		if (parent->cl_xprt != clnt->cl_xprt)
+			break;
+		if (clnt->cl_autobind)
+			break;
+		clnt = parent;
+		parent = parent->cl_parent;
+	}
+	return clnt;
+}
+
 /**
  * rpcb_getport_async - obtain the port for a given RPC service on a given host
  * @task: task that is waiting for portmapper request
@@ -469,10 +491,10 @@ static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbi
  */
 void rpcb_getport_async(struct rpc_task *task)
 {
-	struct rpc_clnt *clnt = task->tk_client;
+	struct rpc_clnt *clnt;
 	struct rpc_procinfo *proc;
 	u32 bind_version;
-	struct rpc_xprt *xprt = task->tk_xprt;
+	struct rpc_xprt *xprt;
 	struct rpc_clnt	*rpcb_clnt;
 	static struct rpcbind_args *map;
 	struct rpc_task	*child;
@@ -481,13 +503,13 @@ void rpcb_getport_async(struct rpc_task *task)
 	size_t salen;
 	int status;
 
+	clnt = rpcb_find_transport_owner(task->tk_client);
+	xprt = clnt->cl_xprt;
+
 	dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
 		task->tk_pid, __func__,
 		clnt->cl_server, clnt->cl_prog, clnt->cl_vers, xprt->prot);
 
-	/* Autobind on cloned rpc clients is discouraged */
-	BUG_ON(clnt->cl_parent != clnt);
-
 	/* Put self on the wait queue to ensure we get notified if
 	 * some other task is already attempting to bind the port */
 	rpc_sleep_on(&xprt->binding, task, NULL);
@@ -549,7 +571,7 @@ void rpcb_getport_async(struct rpc_task *task)
 		status = -ENOMEM;
 		dprintk("RPC: %5u %s: no memory available\n",
 			task->tk_pid, __func__);
-		goto bailout_nofree;
+		goto bailout_release_client;
 	}
 	map->r_prog = clnt->cl_prog;
 	map->r_vers = clnt->cl_vers;
@@ -569,11 +591,13 @@ void rpcb_getport_async(struct rpc_task *task)
 			task->tk_pid, __func__);
 		return;
 	}
-	rpc_put_task(child);
 
-	task->tk_xprt->stat.bind_count++;
+	xprt->stat.bind_count++;
+	rpc_put_task(child);
 	return;
 
+bailout_release_client:
+	rpc_release_client(rpcb_clnt);
 bailout_nofree:
 	rpcb_wake_rpcbind_waiters(xprt, status);
 	task->tk_status = status;
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 99a52aabe332..29e401bb612e 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -108,13 +108,10 @@ int xprt_register_transport(struct xprt_class *transport)
 			goto out;
 	}
 
-	result = -EINVAL;
-	if (try_module_get(THIS_MODULE)) {
-		list_add_tail(&transport->list, &xprt_list);
-		printk(KERN_INFO "RPC: Registered %s transport module.\n",
-			transport->name);
-		result = 0;
-	}
+	list_add_tail(&transport->list, &xprt_list);
+	printk(KERN_INFO "RPC: Registered %s transport module.\n",
+	       transport->name);
+	result = 0;
 
 out:
 	spin_unlock(&xprt_list_lock);
@@ -143,7 +140,6 @@ int xprt_unregister_transport(struct xprt_class *transport)
 				"RPC: Unregistered %s transport module.\n",
 				transport->name);
 			list_del_init(&transport->list);
-			module_put(THIS_MODULE);
 			goto out;
 		}
 	}
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 5c1954d28d09..14106d26bb95 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -118,6 +118,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 	}
 
 	if (xdrbuf->tail[0].iov_len) {
+		/* the rpcrdma protocol allows us to omit any trailing
+		 * xdr pad bytes, saving the server an RDMA operation. */
+		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
+			return n;
 		if (n == nsegs)
 			return 0;
 		seg[n].mr_page = NULL;
@@ -508,8 +512,8 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	if (hdrlen == 0)
 		return -1;
 
-	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd\n"
-		"                   headerp 0x%p base 0x%p lkey 0x%x\n",
+	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
+		" headerp 0x%p base 0x%p lkey 0x%x\n",
 		__func__, transfertypes[wtype], hdrlen, rpclen, padlen,
 		headerp, base, req->rl_iov.lkey);
 
@@ -594,7 +598,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b
  * Scatter inline received data back into provided iov's.
  */
 static void
-rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len)
+rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 {
 	int i, npages, curlen, olen;
 	char *destp;
@@ -660,6 +664,13 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len)
 	} else
 		rqst->rq_rcv_buf.tail[0].iov_len = 0;
 
+	if (pad) {
+		/* implicit padding on terminal chunk */
+		unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
+		while (pad--)
+			p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
+	}
+
 	if (copy_len)
 		dprintk("RPC:       %s: %d bytes in"
 			" %d extra segments (%d lost)\n",
@@ -681,12 +692,14 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
 	struct rpc_xprt *xprt = ep->rep_xprt;
 
 	spin_lock_bh(&xprt->transport_lock);
+	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
+		++xprt->connect_cookie;
 	if (ep->rep_connected > 0) {
 		if (!xprt_test_and_set_connected(xprt))
 			xprt_wake_pending_tasks(xprt, 0);
 	} else {
 		if (xprt_test_and_clear_connected(xprt))
-			xprt_wake_pending_tasks(xprt, ep->rep_connected);
+			xprt_wake_pending_tasks(xprt, -ENOTCONN);
 	}
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -792,14 +805,20 @@ repost:
 			    ((unsigned char *)iptr - (unsigned char *)headerp);
 			status = rep->rr_len + rdmalen;
 			r_xprt->rx_stats.total_rdma_reply += rdmalen;
+			/* special case - last chunk may omit padding */
+			if (rdmalen &= 3) {
+				rdmalen = 4 - rdmalen;
+				status += rdmalen;
+			}
 		} else {
 			/* else ordinary inline */
+			rdmalen = 0;
 			iptr = (__be32 *)((unsigned char *)headerp + 28);
 			rep->rr_len -= 28; /*sizeof *headerp;*/
 			status = rep->rr_len;
 		}
 		/* Fix up the rpc results for upper layer */
-		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len);
+		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
 		break;
 
 	case htonl(RDMA_NOMSG):
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index a564c1a39ec5..9839c3d94145 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -70,11 +70,8 @@ static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
 static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
 static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
 static unsigned int xprt_rdma_inline_write_padding;
-#if !RPCRDMA_PERSISTENT_REGISTRATION
-static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
-#else
-static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
-#endif
+static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
+                int xprt_rdma_pad_optimize = 0;
 
 #ifdef RPC_DEBUG
 
@@ -140,6 +137,14 @@ static ctl_table xr_tunables_table[] = {
 		.extra2		= &max_memreg,
 	},
 	{
+		.ctl_name       = CTL_UNNUMBERED,
+		.procname	= "rdma_pad_optimize",
+		.data		= &xprt_rdma_pad_optimize,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+	{
 		.ctl_name = 0,
 	},
 };
@@ -458,6 +463,8 @@ xprt_rdma_close(struct rpc_xprt *xprt)
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 
 	dprintk("RPC:       %s: closing\n", __func__);
+	if (r_xprt->rx_ep.rep_connected > 0)
+		xprt->reestablish_timeout = 0;
 	xprt_disconnect_done(xprt);
 	(void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
 }
@@ -485,6 +492,11 @@ xprt_rdma_connect(struct rpc_task *task)
 			/* Reconnect */
 			schedule_delayed_work(&r_xprt->rdma_connect,
 				xprt->reestablish_timeout);
+			xprt->reestablish_timeout <<= 1;
+			if (xprt->reestablish_timeout > (30 * HZ))
+				xprt->reestablish_timeout = (30 * HZ);
+			else if (xprt->reestablish_timeout < (5 * HZ))
+				xprt->reestablish_timeout = (5 * HZ);
 		} else {
 			schedule_delayed_work(&r_xprt->rdma_connect, 0);
 			if (!RPC_IS_ASYNC(task))
@@ -591,6 +603,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size)
 	}
 	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
 out:
+	req->rl_connect_cookie = 0;	/* our reserved value */
 	return req->rl_xdr_buf;
 
 outfail:
@@ -694,13 +707,21 @@ xprt_rdma_send_request(struct rpc_task *task)
 		req->rl_reply->rr_xprt = xprt;
 	}
 
-	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
-		xprt_disconnect_done(xprt);
-		return -ENOTCONN;	/* implies disconnect */
-	}
+	/* Must suppress retransmit to maintain credits */
+	if (req->rl_connect_cookie == xprt->connect_cookie)
+		goto drop_connection;
+	req->rl_connect_cookie = xprt->connect_cookie;
+
+	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
+		goto drop_connection;
 
+	task->tk_bytes_sent += rqst->rq_snd_buf.len;
 	rqst->rq_bytes_sent = 0;
 	return 0;
+
+drop_connection:
+	xprt_disconnect_done(xprt);
+	return -ENOTCONN;	/* implies disconnect */
 }
 
 static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
@@ -770,7 +791,7 @@ static void __exit xprt_rdma_cleanup(void)
 {
 	int rc;
 
-	dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
+	dprintk(KERN_INFO "RPCRDMA Module Removed, deregister RPC RDMA transport\n");
 #ifdef RPC_DEBUG
 	if (sunrpc_table_header) {
 		unregister_sysctl_table(sunrpc_table_header);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 8ea283ecc522..a5fef5e6c323 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -284,6 +284,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
 	switch (event->event) {
 	case RDMA_CM_EVENT_ADDR_RESOLVED:
 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
+		ia->ri_async_rc = 0;
 		complete(&ia->ri_done);
 		break;
 	case RDMA_CM_EVENT_ADDR_ERROR:
@@ -338,13 +339,32 @@ connected:
 		wake_up_all(&ep->rep_connect_wait);
 		break;
 	default:
-		ia->ri_async_rc = -EINVAL;
-		dprintk("RPC:       %s: unexpected CM event %X\n",
+		dprintk("RPC:       %s: unexpected CM event %d\n",
 			__func__, event->event);
-		complete(&ia->ri_done);
 		break;
 	}
 
+#ifdef RPC_DEBUG
+	if (connstate == 1) {
+		int ird = attr.max_dest_rd_atomic;
+		int tird = ep->rep_remote_cma.responder_resources;
+		printk(KERN_INFO "rpcrdma: connection to %u.%u.%u.%u:%u "
+			"on %s, memreg %d slots %d ird %d%s\n",
+			NIPQUAD(addr->sin_addr.s_addr),
+			ntohs(addr->sin_port),
+			ia->ri_id->device->name,
+			ia->ri_memreg_strategy,
+			xprt->rx_buf.rb_max_requests,
+			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
+	} else if (connstate < 0) {
+		printk(KERN_INFO "rpcrdma: connection to %u.%u.%u.%u:%u "
+			"closed (%d)\n",
+			NIPQUAD(addr->sin_addr.s_addr),
+			ntohs(addr->sin_port),
+			connstate);
+	}
+#endif
+
 	return 0;
 }
 
@@ -355,6 +375,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 	struct rdma_cm_id *id;
 	int rc;
 
+	init_completion(&ia->ri_done);
+
 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
 	if (IS_ERR(id)) {
 		rc = PTR_ERR(id);
@@ -363,26 +385,28 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 		return id;
 	}
 
-	ia->ri_async_rc = 0;
+	ia->ri_async_rc = -ETIMEDOUT;
 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
 	if (rc) {
 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
 			__func__, rc);
 		goto out;
 	}
-	wait_for_completion(&ia->ri_done);
+	wait_for_completion_interruptible_timeout(&ia->ri_done,
+				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 	rc = ia->ri_async_rc;
 	if (rc)
 		goto out;
 
-	ia->ri_async_rc = 0;
+	ia->ri_async_rc = -ETIMEDOUT;
 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
 	if (rc) {
 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
 			__func__, rc);
 		goto out;
 	}
-	wait_for_completion(&ia->ri_done);
+	wait_for_completion_interruptible_timeout(&ia->ri_done,
+				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
 	rc = ia->ri_async_rc;
 	if (rc)
 		goto out;
@@ -423,11 +447,10 @@ rpcrdma_clean_cq(struct ib_cq *cq)
 int
 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 {
-	int rc;
+	int rc, mem_priv;
+	struct ib_device_attr devattr;
 	struct rpcrdma_ia *ia = &xprt->rx_ia;
 
-	init_completion(&ia->ri_done);
-
 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
 	if (IS_ERR(ia->ri_id)) {
 		rc = PTR_ERR(ia->ri_id);
@@ -443,6 +466,73 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	}
 
 	/*
+	 * Query the device to determine if the requested memory
+	 * registration strategy is supported. If it isn't, set the
+	 * strategy to a globally supported model.
+	 */
+	rc = ib_query_device(ia->ri_id->device, &devattr);
+	if (rc) {
+		dprintk("RPC:       %s: ib_query_device failed %d\n",
+			__func__, rc);
+		goto out2;
+	}
+
+	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
+		ia->ri_have_dma_lkey = 1;
+		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
+	}
+
+	switch (memreg) {
+	case RPCRDMA_MEMWINDOWS:
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
+			dprintk("RPC:       %s: MEMWINDOWS registration "
+				"specified but not supported by adapter, "
+				"using slower RPCRDMA_REGISTER\n",
+				__func__);
+			memreg = RPCRDMA_REGISTER;
+		}
+		break;
+	case RPCRDMA_MTHCAFMR:
+		if (!ia->ri_id->device->alloc_fmr) {
+#if RPCRDMA_PERSISTENT_REGISTRATION
+			dprintk("RPC:       %s: MTHCAFMR registration "
+				"specified but not supported by adapter, "
+				"using riskier RPCRDMA_ALLPHYSICAL\n",
+				__func__);
+			memreg = RPCRDMA_ALLPHYSICAL;
+#else
+			dprintk("RPC:       %s: MTHCAFMR registration "
+				"specified but not supported by adapter, "
+				"using slower RPCRDMA_REGISTER\n",
+				__func__);
+			memreg = RPCRDMA_REGISTER;
+#endif
+		}
+		break;
+	case RPCRDMA_FRMR:
+		/* Requires both frmr reg and local dma lkey */
+		if ((devattr.device_cap_flags &
+		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
+		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
+#if RPCRDMA_PERSISTENT_REGISTRATION
+			dprintk("RPC:       %s: FRMR registration "
+				"specified but not supported by adapter, "
+				"using riskier RPCRDMA_ALLPHYSICAL\n",
+				__func__);
+			memreg = RPCRDMA_ALLPHYSICAL;
+#else
+			dprintk("RPC:       %s: FRMR registration "
+				"specified but not supported by adapter, "
+				"using slower RPCRDMA_REGISTER\n",
+				__func__);
+			memreg = RPCRDMA_REGISTER;
+#endif
+		}
+		break;
+	}
+
+	/*
 	 * Optionally obtain an underlying physical identity mapping in
 	 * order to do a memory window-based bind. This base registration
 	 * is protected from remote access - that is enabled only by binding
@@ -450,22 +540,28 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	 * revoked after the corresponding completion similar to a storage
 	 * adapter.
 	 */
-	if (memreg > RPCRDMA_REGISTER) {
-		int mem_priv = IB_ACCESS_LOCAL_WRITE;
-		switch (memreg) {
+	switch (memreg) {
+	case RPCRDMA_BOUNCEBUFFERS:
+	case RPCRDMA_REGISTER:
+	case RPCRDMA_FRMR:
+		break;
 #if RPCRDMA_PERSISTENT_REGISTRATION
-		case RPCRDMA_ALLPHYSICAL:
-			mem_priv |= IB_ACCESS_REMOTE_WRITE;
-			mem_priv |= IB_ACCESS_REMOTE_READ;
-			break;
+	case RPCRDMA_ALLPHYSICAL:
+		mem_priv = IB_ACCESS_LOCAL_WRITE |
+				IB_ACCESS_REMOTE_WRITE |
+				IB_ACCESS_REMOTE_READ;
+		goto register_setup;
 #endif
-		case RPCRDMA_MEMWINDOWS_ASYNC:
-		case RPCRDMA_MEMWINDOWS:
-			mem_priv |= IB_ACCESS_MW_BIND;
-			break;
-		default:
+	case RPCRDMA_MEMWINDOWS_ASYNC:
+	case RPCRDMA_MEMWINDOWS:
+		mem_priv = IB_ACCESS_LOCAL_WRITE |
+				IB_ACCESS_MW_BIND;
+		goto register_setup;
+	case RPCRDMA_MTHCAFMR:
+		if (ia->ri_have_dma_lkey)
 			break;
-		}
+		mem_priv = IB_ACCESS_LOCAL_WRITE;
+	register_setup:
 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
 		if (IS_ERR(ia->ri_bind_mem)) {
 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
@@ -475,7 +571,15 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 			memreg = RPCRDMA_REGISTER;
 			ia->ri_bind_mem = NULL;
 		}
+		break;
+	default:
+		printk(KERN_ERR "%s: invalid memory registration mode %d\n",
+				__func__, memreg);
+		rc = -EINVAL;
+		goto out2;
 	}
+	dprintk("RPC:       %s: memory registration strategy is %d\n",
+		__func__, memreg);
 
 	/* Else will do memory reg/dereg for each chunk */
 	ia->ri_memreg_strategy = memreg;
@@ -483,6 +587,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 	return 0;
 out2:
 	rdma_destroy_id(ia->ri_id);
+	ia->ri_id = NULL;
 out1:
 	return rc;
 }
@@ -503,15 +608,17 @@ rpcrdma_ia_close(struct rpcrdma_ia *ia)
 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
 			__func__, rc);
 	}
-	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
-		rdma_destroy_qp(ia->ri_id);
+	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
+		if (ia->ri_id->qp)
+			rdma_destroy_qp(ia->ri_id);
+		rdma_destroy_id(ia->ri_id);
+		ia->ri_id = NULL;
+	}
 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
 		rc = ib_dealloc_pd(ia->ri_pd);
 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
 			__func__, rc);
 	}
-	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
-		rdma_destroy_id(ia->ri_id);
 }
 
 /*
@@ -541,6 +648,12 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.srq = NULL;
 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
 	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_FRMR:
+		/* Add room for frmr register and invalidate WRs */
+		ep->rep_attr.cap.max_send_wr *= 3;
+		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
+			return -EINVAL;
+		break;
 	case RPCRDMA_MEMWINDOWS_ASYNC:
 	case RPCRDMA_MEMWINDOWS:
 		/* Add room for mw_binds+unbinds - overkill! */
@@ -617,29 +730,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_remote_cma.private_data_len = 0;
 
 	/* Client offers RDMA Read but does not initiate */
-	switch (ia->ri_memreg_strategy) {
-	case RPCRDMA_BOUNCEBUFFERS:
+	ep->rep_remote_cma.initiator_depth = 0;
+	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
 		ep->rep_remote_cma.responder_resources = 0;
-		break;
-	case RPCRDMA_MTHCAFMR:
-	case RPCRDMA_REGISTER:
-		ep->rep_remote_cma.responder_resources = cdata->max_requests *
-				(RPCRDMA_MAX_DATA_SEGS / 8);
-		break;
-	case RPCRDMA_MEMWINDOWS:
-	case RPCRDMA_MEMWINDOWS_ASYNC:
-#if RPCRDMA_PERSISTENT_REGISTRATION
-	case RPCRDMA_ALLPHYSICAL:
-#endif
-		ep->rep_remote_cma.responder_resources = cdata->max_requests *
-				(RPCRDMA_MAX_DATA_SEGS / 2);
-		break;
-	default:
-		break;
-	}
-	if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
+	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
+		ep->rep_remote_cma.responder_resources = 32;
+	else
 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
-	ep->rep_remote_cma.initiator_depth = 0;
 
 	ep->rep_remote_cma.retry_count = 7;
 	ep->rep_remote_cma.flow_control = 0;
@@ -679,21 +776,16 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 		if (rc)
 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
 				" returned %i\n", __func__, rc);
+		rdma_destroy_qp(ia->ri_id);
+		ia->ri_id->qp = NULL;
 	}
 
-	ep->rep_func = NULL;
-
 	/* padding - could be done in rpcrdma_buffer_destroy... */
 	if (ep->rep_pad_mr) {
 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
 		ep->rep_pad_mr = NULL;
 	}
 
-	if (ia->ri_id->qp) {
-		rdma_destroy_qp(ia->ri_id);
-		ia->ri_id->qp = NULL;
-	}
-
 	rpcrdma_clean_cq(ep->rep_cq);
 	rc = ib_destroy_cq(ep->rep_cq);
 	if (rc)
@@ -712,9 +804,8 @@ rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	struct rdma_cm_id *id;
 	int rc = 0;
 	int retry_count = 0;
-	int reconnect = (ep->rep_connected != 0);
 
-	if (reconnect) {
+	if (ep->rep_connected != 0) {
 		struct rpcrdma_xprt *xprt;
 retry:
 		rc = rpcrdma_ep_disconnect(ep, ia);
@@ -745,6 +836,7 @@ retry:
 			goto out;
 		}
 		/* END TEMP */
+		rdma_destroy_qp(ia->ri_id);
 		rdma_destroy_id(ia->ri_id);
 		ia->ri_id = id;
 	}
@@ -769,14 +861,6 @@ if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
 	}
 }
 
-	/* Theoretically a client initiator_depth > 0 is not needed,
-	 * but many peers fail to complete the connection unless they
-	 * == responder_resources! */
-	if (ep->rep_remote_cma.initiator_depth !=
-				ep->rep_remote_cma.responder_resources)
-		ep->rep_remote_cma.initiator_depth =
-			ep->rep_remote_cma.responder_resources;
-
 	ep->rep_connected = 0;
 
 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
@@ -786,9 +870,6 @@ if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
 		goto out;
 	}
 
-	if (reconnect)
-		return 0;
-
 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
 
 	/*
@@ -805,14 +886,16 @@ if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
 	if (ep->rep_connected <= 0) {
 		/* Sometimes, the only way to reliably connect to remote
 		 * CMs is to use same nonzero values for ORD and IRD. */
-		ep->rep_remote_cma.initiator_depth =
-					ep->rep_remote_cma.responder_resources;
-		if (ep->rep_remote_cma.initiator_depth == 0)
-			++ep->rep_remote_cma.initiator_depth;
-		if (ep->rep_remote_cma.responder_resources == 0)
-			++ep->rep_remote_cma.responder_resources;
-		if (retry_count++ == 0)
+		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
+		    (ep->rep_remote_cma.responder_resources == 0 ||
+		     ep->rep_remote_cma.initiator_depth !=
+				ep->rep_remote_cma.responder_resources)) {
+			if (ep->rep_remote_cma.responder_resources == 0)
+				ep->rep_remote_cma.responder_resources = 1;
+			ep->rep_remote_cma.initiator_depth =
+				ep->rep_remote_cma.responder_resources;
 			goto retry;
+		}
 		rc = ep->rep_connected;
 	} else {
 		dprintk("RPC:       %s: connected\n", __func__);
@@ -863,6 +946,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 	char *p;
 	size_t len;
 	int i, rc;
+	struct rpcrdma_mw *r;
 
 	buf->rb_max_requests = cdata->max_requests;
 	spin_lock_init(&buf->rb_lock);
@@ -873,7 +957,7 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
 	 *   3.  array of struct rpcrdma_rep for replies
 	 *   4.  padding, if any
-	 *   5.  mw's, if any
+	 *   5.  mw's, fmr's or frmr's, if any
 	 * Send/recv buffers in req/rep need to be registered
 	 */
 
@@ -881,6 +965,10 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
 	len += cdata->padding;
 	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_FRMR:
+		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
+				sizeof(struct rpcrdma_mw);
+		break;
 	case RPCRDMA_MTHCAFMR:
 		/* TBD we are perhaps overallocating here */
 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
@@ -927,15 +1015,37 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 	 * and also reduce unbind-to-bind collision.
 	 */
 	INIT_LIST_HEAD(&buf->rb_mws);
+	r = (struct rpcrdma_mw *)p;
 	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_FRMR:
+		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
+			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
+							 RPCRDMA_MAX_SEGS);
+			if (IS_ERR(r->r.frmr.fr_mr)) {
+				rc = PTR_ERR(r->r.frmr.fr_mr);
+				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
+					" failed %i\n", __func__, rc);
+				goto out;
+			}
+			r->r.frmr.fr_pgl =
+				ib_alloc_fast_reg_page_list(ia->ri_id->device,
+							    RPCRDMA_MAX_SEGS);
+			if (IS_ERR(r->r.frmr.fr_pgl)) {
+				rc = PTR_ERR(r->r.frmr.fr_pgl);
+				dprintk("RPC:       %s: "
+					"ib_alloc_fast_reg_page_list "
+					"failed %i\n", __func__, rc);
+				goto out;
+			}
+			list_add(&r->mw_list, &buf->rb_mws);
+			++r;
+		}
+		break;
 	case RPCRDMA_MTHCAFMR:
-		{
-		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
-		struct ib_fmr_attr fa = {
-			RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT
-		};
 		/* TBD we are perhaps overallocating here */
 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
+			static struct ib_fmr_attr fa =
+				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
 				&fa);
@@ -948,12 +1058,9 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 			list_add(&r->mw_list, &buf->rb_mws);
 			++r;
 		}
-		}
 		break;
 	case RPCRDMA_MEMWINDOWS_ASYNC:
 	case RPCRDMA_MEMWINDOWS:
-		{
-		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
 		/* Allocate one extra request's worth, for full cycling */
 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
 			r->r.mw = ib_alloc_mw(ia->ri_pd);
@@ -966,7 +1073,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
 			list_add(&r->mw_list, &buf->rb_mws);
 			++r;
 		}
-		}
 		break;
 	default:
 		break;
@@ -1046,6 +1152,7 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
 	int rc, i;
 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+	struct rpcrdma_mw *r;
 
 	/* clean up in reverse order from create
 	 *   1.  recv mr memory (mr free, then kfree)
@@ -1065,11 +1172,19 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 		}
 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
 			while (!list_empty(&buf->rb_mws)) {
-				struct rpcrdma_mw *r;
 				r = list_entry(buf->rb_mws.next,
 					struct rpcrdma_mw, mw_list);
 				list_del(&r->mw_list);
 				switch (ia->ri_memreg_strategy) {
+				case RPCRDMA_FRMR:
+					rc = ib_dereg_mr(r->r.frmr.fr_mr);
+					if (rc)
+						dprintk("RPC:       %s:"
+							" ib_dereg_mr"
+							" failed %i\n",
+							__func__, rc);
+					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
+					break;
 				case RPCRDMA_MTHCAFMR:
 					rc = ib_dealloc_fmr(r->r.fmr);
 					if (rc)
@@ -1115,6 +1230,8 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 {
 	struct rpcrdma_req *req;
 	unsigned long flags;
+	int i;
+	struct rpcrdma_mw *r;
 
 	spin_lock_irqsave(&buffers->rb_lock, flags);
 	if (buffers->rb_send_index == buffers->rb_max_requests) {
@@ -1135,9 +1252,8 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 	}
 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
 	if (!list_empty(&buffers->rb_mws)) {
-		int i = RPCRDMA_MAX_SEGS - 1;
+		i = RPCRDMA_MAX_SEGS - 1;
 		do {
-			struct rpcrdma_mw *r;
 			r = list_entry(buffers->rb_mws.next,
 					struct rpcrdma_mw, mw_list);
 			list_del(&r->mw_list);
@@ -1171,6 +1287,7 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
 		req->rl_reply = NULL;
 	}
 	switch (ia->ri_memreg_strategy) {
+	case RPCRDMA_FRMR:
 	case RPCRDMA_MTHCAFMR:
 	case RPCRDMA_MEMWINDOWS_ASYNC:
 	case RPCRDMA_MEMWINDOWS:
@@ -1252,7 +1369,11 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
 			va, len, DMA_BIDIRECTIONAL);
 	iov->length = len;
 
-	if (ia->ri_bind_mem != NULL) {
+	if (ia->ri_have_dma_lkey) {
+		*mrp = NULL;
+		iov->lkey = ia->ri_dma_lkey;
+		return 0;
+	} else if (ia->ri_bind_mem != NULL) {
 		*mrp = NULL;
 		iov->lkey = ia->ri_bind_mem->lkey;
 		return 0;
@@ -1329,15 +1450,292 @@ rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
 }
 
+static int
+rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
+			int *nsegs, int writing, struct rpcrdma_ia *ia,
+			struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_mr_seg *seg1 = seg;
+	struct ib_send_wr frmr_wr, *bad_wr;
+	u8 key;
+	int len, pageoff;
+	int i, rc;
+
+	pageoff = offset_in_page(seg1->mr_offset);
+	seg1->mr_offset -= pageoff;	/* start of page */
+	seg1->mr_len += pageoff;
+	len = -pageoff;
+	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
+		*nsegs = RPCRDMA_MAX_DATA_SEGS;
+	for (i = 0; i < *nsegs;) {
+		rpcrdma_map_one(ia, seg, writing);
+		seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
+		len += seg->mr_len;
+		++seg;
+		++i;
+		/* Check for holes */
+		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
+		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
+			break;
+	}
+	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
+		__func__, seg1->mr_chunk.rl_mw, i);
+
+	/* Bump the key */
+	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
+	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
+
+	/* Prepare FRMR WR */
+	memset(&frmr_wr, 0, sizeof frmr_wr);
+	frmr_wr.opcode = IB_WR_FAST_REG_MR;
+	frmr_wr.send_flags = 0;			/* unsignaled */
+	frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
+	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
+	frmr_wr.wr.fast_reg.page_list_len = i;
+	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
+	frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
+	frmr_wr.wr.fast_reg.access_flags = (writing ?
+				IB_ACCESS_REMOTE_WRITE : IB_ACCESS_REMOTE_READ);
+	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+	DECR_CQCOUNT(&r_xprt->rx_ep);
+
+	rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
+
+	if (rc) {
+		dprintk("RPC:       %s: failed ib_post_send for register,"
+			" status %i\n", __func__, rc);
+		while (i--)
+			rpcrdma_unmap_one(ia, --seg);
+	} else {
+		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+		seg1->mr_base = seg1->mr_dma + pageoff;
+		seg1->mr_nsegs = i;
+		seg1->mr_len = len;
+	}
+	*nsegs = i;
+	return rc;
+}
+
+static int
+rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
+			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_mr_seg *seg1 = seg;
+	struct ib_send_wr invalidate_wr, *bad_wr;
+	int rc;
+
+	while (seg1->mr_nsegs--)
+		rpcrdma_unmap_one(ia, seg++);
+
+	memset(&invalidate_wr, 0, sizeof invalidate_wr);
+	invalidate_wr.opcode = IB_WR_LOCAL_INV;
+	invalidate_wr.send_flags = 0;			/* unsignaled */
+	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+	DECR_CQCOUNT(&r_xprt->rx_ep);
+
+	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
+	if (rc)
+		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
+			" status %i\n", __func__, rc);
+	return rc;
+}
+
+static int
+rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
+			int *nsegs, int writing, struct rpcrdma_ia *ia)
+{
+	struct rpcrdma_mr_seg *seg1 = seg;
+	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
+	int len, pageoff, i, rc;
+
+	pageoff = offset_in_page(seg1->mr_offset);
+	seg1->mr_offset -= pageoff;	/* start of page */
+	seg1->mr_len += pageoff;
+	len = -pageoff;
+	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
+		*nsegs = RPCRDMA_MAX_DATA_SEGS;
+	for (i = 0; i < *nsegs;) {
+		rpcrdma_map_one(ia, seg, writing);
+		physaddrs[i] = seg->mr_dma;
+		len += seg->mr_len;
+		++seg;
+		++i;
+		/* Check for holes */
+		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
+		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
+			break;
+	}
+	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
+				physaddrs, i, seg1->mr_dma);
+	if (rc) {
+		dprintk("RPC:       %s: failed ib_map_phys_fmr "
+			"%u@0x%llx+%i (%d)... status %i\n", __func__,
+			len, (unsigned long long)seg1->mr_dma,
+			pageoff, i, rc);
+		while (i--)
+			rpcrdma_unmap_one(ia, --seg);
+	} else {
+		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
+		seg1->mr_base = seg1->mr_dma + pageoff;
+		seg1->mr_nsegs = i;
+		seg1->mr_len = len;
+	}
+	*nsegs = i;
+	return rc;
+}
+
+static int
+rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
+			struct rpcrdma_ia *ia)
+{
+	struct rpcrdma_mr_seg *seg1 = seg;
+	LIST_HEAD(l);
+	int rc;
+
+	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
+	rc = ib_unmap_fmr(&l);
+	while (seg1->mr_nsegs--)
+		rpcrdma_unmap_one(ia, seg++);
+	if (rc)
+		dprintk("RPC:       %s: failed ib_unmap_fmr,"
+			" status %i\n", __func__, rc);
+	return rc;
+}
+
+static int
+rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
+			int *nsegs, int writing, struct rpcrdma_ia *ia,
+			struct rpcrdma_xprt *r_xprt)
+{
+	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
+				  IB_ACCESS_REMOTE_READ);
+	struct ib_mw_bind param;
+	int rc;
+
+	*nsegs = 1;
+	rpcrdma_map_one(ia, seg, writing);
+	param.mr = ia->ri_bind_mem;
+	param.wr_id = 0ULL;	/* no send cookie */
+	param.addr = seg->mr_dma;
+	param.length = seg->mr_len;
+	param.send_flags = 0;
+	param.mw_access_flags = mem_priv;
+
+	DECR_CQCOUNT(&r_xprt->rx_ep);
+	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
+	if (rc) {
+		dprintk("RPC:       %s: failed ib_bind_mw "
+			"%u@0x%llx status %i\n",
+			__func__, seg->mr_len,
+			(unsigned long long)seg->mr_dma, rc);
+		rpcrdma_unmap_one(ia, seg);
+	} else {
+		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
+		seg->mr_base = param.addr;
+		seg->mr_nsegs = 1;
+	}
+	return rc;
+}
+
+static int
+rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
+			struct rpcrdma_ia *ia,
+			struct rpcrdma_xprt *r_xprt, void **r)
+{
+	struct ib_mw_bind param;
+	LIST_HEAD(l);
+	int rc;
+
+	BUG_ON(seg->mr_nsegs != 1);
+	param.mr = ia->ri_bind_mem;
+	param.addr = 0ULL;	/* unbind */
+	param.length = 0;
+	param.mw_access_flags = 0;
+	if (*r) {
+		param.wr_id = (u64) (unsigned long) *r;
+		param.send_flags = IB_SEND_SIGNALED;
+		INIT_CQCOUNT(&r_xprt->rx_ep);
+	} else {
+		param.wr_id = 0ULL;
+		param.send_flags = 0;
+		DECR_CQCOUNT(&r_xprt->rx_ep);
+	}
+	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
+	rpcrdma_unmap_one(ia, seg);
+	if (rc)
+		dprintk("RPC:       %s: failed ib_(un)bind_mw,"
+			" status %i\n", __func__, rc);
+	else
+		*r = NULL;	/* will upcall on completion */
+	return rc;
+}
+
+static int
+rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
+			int *nsegs, int writing, struct rpcrdma_ia *ia)
+{
+	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
+				  IB_ACCESS_REMOTE_READ);
+	struct rpcrdma_mr_seg *seg1 = seg;
+	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
+	int len, i, rc = 0;
+
+	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
+		*nsegs = RPCRDMA_MAX_DATA_SEGS;
+	for (len = 0, i = 0; i < *nsegs;) {
+		rpcrdma_map_one(ia, seg, writing);
+		ipb[i].addr = seg->mr_dma;
+		ipb[i].size = seg->mr_len;
+		len += seg->mr_len;
+		++seg;
+		++i;
+		/* Check for holes */
+		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
+		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
+			break;
+	}
+	seg1->mr_base = seg1->mr_dma;
+	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
+				ipb, i, mem_priv, &seg1->mr_base);
+	if (IS_ERR(seg1->mr_chunk.rl_mr)) {
+		rc = PTR_ERR(seg1->mr_chunk.rl_mr);
+		dprintk("RPC:       %s: failed ib_reg_phys_mr "
+			"%u@0x%llx (%d)... status %i\n",
+			__func__, len,
+			(unsigned long long)seg1->mr_dma, i, rc);
+		while (i--)
+			rpcrdma_unmap_one(ia, --seg);
+	} else {
+		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
+		seg1->mr_nsegs = i;
+		seg1->mr_len = len;
+	}
+	*nsegs = i;
+	return rc;
+}
+
+static int
+rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
+			struct rpcrdma_ia *ia)
+{
+	struct rpcrdma_mr_seg *seg1 = seg;
+	int rc;
+
+	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
+	seg1->mr_chunk.rl_mr = NULL;
+	while (seg1->mr_nsegs--)
+		rpcrdma_unmap_one(ia, seg++);
+	if (rc)
+		dprintk("RPC:       %s: failed ib_dereg_mr,"
+			" status %i\n", __func__, rc);
+	return rc;
+}
+
 int
 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
-				  IB_ACCESS_REMOTE_READ);
-	struct rpcrdma_mr_seg *seg1 = seg;
-	int i;
 	int rc = 0;
 
 	switch (ia->ri_memreg_strategy) {
@@ -1352,114 +1750,25 @@ rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
 		break;
 #endif
 
-	/* Registration using fast memory registration */
+	/* Registration using frmr registration */
+	case RPCRDMA_FRMR:
+		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
+		break;
+
+	/* Registration using fmr memory registration */
 	case RPCRDMA_MTHCAFMR:
-		{
-		u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
-		int len, pageoff = offset_in_page(seg->mr_offset);
-		seg1->mr_offset -= pageoff;	/* start of page */
-		seg1->mr_len += pageoff;
-		len = -pageoff;
-		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
-			nsegs = RPCRDMA_MAX_DATA_SEGS;
-		for (i = 0; i < nsegs;) {
-			rpcrdma_map_one(ia, seg, writing);
-			physaddrs[i] = seg->mr_dma;
-			len += seg->mr_len;
-			++seg;
-			++i;
-			/* Check for holes */
-			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
-			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
-				break;
-		}
-		nsegs = i;
-		rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
-					physaddrs, nsegs, seg1->mr_dma);
-		if (rc) {
-			dprintk("RPC:       %s: failed ib_map_phys_fmr "
-				"%u@0x%llx+%i (%d)... status %i\n", __func__,
-				len, (unsigned long long)seg1->mr_dma,
-				pageoff, nsegs, rc);
-			while (nsegs--)
-				rpcrdma_unmap_one(ia, --seg);
-		} else {
-			seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
-			seg1->mr_base = seg1->mr_dma + pageoff;
-			seg1->mr_nsegs = nsegs;
-			seg1->mr_len = len;
-		}
-		}
+		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
 		break;
 
 	/* Registration using memory windows */
 	case RPCRDMA_MEMWINDOWS_ASYNC:
 	case RPCRDMA_MEMWINDOWS:
-		{
-		struct ib_mw_bind param;
-		rpcrdma_map_one(ia, seg, writing);
-		param.mr = ia->ri_bind_mem;
-		param.wr_id = 0ULL;	/* no send cookie */
-		param.addr = seg->mr_dma;
-		param.length = seg->mr_len;
-		param.send_flags = 0;
-		param.mw_access_flags = mem_priv;
-
-		DECR_CQCOUNT(&r_xprt->rx_ep);
-		rc = ib_bind_mw(ia->ri_id->qp,
-					seg->mr_chunk.rl_mw->r.mw, &param);
-		if (rc) {
-			dprintk("RPC:       %s: failed ib_bind_mw "
-				"%u@0x%llx status %i\n",
-				__func__, seg->mr_len,
-				(unsigned long long)seg->mr_dma, rc);
-			rpcrdma_unmap_one(ia, seg);
-		} else {
-			seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
-			seg->mr_base = param.addr;
-			seg->mr_nsegs = 1;
-			nsegs = 1;
-		}
-		}
+		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
 		break;
 
 	/* Default registration each time */
 	default:
-		{
-		struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
-		int len = 0;
-		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
-			nsegs = RPCRDMA_MAX_DATA_SEGS;
-		for (i = 0; i < nsegs;) {
-			rpcrdma_map_one(ia, seg, writing);
-			ipb[i].addr = seg->mr_dma;
-			ipb[i].size = seg->mr_len;
-			len += seg->mr_len;
-			++seg;
-			++i;
-			/* Check for holes */
-			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
-			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
-				break;
-		}
-		nsegs = i;
-		seg1->mr_base = seg1->mr_dma;
-		seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
-					ipb, nsegs, mem_priv, &seg1->mr_base);
-		if (IS_ERR(seg1->mr_chunk.rl_mr)) {
-			rc = PTR_ERR(seg1->mr_chunk.rl_mr);
-			dprintk("RPC:       %s: failed ib_reg_phys_mr "
-				"%u@0x%llx (%d)... status %i\n",
-				__func__, len,
-				(unsigned long long)seg1->mr_dma, nsegs, rc);
-			while (nsegs--)
-				rpcrdma_unmap_one(ia, --seg);
-		} else {
-			seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
-			seg1->mr_nsegs = nsegs;
-			seg1->mr_len = len;
-		}
-		}
+		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
 		break;
 	}
 	if (rc)
@@ -1473,7 +1782,6 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
 		struct rpcrdma_xprt *r_xprt, void *r)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct rpcrdma_mr_seg *seg1 = seg;
 	int nsegs = seg->mr_nsegs, rc;
 
 	switch (ia->ri_memreg_strategy) {
@@ -1486,56 +1794,21 @@ rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
 		break;
 #endif
 
+	case RPCRDMA_FRMR:
+		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
+		break;
+
 	case RPCRDMA_MTHCAFMR:
-		{
-		LIST_HEAD(l);
-		list_add(&seg->mr_chunk.rl_mw->r.fmr->list, &l);
-		rc = ib_unmap_fmr(&l);
-		while (seg1->mr_nsegs--)
-			rpcrdma_unmap_one(ia, seg++);
-		}
-		if (rc)
-			dprintk("RPC:       %s: failed ib_unmap_fmr,"
-				" status %i\n", __func__, rc);
+		rc = rpcrdma_deregister_fmr_external(seg, ia);
 		break;
 
 	case RPCRDMA_MEMWINDOWS_ASYNC:
 	case RPCRDMA_MEMWINDOWS:
-		{
-		struct ib_mw_bind param;
-		BUG_ON(nsegs != 1);
-		param.mr = ia->ri_bind_mem;
-		param.addr = 0ULL;	/* unbind */
-		param.length = 0;
-		param.mw_access_flags = 0;
-		if (r) {
-			param.wr_id = (u64) (unsigned long) r;
-			param.send_flags = IB_SEND_SIGNALED;
-			INIT_CQCOUNT(&r_xprt->rx_ep);
-		} else {
-			param.wr_id = 0ULL;
-			param.send_flags = 0;
-			DECR_CQCOUNT(&r_xprt->rx_ep);
-		}
-		rc = ib_bind_mw(ia->ri_id->qp,
-				seg->mr_chunk.rl_mw->r.mw, &param);
-		rpcrdma_unmap_one(ia, seg);
-		}
-		if (rc)
-			dprintk("RPC:       %s: failed ib_(un)bind_mw,"
-				" status %i\n", __func__, rc);
-		else
-			r = NULL;	/* will upcall on completion */
+		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
 		break;
 
 	default:
-		rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
-		seg1->mr_chunk.rl_mr = NULL;
-		while (seg1->mr_nsegs--)
-			rpcrdma_unmap_one(ia, seg++);
-		if (rc)
-			dprintk("RPC:       %s: failed ib_dereg_mr,"
-				" status %i\n", __func__, rc);
+		rc = rpcrdma_deregister_default_external(seg, ia);
 		break;
 	}
 	if (r) {
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 2427822f8bd4..c7a7eba991bc 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -51,6 +51,9 @@
 #include <linux/sunrpc/rpc_rdma.h> 	/* RPC/RDMA protocol */
 #include <linux/sunrpc/xprtrdma.h> 	/* xprt parameters */
 
+#define RDMA_RESOLVE_TIMEOUT	(5000)	/* 5 seconds */
+#define RDMA_CONNECT_RETRY_MAX	(2)	/* retries if no listener backlog */
+
 /*
  * Interface Adapter -- one per transport instance
  */
@@ -58,6 +61,8 @@ struct rpcrdma_ia {
 	struct rdma_cm_id 	*ri_id;
 	struct ib_pd		*ri_pd;
 	struct ib_mr		*ri_bind_mem;
+	u32			ri_dma_lkey;
+	int			ri_have_dma_lkey;
 	struct completion	ri_done;
 	int			ri_async_rc;
 	enum rpcrdma_memreg	ri_memreg_strategy;
@@ -156,6 +161,10 @@ struct rpcrdma_mr_seg {		/* chunk descriptors */
 			union {
 				struct ib_mw	*mw;
 				struct ib_fmr	*fmr;
+				struct {
+					struct ib_fast_reg_page_list *fr_pgl;
+					struct ib_mr *fr_mr;
+				} frmr;
 			} r;
 			struct list_head mw_list;
 		} *rl_mw;
@@ -175,6 +184,7 @@ struct rpcrdma_req {
 	size_t 		rl_size;	/* actual length of buffer */
 	unsigned int	rl_niovs;	/* 0, 2 or 4 */
 	unsigned int	rl_nchunks;	/* non-zero if chunks */
+	unsigned int	rl_connect_cookie;	/* retry detection */
 	struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
 	struct rpcrdma_rep	*rl_reply;/* holder for reply buffer */
 	struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
@@ -198,7 +208,7 @@ struct rpcrdma_buffer {
 	atomic_t	rb_credits;	/* most recent server credits */
 	unsigned long	rb_cwndscale;	/* cached framework rpc_cwndscale */
 	int		rb_max_requests;/* client max requests */
-	struct list_head rb_mws;	/* optional memory windows/fmrs */
+	struct list_head rb_mws;	/* optional memory windows/fmrs/frmrs */
 	int		rb_send_index;
 	struct rpcrdma_req	**rb_send_bufs;
 	int		rb_recv_index;
@@ -273,6 +283,11 @@ struct rpcrdma_xprt {
 #define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt)
 #define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
 
+/* Setting this to 0 ensures interoperability with early servers.
+ * Setting this to 1 enhances certain unaligned read/write performance.
+ * Default is 0, see sysctl entry and rpc_rdma.c rpcrdma_convert_iovs() */
+extern int xprt_rdma_pad_optimize;
+
 /*
  * Interface Adapter calls - xprtrdma/verbs.c
  */