summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--net/rds/ib.h1
-rw-r--r--net/rds/ib_cm.c14
-rw-r--r--net/rds/ib_send.c47
3 files changed, 54 insertions, 8 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index acda2dbc6576..a13ced504145 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -108,6 +108,7 @@ struct rds_ib_connection {
 	struct rds_header	*i_send_hdrs;
 	u64			i_send_hdrs_dma;
 	struct rds_ib_send_work *i_sends;
+	atomic_t		i_signaled_sends;
 
 	/* rx */
 	struct tasklet_struct	i_recv_tasklet;
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 10f6a8815cd0..123c7d33b54e 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -615,11 +615,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 		}
 
 		/*
-		 * Don't wait for the send ring to be empty -- there may be completed
-		 * non-signaled entries sitting on there. We unmap these below.
+		 * We want to wait for tx and rx completion to finish
+		 * before we tear down the connection, but we have to be
+		 * careful not to get stuck waiting on a send ring that
+		 * only has unsignaled sends in it.  We've shutdown new
+		 * sends before getting here so by waiting for signaled
+		 * sends to complete we're ensured that there will be no
+		 * more tx processing.
 		 */
 		wait_event(rds_ib_ring_empty_wait,
-			rds_ib_ring_empty(&ic->i_recv_ring));
+			   rds_ib_ring_empty(&ic->i_recv_ring) &&
+			   (atomic_read(&ic->i_signaled_sends) == 0));
+		tasklet_kill(&ic->i_recv_tasklet);
 
 		if (ic->i_send_hdrs)
 			ib_dma_free_coherent(dev,
@@ -729,6 +736,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 #ifndef KERNEL_HAS_ATOMIC64
 	spin_lock_init(&ic->i_ack_lock);
 #endif
+	atomic_set(&ic->i_signaled_sends, 0);
 
 	/*
 	 * rds_ib_conn_shutdown() waits for these to be emptied so they
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index e88cb4af009b..15f75692574c 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -220,6 +220,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
 }
 
 /*
+ * The only fast path caller always has a non-zero nr, so we don't
+ * bother testing nr before performing the atomic sub.
+ */
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+	if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+	    waitqueue_active(&rds_ib_ring_empty_wait))
+		wake_up(&rds_ib_ring_empty_wait);
+	BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
+/*
  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
  * operations performed in the send path.  As the sender allocs and potentially
  * unallocs the next free entry in the ring it doesn't alter which is
@@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 	u32 oldest;
 	u32 i = 0;
 	int ret;
+	int nr_sig = 0;
 
 	rdsdebug("cq %p conn %p\n", cq, conn);
 	rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 
 		for (i = 0; i < completed; i++) {
 			send = &ic->i_sends[oldest];
+			if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+				nr_sig++;
 
 			rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
@@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 		}
 
 		rds_ib_ring_free(&ic->i_send_ring, completed);
+		rds_ib_sub_signaled(ic, nr_sig);
+		nr_sig = 0;
 
 		if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
 		    test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
-					      struct rds_ib_send_work *send,
-					      bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+					     struct rds_ib_send_work *send,
+					     bool notify)
 {
 	/*
 	 * We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
 	if (ic->i_unsignaled_wrs-- == 0 || notify) {
 		ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
 		send->s_wr.send_flags |= IB_SEND_SIGNALED;
+		return 1;
 	}
+	return 0;
 }
 
 /*
@@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	int bytes_sent = 0;
 	int ret;
 	int flow_controlled = 0;
+	int nr_sig = 0;
 
 	BUG_ON(off % RDS_FRAG_SIZE);
 	BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
 			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 
+		if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+			nr_sig++;
+
 		rdsdebug("send %p wr %p num_sge %u next %p\n", send,
 			 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
@@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	if (ic->i_flowctl && i < credit_alloc)
 		rds_ib_send_add_credits(conn, credit_alloc - i);
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	/* XXX need to worry about failed_wr and partial sends. */
 	failed_wr = &first->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		if (prev->s_op) {
 			ic->i_data_op = prev->s_op;
 			prev->s_op = NULL;
@@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 	u32 pos;
 	u32 work_alloc;
 	int ret;
+	int nr_sig = 0;
 
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
@@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 		send->s_wr.wr.atomic.compare_add = op->op_swap_add;
 		send->s_wr.wr.atomic.swap = 0;
 	}
-	rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+	nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 	send->s_wr.num_sge = 1;
 	send->s_wr.next = NULL;
 	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 	rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
 		 send->s_sge[0].addr, send->s_sge[0].length);
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	failed_wr = &send->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
 	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 		printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		goto out;
 	}
 
@@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 	int sent;
 	int ret;
 	int num_sge;
+	int nr_sig = 0;
 
 	/* map the op the first time we see it */
 	if (!op->op_mapped) {
@@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 		send->s_queued = jiffies;
 		send->s_op = NULL;
 
-		rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+		nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
 		send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
 		send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 		work_alloc = i;
 	}
 
+	if (nr_sig)
+		atomic_add(nr_sig, &ic->i_signaled_sends);
+
 	failed_wr = &first->s_wr;
 	ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
 	rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 		printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_sub_signaled(ic, nr_sig);
 		goto out;
 	}