Diffstat (limited to 'net/sunrpc')

 net/sunrpc/auth_gss/auth_gss.c            |   7
 net/sunrpc/clnt.c                         |   9
 net/sunrpc/netns.h                        |   2
 net/sunrpc/stats.c                        |  10
 net/sunrpc/sunrpc_syms.c                  |   2
 net/sunrpc/svc_xprt.c                     |  11
 net/sunrpc/svcsock.c                      |  43
 net/sunrpc/xprt.c                         |   3
 net/sunrpc/xprtrdma/backchannel.c         |   4
 net/sunrpc/xprtrdma/frwr_ops.c            | 131
 net/sunrpc/xprtrdma/rpc_rdma.c            |  36
 net/sunrpc/xprtrdma/svc_rdma_transport.c  |   6
 net/sunrpc/xprtrdma/transport.c           |  34
 net/sunrpc/xprtrdma/verbs.c               |  37
 net/sunrpc/xprtrdma/xprt_rdma.h           |  34
 net/sunrpc/xprtsock.c                     |   4

 16 files changed, 216 insertions(+), 157 deletions(-)
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index 3dfd769dc5b5..16cea00c959b 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -541,9 +541,13 @@ gss_setup_upcall(struct gss_auth *gss_auth, struct rpc_cred *cred)
 		return gss_new;
 	gss_msg = gss_add_msg(gss_new);
 	if (gss_msg == gss_new) {
-		int res = rpc_queue_upcall(gss_new->pipe, &gss_new->msg);
+		int res;
+		atomic_inc(&gss_msg->count);
+		res = rpc_queue_upcall(gss_new->pipe, &gss_new->msg);
 		if (res) {
 			gss_unhash_msg(gss_new);
+			atomic_dec(&gss_msg->count);
+			gss_release_msg(gss_new);
 			gss_msg = ERR_PTR(res);
 		}
 	} else
@@ -836,6 +840,7 @@ gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
 			warn_gssd();
 		gss_release_msg(gss_msg);
 	}
+	gss_release_msg(gss_msg);
 }
 
 static void gss_pipe_dentry_destroy(struct dentry *dir,
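
A note on the auth_gss.c hunks: gss_add_msg() returns with a single reference held, but once the message is queued via rpc_queue_upcall() the pipe's teardown path (gss_pipe_destroy_msg) drops a reference of its own, so the queuing side must take one before publishing. A minimal sketch of the rule, with hypothetical obj_get()/obj_put() helpers standing in for atomic_inc(&gss_msg->count)/gss_release_msg():

	obj_get(msg);			/* reference owned by the pipe */
	err = hand_off_to_consumer(msg);
	if (err) {
		obj_put(msg);		/* hand-off failed: revoke it */
		/* ... and release the caller's own reference */
	}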
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 34dd7b26ee5f..1efbe48e794f 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1926,6 +1926,8 @@ call_connect_status(struct rpc_task *task)
 	case -EADDRINUSE:
 	case -ENOBUFS:
 	case -EPIPE:
+		xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
+					    task->tk_rqstp->rq_connect_cookie);
 		if (RPC_IS_SOFTCONN(task))
 			break;
 		/* retry with existing socket, after a delay */
@@ -2753,14 +2755,18 @@ EXPORT_SYMBOL_GPL(rpc_cap_max_reconnect_timeout);
 
 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
 {
+	rcu_read_lock();
 	xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
 
 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
 {
+	rcu_read_lock();
 	rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
 				 xprt);
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
 
@@ -2770,9 +2776,8 @@ bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
 	struct rpc_xprt_switch *xps;
 	bool ret;
 
-	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
-
 	rcu_read_lock();
+	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
 	ret = rpc_xprt_switch_has_addr(xps, sap);
 	rcu_read_unlock();
 	return ret;
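
The three clnt.c hunks are one and the same fix: rcu_dereference() is only legal inside an RCU read-side critical section, and the pointer it returns is only guaranteed stable until rcu_read_unlock(). The canonical shape, using the names from the hunk above:

	rcu_read_lock();
	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
	ret = rpc_xprt_switch_has_addr(xps, sap);	/* all uses before unlock */
	rcu_read_unlock();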
diff --git a/net/sunrpc/netns.h b/net/sunrpc/netns.h
index df5826876535..394ce523174c 100644
--- a/net/sunrpc/netns.h
+++ b/net/sunrpc/netns.h
@@ -34,7 +34,7 @@ struct sunrpc_net {
 	struct proc_dir_entry *use_gssp_proc;
 };
 
-extern int sunrpc_net_id;
+extern unsigned int sunrpc_net_id;
 
 int ip_map_cache_create(struct net *);
 void ip_map_cache_destroy(struct net *);
diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c
index 2ecb994314c1..caeb01ad2b5a 100644
--- a/net/sunrpc/stats.c
+++ b/net/sunrpc/stats.c
@@ -157,15 +157,17 @@ void rpc_count_iostats_metrics(const struct rpc_task *task,
 	spin_lock(&op_metrics->om_lock);
 
 	op_metrics->om_ops++;
-	op_metrics->om_ntrans += req->rq_ntrans;
+	/* kernel API: om_ops must never become larger than om_ntrans */
+	op_metrics->om_ntrans += max(req->rq_ntrans, 1);
 	op_metrics->om_timeouts += task->tk_timeouts;
 
 	op_metrics->om_bytes_sent += req->rq_xmit_bytes_sent;
 	op_metrics->om_bytes_recv += req->rq_reply_bytes_recvd;
 
-	delta = ktime_sub(req->rq_xtime, task->tk_start);
-	op_metrics->om_queue = ktime_add(op_metrics->om_queue, delta);
-
+	if (ktime_to_ns(req->rq_xtime)) {
+		delta = ktime_sub(req->rq_xtime, task->tk_start);
+		op_metrics->om_queue = ktime_add(op_metrics->om_queue, delta);
+	}
 	op_metrics->om_rtt = ktime_add(op_metrics->om_rtt, req->rq_rtt);
 
 	delta = ktime_sub(now, task->tk_start);
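
Two invariants are defended in stats.c. First, om_ntrans must never drop below om_ops, because userspace tooling derives retransmission counts from their difference; counting at least one transmission per op preserves that even for requests that never reached the wire. Second, such a request has rq_xtime == 0, and ktime_sub(0, tk_start) would fold a large negative delta into om_queue, hence the guard. Hedged sketch of the consumer-side arithmetic the first invariant protects (illustrative, not literal tool source):

	retrans = om_ntrans - om_ops;	/* must never go negative */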
diff --git a/net/sunrpc/sunrpc_syms.c b/net/sunrpc/sunrpc_syms.c
index ee5d3d253102..d1c330a7953a 100644
--- a/net/sunrpc/sunrpc_syms.c
+++ b/net/sunrpc/sunrpc_syms.c
@@ -24,7 +24,7 @@
 
 #include "netns.h"
 
-int sunrpc_net_id;
+unsigned int sunrpc_net_id;
 EXPORT_SYMBOL_GPL(sunrpc_net_id);
 
 static __net_init int sunrpc_init_net(struct net *net)
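
The netns.h/sunrpc_syms.c pair tracks a tree-wide type change: a pernet id is assigned by the registration core and used purely as an array index, so it is naturally unsigned. For context, a hedged sketch of how the id is wired up and consumed (per the pernet_operations API):

	static struct pernet_operations sunrpc_net_ops = {
		.init = sunrpc_init_net,
		.exit = sunrpc_exit_net,
		.id   = &sunrpc_net_id,	/* filled in at registration */
		.size = sizeof(struct sunrpc_net),
	};

	/* per-net private state is then looked up by id: */
	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);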
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index c3f652395a80..3bc1d61694cb 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -1002,14 +1002,8 @@ static void svc_age_temp_xprts(unsigned long closure)
 void svc_age_temp_xprts_now(struct svc_serv *serv, struct sockaddr *server_addr)
 {
 	struct svc_xprt *xprt;
-	struct svc_sock *svsk;
-	struct socket *sock;
 	struct list_head *le, *next;
 	LIST_HEAD(to_be_closed);
-	struct linger no_linger = {
-		.l_onoff = 1,
-		.l_linger = 0,
-	};
 
 	spin_lock_bh(&serv->sv_lock);
 	list_for_each_safe(le, next, &serv->sv_tempsocks) {
@@ -1027,10 +1021,7 @@ void svc_age_temp_xprts_now(struct svc_serv *serv, struct sockaddr *server_addr)
 		list_del_init(le);
 		xprt = list_entry(le, struct svc_xprt, xpt_list);
 		dprintk("svc_age_temp_xprts_now: closing %p\n", xprt);
-		svsk = container_of(xprt, struct svc_sock, sk_xprt);
-		sock = svsk->sk_sock;
-		kernel_setsockopt(sock, SOL_SOCKET, SO_LINGER,
-				  (char *)&no_linger, sizeof(no_linger));
+		xprt->xpt_ops->xpo_kill_temp_xprt(xprt);
 		svc_close_xprt(xprt);
 	}
 }
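
This svc_xprt.c hunk removes the last socket-specific knowledge from the generic aging path: instead of reaching into struct svc_sock and setting SO_LINGER directly, it dispatches through the new xpo_kill_temp_xprt method that each transport class supplies (see the svcsock.c and svc_rdma_transport.c hunks below). The SO_LINGER trick itself is worth spelling out; a hedged userspace analogue:

	/* l_onoff = 1 with l_linger = 0 makes close() abortive: the
	 * kernel sends RST instead of a FIN handshake, so a dead
	 * peer's connection is torn down immediately.
	 */
	struct linger no_linger = { .l_onoff = 1, .l_linger = 0 };
	setsockopt(fd, SOL_SOCKET, SO_LINGER, &no_linger, sizeof(no_linger));
	close(fd);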
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 57625f64efd5..135ec2c11b3b 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -39,6 +39,7 @@
 #include <net/checksum.h>
 #include <net/ip.h>
 #include <net/ipv6.h>
+#include <net/udp.h>
 #include <net/tcp.h>
 #include <net/tcp_states.h>
 #include <asm/uaccess.h>
@@ -129,6 +130,18 @@ static void svc_release_skb(struct svc_rqst *rqstp)
 	}
 }
 
+static void svc_release_udp_skb(struct svc_rqst *rqstp)
+{
+	struct sk_buff *skb = rqstp->rq_xprt_ctxt;
+
+	if (skb) {
+		rqstp->rq_xprt_ctxt = NULL;
+
+		dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
+		consume_skb(skb);
+	}
+}
+
 union svc_pktinfo_u {
 	struct in_pktinfo pkti;
 	struct in6_pktinfo pkti6;
@@ -438,6 +451,21 @@ static int svc_tcp_has_wspace(struct svc_xprt *xprt)
 	return !test_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
 }
 
+static void svc_tcp_kill_temp_xprt(struct svc_xprt *xprt)
+{
+	struct svc_sock *svsk;
+	struct socket *sock;
+	struct linger no_linger = {
+		.l_onoff = 1,
+		.l_linger = 0,
+	};
+
+	svsk = container_of(xprt, struct svc_sock, sk_xprt);
+	sock = svsk->sk_sock;
+	kernel_setsockopt(sock, SOL_SOCKET, SO_LINGER,
+			  (char *)&no_linger, sizeof(no_linger));
+}
+
 /*
  * See net/ipv6/ip_sockglue.c : ip_cmsg_recv_pktinfo
  */
@@ -534,7 +562,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
 	err = kernel_recvmsg(svsk->sk_sock, &msg, NULL,
 			     0, 0, MSG_PEEK | MSG_DONTWAIT);
 	if (err >= 0)
-		skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err);
+		skb = skb_recv_udp(svsk->sk_sk, 0, 1, &err);
 
 	if (skb == NULL) {
 		if (err != -EAGAIN) {
@@ -575,7 +603,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
 			goto out_free;
 		}
 		local_bh_enable();
-		skb_free_datagram_locked(svsk->sk_sk, skb);
+		consume_skb(skb);
 	} else {
 		/* we can use it in-place */
 		rqstp->rq_arg.head[0].iov_base = skb->data;
@@ -602,8 +630,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
 
 	return len;
 out_free:
-	trace_kfree_skb(skb, svc_udp_recvfrom);
-	skb_free_datagram_locked(svsk->sk_sk, skb);
+	kfree_skb(skb);
 	return 0;
 }
 
@@ -648,6 +675,10 @@ static struct svc_xprt *svc_udp_accept(struct svc_xprt *xprt)
 	return NULL;
 }
 
+static void svc_udp_kill_temp_xprt(struct svc_xprt *xprt)
+{
+}
+
 static struct svc_xprt *svc_udp_create(struct svc_serv *serv,
 				       struct net *net,
 				       struct sockaddr *sa, int salen,
@@ -660,13 +691,14 @@ static struct svc_xprt_ops svc_udp_ops = {
 	.xpo_create = svc_udp_create,
 	.xpo_recvfrom = svc_udp_recvfrom,
 	.xpo_sendto = svc_udp_sendto,
-	.xpo_release_rqst = svc_release_skb,
+	.xpo_release_rqst = svc_release_udp_skb,
 	.xpo_detach = svc_sock_detach,
 	.xpo_free = svc_sock_free,
 	.xpo_prep_reply_hdr = svc_udp_prep_reply_hdr,
 	.xpo_has_wspace = svc_udp_has_wspace,
 	.xpo_accept = svc_udp_accept,
 	.xpo_secure_port = svc_sock_secure_port,
+	.xpo_kill_temp_xprt = svc_udp_kill_temp_xprt,
 };
 
 static struct svc_xprt_class svc_udp_class = {
@@ -1242,6 +1274,7 @@ static struct svc_xprt_ops svc_tcp_ops = {
 	.xpo_has_wspace = svc_tcp_has_wspace,
 	.xpo_accept = svc_tcp_accept,
 	.xpo_secure_port = svc_sock_secure_port,
+	.xpo_kill_temp_xprt = svc_tcp_kill_temp_xprt,
 };
 
 static struct svc_xprt_class svc_tcp_class = {
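
The svcsock.c receive-path hunks move UDP onto skb_recv_udp() and plain skb frees. Two semantics matter (hedged summary): consume_skb() is for skbs processed successfully and does not register as a drop, while kfree_skb() is the error path and fires the drop tracepoint; and with skb_recv_udp() the skb is no longer accounted under the socket lock, so skb_free_datagram_locked() is no longer required. Sketch of the new shape (process() and handle_err() are stand-ins):

	skb = skb_recv_udp(sk, 0, 1, &err);	/* noblock = 1 */
	if (!skb)
		return handle_err(err);		/* -EAGAIN: nothing queued */
	if (process(skb) < 0) {
		kfree_skb(skb);			/* dropped: error path */
		return 0;
	}
	consume_skb(skb);			/* consumed normally */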
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 685e6d225414..9a6be030ca7d 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -669,7 +669,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 	spin_lock_bh(&xprt->transport_lock);
 	if (cookie != xprt->connect_cookie)
 		goto out;
-	if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
+	if (test_bit(XPRT_CLOSING, &xprt->state))
 		goto out;
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
@@ -772,6 +772,7 @@ void xprt_connect(struct rpc_task *task)
 	if (!xprt_connected(xprt)) {
 		task->tk_rqstp->rq_bytes_sent = 0;
 		task->tk_timeout = task->tk_rqstp->rq_timeout;
+		task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
 		rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
 
 		if (test_bit(XPRT_CLOSING, &xprt->state))
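
These xprt.c hunks complete the connect-cookie handshake used by call_connect_status() above: the cookie is bumped on every connect and disconnect, each request records the cookie of the connection it ran on, and xprt_conditional_disconnect() acts only if that cookie is still current. Hedged sketch of the round trip:

	/* at connect time */
	req->rq_connect_cookie = xprt->connect_cookie;

	/* later, on a connection-level error for this request */
	xprt_conditional_disconnect(xprt, req->rq_connect_cookie);
	/* no-op if xprt->connect_cookie has since moved on, i.e. the
	 * transport already reconnected behind this request's back */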
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index 2c472e1b4827..24fedd4b117e 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -55,7 +55,8 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
 	if (IS_ERR(rb))
 		goto out_fail;
 	req->rl_sendbuf = rb;
-	xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base, size);
+	xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
+		     min_t(size_t, size, PAGE_SIZE));
 	rpcrdma_set_xprtdata(rqst, req);
 	return 0;
 
@@ -191,6 +192,7 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
 	size_t maxmsg;
 
 	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
+	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
 	return maxmsg - RPCRDMA_HDRLEN_MIN;
 }
 
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 210949562786..47bed5333c7f 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -44,18 +44,20 @@
  * being done.
  *
  * When the underlying transport disconnects, MRs are left in one of
- * three states:
+ * four states:
  *
  * INVALID:	The MR was not in use before the QP entered ERROR state.
- *		(Or, the LOCAL_INV WR has not completed or flushed yet).
- *
- * STALE:	The MR was being registered or unregistered when the QP
- *		entered ERROR state, and the pending WR was flushed.
  *
  * VALID:	The MR was registered before the QP entered ERROR state.
  *
- * When frwr_op_map encounters STALE and VALID MRs, they are recovered
- * with ib_dereg_mr and then are re-initialized. Beause MR recovery
+ * FLUSHED_FR:	The MR was being registered when the QP entered ERROR
+ *		state, and the pending WR was flushed.
+ *
+ * FLUSHED_LI:	The MR was being invalidated when the QP entered ERROR
+ *		state, and the pending WR was flushed.
+ *
+ * When frwr_op_map encounters FLUSHED and VALID MRs, they are recovered
+ * with ib_dereg_mr and then are re-initialized. Because MR recovery
  * allocates fresh resources, it is deferred to a workqueue, and the
  * recovered MRs are placed back on the rb_mws list when recovery is
  * complete. frwr_op_map allocates another MR for the current RPC while
@@ -99,7 +101,7 @@ frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 	struct rpcrdma_frmr *f = &r->frmr;
 	int rc;
 
-	f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, depth);
+	f->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype, depth);
 	if (IS_ERR(f->fr_mr))
 		goto out_mr_err;
 
@@ -155,7 +157,7 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 		return rc;
 	}
 
-	f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG,
+	f->fr_mr = ib_alloc_mr(ia->ri_pd, ia->ri_mrtype,
 			       ia->ri_max_frmr_depth);
 	if (IS_ERR(f->fr_mr)) {
 		pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n",
@@ -169,20 +171,19 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
 }
 
 /* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
- *
- * There's no recovery if this fails. The FRMR is abandoned, but
- * remains in rb_all. It will be cleaned up when the transport is
- * destroyed.
  */
 static void
 frwr_op_recover_mr(struct rpcrdma_mw *mw)
 {
+	enum rpcrdma_frmr_state state = mw->frmr.fr_state;
 	struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	int rc;
 
 	rc = __frwr_reset_mr(ia, mw);
-	ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir);
+	if (state != FRMR_FLUSHED_LI)
+		ib_dma_unmap_sg(ia->ri_device,
+				mw->mw_sg, mw->mw_nents, mw->mw_dir);
 	if (rc)
 		goto out_release;
 
@@ -205,11 +206,16 @@ static int
 frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 	     struct rpcrdma_create_data_internal *cdata)
 {
+	struct ib_device_attr *attrs = &ia->ri_device->attrs;
 	int depth, delta;
 
+	ia->ri_mrtype = IB_MR_TYPE_MEM_REG;
+	if (attrs->device_cap_flags & IB_DEVICE_SG_GAPS_REG)
+		ia->ri_mrtype = IB_MR_TYPE_SG_GAPS;
+
 	ia->ri_max_frmr_depth =
 			min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
-			      ia->ri_device->attrs.max_fast_reg_page_list_len);
+			      attrs->max_fast_reg_page_list_len);
 	dprintk("RPC:       %s: device's max FR page list len = %u\n",
 		__func__, ia->ri_max_frmr_depth);
 
@@ -236,8 +242,8 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
 	}
 
 	ep->rep_attr.cap.max_send_wr *= depth;
-	if (ep->rep_attr.cap.max_send_wr > ia->ri_device->attrs.max_qp_wr) {
-		cdata->max_requests = ia->ri_device->attrs.max_qp_wr / depth;
+	if (ep->rep_attr.cap.max_send_wr > attrs->max_qp_wr) {
+		cdata->max_requests = attrs->max_qp_wr / depth;
 		if (!cdata->max_requests)
 			return -EINVAL;
 		ep->rep_attr.cap.max_send_wr = cdata->max_requests *
@@ -262,10 +268,8 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
 }
 
 static void
-__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
-			    const char *wr)
+__frwr_sendcompletion_flush(struct ib_wc *wc, const char *wr)
 {
-	frmr->fr_state = FRMR_IS_STALE;
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
 		       wr, ib_wc_status_msg(wc->status),
@@ -288,7 +292,8 @@ frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
 	if (wc->status != IB_WC_SUCCESS) {
 		cqe = wc->wr_cqe;
 		frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
-		__frwr_sendcompletion_flush(wc, frmr, "fastreg");
+		frmr->fr_state = FRMR_FLUSHED_FR;
+		__frwr_sendcompletion_flush(wc, "fastreg");
 	}
 }
 
@@ -308,7 +313,8 @@ frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
 	if (wc->status != IB_WC_SUCCESS) {
 		cqe = wc->wr_cqe;
 		frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
-		__frwr_sendcompletion_flush(wc, frmr, "localinv");
+		frmr->fr_state = FRMR_FLUSHED_LI;
+		__frwr_sendcompletion_flush(wc, "localinv");
 	}
 }
 
@@ -328,8 +334,10 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
 	/* WARNING: Only wr_cqe and status are reliable at this point */
 	cqe = wc->wr_cqe;
 	frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
-	if (wc->status != IB_WC_SUCCESS)
-		__frwr_sendcompletion_flush(wc, frmr, "localinv");
+	if (wc->status != IB_WC_SUCCESS) {
+		frmr->fr_state = FRMR_FLUSHED_LI;
+		__frwr_sendcompletion_flush(wc, "localinv");
+	}
 	complete(&frmr->fr_linv_done);
 }
 
@@ -341,6 +349,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 	    int nsegs, bool writing, struct rpcrdma_mw **out)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS;
 	struct rpcrdma_mw *mw;
 	struct rpcrdma_frmr *frmr;
 	struct ib_mr *mr;
@@ -376,8 +385,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
 		++seg;
 		++i;
-
-		/* Check for holes */
+		if (holes_ok)
+			continue;
 		if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 			break;
@@ -414,7 +423,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
 			 IB_ACCESS_REMOTE_READ;
 
-	DECR_CQCOUNT(&r_xprt->rx_ep);
+	rpcrdma_set_signaled(&r_xprt->rx_ep, &reg_wr->wr);
 	rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
 	if (rc)
 		goto out_senderr;
@@ -444,26 +453,6 @@ out_senderr:
 	return -ENOTCONN;
 }
 
-static struct ib_send_wr *
-__frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
-{
-	struct rpcrdma_frmr *f = &mw->frmr;
-	struct ib_send_wr *invalidate_wr;
-
-	dprintk("RPC:       %s: invalidating frmr %p\n", __func__, f);
-
-	f->fr_state = FRMR_IS_INVALID;
-	invalidate_wr = &f->fr_invwr;
-
-	memset(invalidate_wr, 0, sizeof(*invalidate_wr));
-	f->fr_cqe.done = frwr_wc_localinv;
-	invalidate_wr->wr_cqe = &f->fr_cqe;
-	invalidate_wr->opcode = IB_WR_LOCAL_INV;
-	invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;
-
-	return invalidate_wr;
-}
-
 /* Invalidate all memory regions that were registered for "req".
  *
  * Sleeps until it is safe for the host CPU to access the
@@ -474,12 +463,12 @@ __frwr_prepare_linv_wr(struct rpcrdma_mw *mw)
 static void
 frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 {
-	struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr;
+	struct ib_send_wr *first, **prev, *last, *bad_wr;
 	struct rpcrdma_rep *rep = req->rl_reply;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_mw *mw, *tmp;
 	struct rpcrdma_frmr *f;
-	int rc;
+	int count, rc;
 
 	dprintk("RPC:       %s: req %p\n", __func__, req);
 
@@ -489,22 +478,29 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * a single ib_post_send() call.
 	 */
 	f = NULL;
-	invalidate_wrs = pos = prev = NULL;
+	count = 0;
+	prev = &first;
 	list_for_each_entry(mw, &req->rl_registered, mw_list) {
+		mw->frmr.fr_state = FRMR_IS_INVALID;
+
 		if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
-		    (mw->mw_handle == rep->rr_inv_rkey)) {
-			mw->frmr.fr_state = FRMR_IS_INVALID;
+		    (mw->mw_handle == rep->rr_inv_rkey))
 			continue;
-		}
-
-		pos = __frwr_prepare_linv_wr(mw);
 
-		if (!invalidate_wrs)
-			invalidate_wrs = pos;
-		else
-			prev->next = pos;
-		prev = pos;
 		f = &mw->frmr;
+		dprintk("RPC:       %s: invalidating frmr %p\n",
+			__func__, f);
+
+		f->fr_cqe.done = frwr_wc_localinv;
+		last = &f->fr_invwr;
+		memset(last, 0, sizeof(*last));
+		last->wr_cqe = &f->fr_cqe;
+		last->opcode = IB_WR_LOCAL_INV;
+		last->ex.invalidate_rkey = mw->mw_handle;
+		count++;
+
+		*prev = last;
+		prev = &last->next;
 	}
 	if (!f)
 		goto unmap;
@@ -513,17 +509,22 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * last WR in the chain completes, all WRs in the chain
 	 * are complete.
 	 */
-	f->fr_invwr.send_flags = IB_SEND_SIGNALED;
+	last->send_flags = IB_SEND_SIGNALED;
 	f->fr_cqe.done = frwr_wc_localinv_wake;
 	reinit_completion(&f->fr_linv_done);
-	INIT_CQCOUNT(&r_xprt->rx_ep);
+
+	/* Initialize CQ count, since there is always a signaled
+	 * WR being posted here.  The new cqcount depends on how
+	 * many SQEs are about to be consumed.
+	 */
+	rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
 
 	/* Transport disconnect drains the receive CQ before it
 	 * replaces the QP. The RPC reply handler won't call us
 	 * unless ri_id->qp is a valid pointer.
 	 */
 	r_xprt->rx_stats.local_inv_needed++;
-	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
+	rc = ib_post_send(ia->ri_id->qp, first, &bad_wr);
 	if (rc)
 		goto reset_mrs;
 
@@ -534,7 +535,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 */
 unmap:
 	list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) {
-		dprintk("RPC:       %s: unmapping frmr %p\n",
+		dprintk("RPC:       %s: DMA unmapping frmr %p\n",
 			__func__, &mw->frmr);
 		list_del_init(&mw->mw_list);
 		ib_dma_unmap_sg(ia->ri_device,
@@ -552,7 +553,7 @@ reset_mrs:
 	 */
 	list_for_each_entry(mw, &req->rl_registered, mw_list) {
 		f = &mw->frmr;
-		if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) {
+		if (mw->mw_handle == bad_wr->ex.invalidate_rkey) {
 			__frwr_reset_mr(ia, mw);
 			bad_wr = bad_wr->next;
 		}
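
Two details of the frwr_op_unmap_sync() rewrite deserve a closer look. First, the loop builds the LOCAL_INV chain with the pointer-to-pointer idiom, which needs no empty-list or first-node special case. Isolated sketch (for_each_registered_mw() is a hypothetical stand-in for the list_for_each_entry walk):

	struct ib_send_wr *first, **prev = &first, *last = NULL;

	for_each_registered_mw(mw) {
		last = &mw->frmr.fr_invwr;
		memset(last, 0, sizeof(*last));	/* also clears last->next */
		last->opcode = IB_WR_LOCAL_INV;
		last->ex.invalidate_rkey = mw->mw_handle;
		*prev = last;			/* link into the chain */
		prev = &last->next;
	}

Second, only the final WR in the chain is signaled: a send queue completes WRs in order, so one completion proves the whole chain is done, and rpcrdma_init_cqcount(&r_xprt->rx_ep, count) charges the chain's SQEs against the completion budget (see the xprt_rdma.h hunks below).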
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index d987c2d3dd6e..c52e0f2ffe52 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -786,7 +786,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
 		ifdebug(FACILITY) {
 			u64 off;
 			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
-			dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
+			dprintk("RPC:       %s: chunk %d@0x%016llx:0x%08x\n",
 				__func__,
 				be32_to_cpu(seg->rs_length),
 				(unsigned long long)off,
@@ -906,28 +906,6 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 	return fixup_copy_count;
 }
 
-void
-rpcrdma_connect_worker(struct work_struct *work)
-{
-	struct rpcrdma_ep *ep =
-		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
-	struct rpcrdma_xprt *r_xprt =
-		container_of(ep, struct rpcrdma_xprt, rx_ep);
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
-
-	spin_lock_bh(&xprt->transport_lock);
-	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
-		++xprt->connect_cookie;
-	if (ep->rep_connected > 0) {
-		if (!xprt_test_and_set_connected(xprt))
-			xprt_wake_pending_tasks(xprt, 0);
-	} else {
-		if (xprt_test_and_clear_connected(xprt))
-			xprt_wake_pending_tasks(xprt, -ENOTCONN);
-	}
-	spin_unlock_bh(&xprt->transport_lock);
-}
-
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
@@ -959,18 +937,6 @@ rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
 }
 #endif	/* CONFIG_SUNRPC_BACKCHANNEL */
 
-/*
- * This function is called when an async event is posted to
- * the connection which changes the connection state. All it
- * does at this point is mark the connection up/down, the rpc
- * timers do the rest.
- */
-void
-rpcrdma_conn_func(struct rpcrdma_ep *ep)
-{
-	schedule_delayed_work(&ep->rep_connect_worker, 0);
-}
-
 /* Process received RPC/RDMA messages.
  *
  * Errors must result in the RPC task either being awakened, or
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index bdbc248e1c14..ca2799af05a6 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -68,6 +68,7 @@ static void svc_rdma_detach(struct svc_xprt *xprt);
 static void svc_rdma_free(struct svc_xprt *xprt);
 static int svc_rdma_has_wspace(struct svc_xprt *xprt);
 static int svc_rdma_secure_port(struct svc_rqst *);
+static void svc_rdma_kill_temp_xprt(struct svc_xprt *);
 
 static struct svc_xprt_ops svc_rdma_ops = {
 	.xpo_create = svc_rdma_create,
@@ -80,6 +81,7 @@ static struct svc_xprt_ops svc_rdma_ops = {
 	.xpo_has_wspace = svc_rdma_has_wspace,
 	.xpo_accept = svc_rdma_accept,
 	.xpo_secure_port = svc_rdma_secure_port,
+	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
 };
 
 struct svc_xprt_class svc_rdma_class = {
@@ -1291,6 +1293,10 @@ static int svc_rdma_secure_port(struct svc_rqst *rqstp)
 	return 1;
 }
 
+static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
+{
+}
+
 int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 {
 	struct ib_send_wr *bad_wr, *n_wr;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index ed5e285fd2ea..534c178d2a7e 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -219,6 +219,34 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt)
 		}
 }
 
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
+{
+	schedule_delayed_work(&ep->rep_connect_worker, 0);
+}
+
+void
+rpcrdma_connect_worker(struct work_struct *work)
+{
+	struct rpcrdma_ep *ep =
+		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
+	struct rpcrdma_xprt *r_xprt =
+		container_of(ep, struct rpcrdma_xprt, rx_ep);
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+	spin_lock_bh(&xprt->transport_lock);
+	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
+		++xprt->connect_cookie;
+	if (ep->rep_connected > 0) {
+		if (!xprt_test_and_set_connected(xprt))
+			xprt_wake_pending_tasks(xprt, 0);
+	} else {
+		if (xprt_test_and_clear_connected(xprt))
+			xprt_wake_pending_tasks(xprt, -ENOTCONN);
+	}
+	spin_unlock_bh(&xprt->transport_lock);
+}
+
 static void
 xprt_rdma_connect_worker(struct work_struct *work)
 {
@@ -621,7 +649,8 @@ xprt_rdma_free(struct rpc_task *task)
 
 	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
+	if (unlikely(!list_empty(&req->rl_registered)))
+		ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));
 	rpcrdma_unmap_sges(ia, req);
 	rpcrdma_buffer_put(req);
 }
@@ -657,7 +686,8 @@ xprt_rdma_send_request(struct rpc_task *task)
 	int rc = 0;
 
 	/* On retransmit, remove any previously registered chunks */
-	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
+	if (unlikely(!list_empty(&req->rl_registered)))
+		r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
 
 	rc = rpcrdma_marshal_req(rqst);
 	if (rc < 0)
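
transport.c absorbs rpcrdma_connect_worker()/rpcrdma_conn_func() from rpc_rdma.c unchanged; the cookie bump skips zero so one value stays reserved and a wrapped counter can never be mistaken for the pristine, never-connected state. The new list_empty() guards reflect that most RPCs are sent and received inline and register no MRs at all, so req->rl_registered is usually empty and the indirect method call can be skipped outright:

	if (unlikely(!list_empty(&req->rl_registered)))
		ia->ri_ops->ro_unmap_safe(r_xprt, req, !RPC_IS_ASYNC(task));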
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ec74289af7ec..11d07748f699 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -103,9 +103,9 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
 {
 	struct rpcrdma_ep *ep = context;
 
-	pr_err("RPC:       %s: %s on device %s ep %p\n",
-	       __func__, ib_event_msg(event->event),
-		event->device->name, context);
+	pr_err("rpcrdma: %s on device %s ep %p\n",
+	       ib_event_msg(event->event), event->device->name, context);
+
 	if (ep->rep_connected == 1) {
 		ep->rep_connected = -EIO;
 		rpcrdma_conn_func(ep);
@@ -223,8 +223,8 @@ rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
 		cdata->inline_rsize = rsize;
 	if (wsize < cdata->inline_wsize)
 		cdata->inline_wsize = wsize;
-	pr_info("rpcrdma: max send %u, max recv %u\n",
-		cdata->inline_wsize, cdata->inline_rsize);
+	dprintk("RPC:       %s: max send %u, max recv %u\n",
+		__func__, cdata->inline_wsize, cdata->inline_rsize);
 	rpcrdma_set_max_header_sizes(r_xprt);
 }
 
@@ -331,6 +331,7 @@ static struct rdma_cm_id *
 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 			struct rpcrdma_ia *ia, struct sockaddr *addr)
 {
+	unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
 	struct rdma_cm_id *id;
 	int rc;
 
@@ -352,8 +353,12 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 			__func__, rc);
 		goto out;
 	}
-	wait_for_completion_interruptible_timeout(&ia->ri_done,
-				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
+	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
+	if (rc < 0) {
+		dprintk("RPC:       %s: wait() exited: %i\n",
+			__func__, rc);
+		goto out;
+	}
 
 	/* FIXME:
 	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
@@ -376,8 +381,12 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
 			__func__, rc);
 		goto put;
 	}
-	wait_for_completion_interruptible_timeout(&ia->ri_done,
-				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
+	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
+	if (rc < 0) {
+		dprintk("RPC:       %s: wait() exited: %i\n",
+			__func__, rc);
+		goto put;
+	}
 	rc = ia->ri_async_rc;
 	if (rc)
 		goto put;
@@ -532,7 +541,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 	if (ep->rep_cqinit <= 2)
 		ep->rep_cqinit = 0;	/* always signal? */
-	INIT_CQCOUNT(ep);
+	rpcrdma_init_cqcount(ep, 0);
 	init_waitqueue_head(&ep->rep_connect_wait);
 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
@@ -1311,13 +1320,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	dprintk("RPC:       %s: posting %d s/g entries\n",
 		__func__, send_wr->num_sge);
 
-	if (DECR_CQCOUNT(ep) > 0)
-		send_wr->send_flags = 0;
-	else { /* Provider must take a send completion every now and then */
-		INIT_CQCOUNT(ep);
-		send_wr->send_flags = IB_SEND_SIGNALED;
-	}
-
+	rpcrdma_set_signaled(ep, send_wr);
 	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
 	if (rc)
 		goto out_postsend_err;
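
The rpcrdma_create_id() hunks finally check what wait_for_completion_interruptible_timeout() returns. Its convention (core kernel API): negative means the wait was interrupted by a signal (-ERESTARTSYS), zero means timeout, positive is the jiffies remaining at completion. The patch bails out on signals only; a timeout still falls through and, in the surrounding code (not shown here), is caught because ri_async_rc is primed to -ETIMEDOUT before each resolve step:

	rc = wait_for_completion_interruptible_timeout(&ia->ri_done, wtimeout);
	if (rc < 0)
		goto out;	/* interrupted: abandon the resolve */
	/* rc == 0 (timeout) and rc > 0 (completed) both proceed to
	 * examine ia->ri_async_rc */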
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 0d35b761c883..e35efd4ac1e4 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -75,6 +75,7 @@ struct rpcrdma_ia {
 	unsigned int		ri_max_inline_write;
 	unsigned int		ri_max_inline_read;
 	bool			ri_reminv_expected;
+	enum ib_mr_type		ri_mrtype;
 	struct ib_qp_attr	ri_qp_attr;
 	struct ib_qp_init_attr	ri_qp_init_attr;
 };
@@ -95,8 +96,24 @@ struct rpcrdma_ep {
 	struct delayed_work	rep_connect_worker;
 };
 
-#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
-#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
+static inline void
+rpcrdma_init_cqcount(struct rpcrdma_ep *ep, int count)
+{
+	atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count);
+}
+
+/* To update send queue accounting, provider must take a
+ * send completion every now and then.
+ */
+static inline void
+rpcrdma_set_signaled(struct rpcrdma_ep *ep, struct ib_send_wr *send_wr)
+{
+	send_wr->send_flags = 0;
+	if (unlikely(atomic_sub_return(1, &ep->rep_cqcount) <= 0)) {
+		rpcrdma_init_cqcount(ep, 0);
+		send_wr->send_flags = IB_SEND_SIGNALED;
+	}
+}
 
 /* Pre-allocate extra Work Requests for handling backward receives
  * and sends. This is a fixed value because the Work Queues are
@@ -216,7 +233,8 @@ struct rpcrdma_rep {
 enum rpcrdma_frmr_state {
 	FRMR_IS_INVALID,	/* ready to be used */
 	FRMR_IS_VALID,		/* in use */
-	FRMR_IS_STALE,		/* failed completion */
+	FRMR_FLUSHED_FR,	/* flushed FASTREG WR */
+	FRMR_FLUSHED_LI,	/* flushed LOCALINV WR */
 };
 
 struct rpcrdma_frmr {
@@ -472,6 +490,7 @@ int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
 				struct rpcrdma_create_data_internal *);
 void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
 int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+void rpcrdma_conn_func(struct rpcrdma_ep *ep);
 void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
 
 int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
@@ -531,13 +550,6 @@ rpcrdma_data_dir(bool writing)
 }
 
 /*
- * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
- */
-void rpcrdma_connect_worker(struct work_struct *);
-void rpcrdma_conn_func(struct rpcrdma_ep *);
-void rpcrdma_reply_handler(struct work_struct *);
-
-/*
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
  */
 
@@ -554,12 +566,14 @@ bool rpcrdma_prepare_send_sges(struct rpcrdma_ia *, struct rpcrdma_req *,
 void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_marshal_req(struct rpc_rqst *);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
+void rpcrdma_reply_handler(struct work_struct *work);
 
 /* RPC/RDMA module init - xprtrdma/transport.c
  */
 extern unsigned int xprt_rdma_max_inline_read;
 void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap);
 void xprt_rdma_free_addresses(struct rpc_xprt *xprt);
+void rpcrdma_connect_worker(struct work_struct *work);
 void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq);
 int xprt_rdma_init(void);
 void xprt_rdma_cleanup(void);
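
The CQ-count macros become inline helpers so the chained-invalidate path can pre-charge the counter. Worked example (hedged): with max_send_wr == 128, rpcrdma_ep_create() computes

	rep_cqinit = 128 / 2 - 1 = 63

so rpcrdma_set_signaled() lets 62 consecutive sends go unsignaled and signals the 63rd, keeping send-queue accounting alive without paying for a completion per WR. When frwr_op_unmap_sync() posts a chain of count WRs whose last WR is already signaled, rpcrdma_init_cqcount(ep, count) starts the counter at

	atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count);

charging the chain's send-queue entries against the same budget.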
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index e01c825bc683..af392d9b9cec 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -1080,10 +1080,10 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
 	if (sk == NULL)
 		goto out;
 	for (;;) {
-		skb = skb_recv_datagram(sk, 0, 1, &err);
+		skb = skb_recv_udp(sk, 0, 1, &err);
 		if (skb != NULL) {
 			xs_udp_data_read_skb(&transport->xprt, sk, skb);
-			skb_free_datagram_locked(sk, skb);
+			consume_skb(skb);
 			continue;
 		}
 		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))