author     Trond Myklebust <trond.myklebust@primarydata.com>   2016-07-24 17:08:31 -0400
committer  Trond Myklebust <trond.myklebust@primarydata.com>   2016-07-24 17:08:31 -0400
commit     7f94ed24958d790687296701175cc43a6027c6c5 (patch)
tree       51c65aa49eb29a572aefc69816489a38ea4650fd /net
parent     149a4fddd0a72d526abbeac0c8deaab03559836a (diff)
parent     ce272302dd8f477b4d7de9b145b6b42da7e4292d (diff)
download   linux-7f94ed24958d790687296701175cc43a6027c6c5.tar.gz
Merge branch 'sunrpc'
Diffstat (limited to 'net')
-rw-r--r--   net/sunrpc/clnt.c             2
-rw-r--r--   net/sunrpc/sched.c           67
-rw-r--r--   net/sunrpc/xprt.c            14
-rw-r--r--   net/sunrpc/xprtmultipath.c    8
-rw-r--r--   net/sunrpc/xprtsock.c       113
5 files changed, 130 insertions, 74 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 2808d550d273..cb49898a5a58 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -2577,7 +2577,7 @@ static void rpc_cb_add_xprt_release(void *calldata)
 	kfree(data);
 }
 
-const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
+static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
 	.rpc_call_done = rpc_cb_add_xprt_done,
 	.rpc_release = rpc_cb_add_xprt_release,
 };
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index fcfd48d263f6..9ae588511aaf 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue;
 /*
  * rpciod-related stuff
  */
-struct workqueue_struct *rpciod_workqueue;
+struct workqueue_struct *rpciod_workqueue __read_mostly;
+struct workqueue_struct *xprtiod_workqueue __read_mostly;
 
 /*
  * Disable the timer for a given RPC task. Should be called with
@@ -329,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
  * lockless RPC_IS_QUEUED() test) before we've had a chance to test
  * the RPC_TASK_RUNNING flag.
  */
-static void rpc_make_runnable(struct rpc_task *task)
+static void rpc_make_runnable(struct workqueue_struct *wq,
+		struct rpc_task *task)
 {
 	bool need_wakeup = !rpc_test_and_set_running(task);
 
@@ -338,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 		return;
 	if (RPC_IS_ASYNC(task)) {
 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-		queue_work(rpciod_workqueue, &task->u.tk_work);
+		queue_work(wq, &task->u.tk_work);
 	} else
 		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
@@ -407,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
 
 /**
- * __rpc_do_wake_up_task - wake up a single rpc_task
+ * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
+ * @wq: workqueue on which to run task
  * @queue: wait queue
  * @task: task to be woken up
  *
  * Caller must hold queue->lock, and have cleared the task queued flag.
  */
-static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue,
+		struct rpc_task *task)
 {
 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
 			task->tk_pid, jiffies);
@@ -428,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
 
 	__rpc_remove_wait_queue(queue, task);
 
-	rpc_make_runnable(task);
+	rpc_make_runnable(wq, task);
 
 	dprintk("RPC:       __rpc_wake_up_task done\n");
 }
@@ -436,16 +441,25 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
 /*
  * Wake up a queued task while the queue lock is being held
  */
-static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue, struct rpc_task *task)
 {
 	if (RPC_IS_QUEUED(task)) {
 		smp_rmb();
 		if (task->tk_waitqueue == queue)
-			__rpc_do_wake_up_task(queue, task);
+			__rpc_do_wake_up_task_on_wq(wq, queue, task);
 	}
 }
 
 /*
+ * Wake up a queued task while the queue lock is being held
+ */
+static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
+{
+	rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
+}
+
+/*
  * Wake up a task on a specific queue
  */
 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
@@ -518,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
 /*
  * Wake up the first task on the wait queue.
  */
-struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
+struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
+		struct rpc_wait_queue *queue,
 		bool (*func)(struct rpc_task *, void *), void *data)
 {
 	struct rpc_task	*task = NULL;
@@ -529,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
 	task = __rpc_find_next_queued(queue);
 	if (task != NULL) {
 		if (func(task, data))
-			rpc_wake_up_task_queue_locked(queue, task);
+			rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
 		else
 			task = NULL;
 	}
@@ -537,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
 
 	return task;
 }
+
+/*
+ * Wake up the first task on the wait queue.
+ */
+struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
+		bool (*func)(struct rpc_task *, void *), void *data)
+{
+	return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
+}
 EXPORT_SYMBOL_GPL(rpc_wake_up_first);
 
 static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
@@ -814,7 +838,7 @@ void rpc_execute(struct rpc_task *task)
 	bool is_async = RPC_IS_ASYNC(task);
 
 	rpc_set_active(task);
-	rpc_make_runnable(task);
+	rpc_make_runnable(rpciod_workqueue, task);
 	if (!is_async)
 		__rpc_execute(task);
 }
@@ -1071,10 +1095,22 @@ static int rpciod_start(void)
 	 * Create the rpciod thread and wait for it to start.
 	 */
 	dprintk("RPC:       creating workqueue rpciod\n");
-	/* Note: highpri because network receive is latency sensitive */
-	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
+	if (!wq)
+		goto out_failed;
 	rpciod_workqueue = wq;
-	return rpciod_workqueue != NULL;
+	/* Note: highpri because network receive is latency sensitive */
+	wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+	if (!wq)
+		goto free_rpciod;
+	xprtiod_workqueue = wq;
+	return 1;
+free_rpciod:
+	wq = rpciod_workqueue;
+	rpciod_workqueue = NULL;
+	destroy_workqueue(wq);
+out_failed:
+	return 0;
 }
 
 static void rpciod_stop(void)
@@ -1088,6 +1124,9 @@ static void rpciod_stop(void)
 	wq = rpciod_workqueue;
 	rpciod_workqueue = NULL;
 	destroy_workqueue(wq);
+	wq = xprtiod_workqueue;
+	xprtiod_workqueue = NULL;
+	destroy_workqueue(wq);
 }
 
 void
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 216a1385718a..8313960cac52 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
 		clear_bit(XPRT_LOCKED, &xprt->state);
 		smp_mb__after_atomic();
 	} else
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 }
 
 /*
@@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		return;
 
-	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
+	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
+				__xprt_lock_write_func, xprt))
 		return;
 	xprt_clear_locked(xprt);
 }
@@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 		return;
 	if (RPCXPRT_CONGESTED(xprt))
 		goto out_unlock;
-	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
+	if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
+				__xprt_lock_write_cong_func, xprt))
 		return;
 out_unlock:
 	xprt_clear_locked(xprt);
@@ -645,7 +647,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 	spin_unlock_bh(&xprt->transport_lock);
 }
@@ -672,7 +674,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	/* Try to schedule an autoclose RPC call */
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
-		queue_work(rpciod_workqueue, &xprt->task_cleanup);
+		queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	xprt_wake_pending_tasks(xprt, -EAGAIN);
 out:
 	spin_unlock_bh(&xprt->transport_lock);
@@ -689,7 +691,7 @@ xprt_init_autodisconnect(unsigned long data)
 	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
 		goto out_abort;
 	spin_unlock(&xprt->transport_lock);
-	queue_work(rpciod_workqueue, &xprt->task_cleanup);
+	queue_work(xprtiod_workqueue, &xprt->task_cleanup);
 	return;
 out_abort:
 	spin_unlock(&xprt->transport_lock);
diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c
index e7fd76975d86..66c9d63f4797 100644
--- a/net/sunrpc/xprtmultipath.c
+++ b/net/sunrpc/xprtmultipath.c
@@ -271,14 +271,12 @@ struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
 		xprt_switch_find_xprt_t find_next)
 {
 	struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
-	struct list_head *head;
 
 	if (xps == NULL)
 		return NULL;
-	head = &xps->xps_xprt_list;
-	if (xps->xps_nxprts < 2)
-		return xprt_switch_find_first_entry(head);
-	return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next);
+	return xprt_switch_set_next_cursor(&xps->xps_xprt_list,
+			&xpi->xpi_cursor,
+			find_next);
 }
 
 static
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 83e6f3316149..111767ab124a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -642,6 +642,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 	struct xdr_buf *xdr = &req->rq_snd_buf;
 	bool zerocopy = true;
+	bool vm_wait = false;
 	int status;
 	int sent;
 
@@ -677,15 +678,33 @@ static int xs_tcp_send_request(struct rpc_task *task)
 			return 0;
 		}
 
+		WARN_ON_ONCE(sent == 0 && status == 0);
+
+		if (status == -EAGAIN ) {
+			/*
+			 * Return EAGAIN if we're sure we're hitting the
+			 * socket send buffer limits.
+			 */
+			if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
+				break;
+			/*
+			 * Did we hit a memory allocation failure?
+			 */
+			if (sent == 0) {
+				status = -ENOBUFS;
+				if (vm_wait)
+					break;
+				/* Retry, knowing now that we're below the
+				 * socket send buffer limit
+				 */
+				vm_wait = true;
+			}
+			continue;
+		}
 		if (status < 0)
 			break;
-		if (sent == 0) {
-			status = -EAGAIN;
-			break;
-		}
+		vm_wait = false;
 	}
-	if (status == -EAGAIN && sk_stream_is_writeable(transport->inet))
-		status = -ENOBUFS;
 
 	switch (status) {
 	case -ENOTSOCK:
@@ -755,11 +774,19 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s
 	sk->sk_error_report = transport->old_error_report;
 }
 
+static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
+{
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+
+	clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+}
+
 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
 {
 	smp_mb__before_atomic();
 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
 	clear_bit(XPRT_CLOSING, &xprt->state);
+	xs_sock_reset_state_flags(xprt);
 	smp_mb__after_atomic();
 }
 
@@ -962,10 +989,13 @@ static void xs_local_data_receive(struct sock_xprt *transport)
 		goto out;
 	for (;;) {
 		skb = skb_recv_datagram(sk, 0, 1, &err);
-		if (skb == NULL)
+		if (skb != NULL) {
+			xs_local_data_read_skb(&transport->xprt, sk, skb);
+			skb_free_datagram(sk, skb);
+			continue;
+		}
+		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
 			break;
-		xs_local_data_read_skb(&transport->xprt, sk, skb);
-		skb_free_datagram(sk, skb);
 	}
 out:
 	mutex_unlock(&transport->recv_mutex);
@@ -1043,10 +1073,13 @@ static void xs_udp_data_receive(struct sock_xprt *transport)
 		goto out;
 	for (;;) {
 		skb = skb_recv_datagram(sk, 0, 1, &err);
-		if (skb == NULL)
+		if (skb != NULL) {
+			xs_udp_data_read_skb(&transport->xprt, sk, skb);
+			skb_free_datagram(sk, skb);
+			continue;
+		}
+		if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
 			break;
-		xs_udp_data_read_skb(&transport->xprt, sk, skb);
-		skb_free_datagram(sk, skb);
 	}
 out:
 	mutex_unlock(&transport->recv_mutex);
@@ -1074,7 +1107,14 @@ static void xs_data_ready(struct sock *sk)
 	if (xprt != NULL) {
 		struct sock_xprt *transport = container_of(xprt,
 				struct sock_xprt, xprt);
-		queue_work(rpciod_workqueue, &transport->recv_worker);
+		transport->old_data_ready(sk);
+		/* Any data means we had a useful conversation, so
+		 * then we don't need to delay the next reconnect
+		 */
+		if (xprt->reestablish_timeout)
+			xprt->reestablish_timeout = 0;
+		if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+			queue_work(xprtiod_workqueue, &transport->recv_worker);
 	}
 	read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -1474,10 +1514,15 @@ static void xs_tcp_data_receive(struct sock_xprt *transport)
 	for (;;) {
 		lock_sock(sk);
 		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
-		release_sock(sk);
-		if (read <= 0)
-			break;
-		total += read;
+		if (read <= 0) {
+			clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+			release_sock(sk);
+			if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
+				break;
+		} else {
+			release_sock(sk);
+			total += read;
+		}
 		rd_desc.count = 65536;
 	}
 out:
@@ -1493,34 +1538,6 @@ static void xs_tcp_data_receive_workfn(struct work_struct *work)
 }
 
 /**
- * xs_tcp_data_ready - "data ready" callback for TCP sockets
- * @sk: socket with data to read
- *
- */
-static void xs_tcp_data_ready(struct sock *sk)
-{
-	struct sock_xprt *transport;
-	struct rpc_xprt *xprt;
-
-	dprintk("RPC:       xs_tcp_data_ready...\n");
-
-	read_lock_bh(&sk->sk_callback_lock);
-	if (!(xprt = xprt_from_sock(sk)))
-		goto out;
-	transport = container_of(xprt, struct sock_xprt, xprt);
-
-	/* Any data means we had a useful conversation, so
-	 * the we don't need to delay the next reconnect
-	 */
-	if (xprt->reestablish_timeout)
-		xprt->reestablish_timeout = 0;
-	queue_work(rpciod_workqueue, &transport->recv_worker);
-
-out:
-	read_unlock_bh(&sk->sk_callback_lock);
-}
-
-/**
  * xs_tcp_state_change - callback to handle TCP socket state changes
  * @sk: socket whose state has changed
  *
@@ -2241,7 +2258,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 		xs_save_old_callbacks(transport, sk);
 
 		sk->sk_user_data = xprt;
-		sk->sk_data_ready = xs_tcp_data_ready;
+		sk->sk_data_ready = xs_data_ready;
 		sk->sk_state_change = xs_tcp_state_change;
 		sk->sk_write_space = xs_tcp_write_space;
 		sock_set_flag(sk, SOCK_FASYNC);
@@ -2380,7 +2397,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 		/* Start by resetting any existing state */
 		xs_reset_transport(transport);
 
-		queue_delayed_work(rpciod_workqueue,
+		queue_delayed_work(xprtiod_workqueue,
 				   &transport->connect_worker,
 				   xprt->reestablish_timeout);
 		xprt->reestablish_timeout <<= 1;
@@ -2390,7 +2407,7 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
 	} else {
 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
-		queue_delayed_work(rpciod_workqueue,
+		queue_delayed_work(xprtiod_workqueue,
 				   &transport->connect_worker, 0);
 	}
 }