summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--include/linux/cpu.h2
-rw-r--r--kernel/workqueue.c293
2 files changed, 279 insertions, 16 deletions
diff --git a/include/linux/cpu.h b/include/linux/cpu.h
index de6b1722cdca..4823af64e9db 100644
--- a/include/linux/cpu.h
+++ b/include/linux/cpu.h
@@ -71,6 +71,8 @@ enum {
 	/* migration should happen before other stuff but after perf */
 	CPU_PRI_PERF		= 20,
 	CPU_PRI_MIGRATION	= 10,
+	/* prepare workqueues for other notifiers */
+	CPU_PRI_WORKQUEUE	= 5,
 };
 
 #ifdef CONFIG_SMP
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index d64913aa486a..f57855f718d7 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -36,14 +36,27 @@
 #include <linux/idr.h>
 
 enum {
+	/* global_cwq flags */
+	GCWQ_FREEZING		= 1 << 3,	/* freeze in progress */
+
 	/* worker flags */
 	WORKER_STARTED		= 1 << 0,	/* started */
 	WORKER_DIE		= 1 << 1,	/* die die die */
 	WORKER_IDLE		= 1 << 2,	/* is idle */
+	WORKER_ROGUE		= 1 << 4,	/* not bound to any cpu */
+
+	/* gcwq->trustee_state */
+	TRUSTEE_START		= 0,		/* start */
+	TRUSTEE_IN_CHARGE	= 1,		/* trustee in charge of gcwq */
+	TRUSTEE_BUTCHER		= 2,		/* butcher workers */
+	TRUSTEE_RELEASE		= 3,		/* release workers */
+	TRUSTEE_DONE		= 4,		/* trustee is done */
 
 	BUSY_WORKER_HASH_ORDER	= 6,		/* 64 pointers */
 	BUSY_WORKER_HASH_SIZE	= 1 << BUSY_WORKER_HASH_ORDER,
 	BUSY_WORKER_HASH_MASK	= BUSY_WORKER_HASH_SIZE - 1,
+
+	TRUSTEE_COOLDOWN	= HZ / 10,	/* for trustee draining */
 };
 
 /*
@@ -83,6 +96,7 @@ struct worker {
 struct global_cwq {
 	spinlock_t		lock;		/* the gcwq lock */
 	unsigned int		cpu;		/* I: the associated cpu */
+	unsigned int		flags;		/* L: GCWQ_* flags */
 
 	int			nr_workers;	/* L: total number of workers */
 	int			nr_idle;	/* L: currently idle ones */
@@ -93,6 +107,10 @@ struct global_cwq {
 						/* L: hash of busy workers */
 
 	struct ida		worker_ida;	/* L: for worker IDs */
+
+	struct task_struct	*trustee;	/* L: for gcwq shutdown */
+	unsigned int		trustee_state;	/* L: trustee state */
+	wait_queue_head_t	trustee_wait;	/* trustee wait */
 } ____cacheline_aligned_in_smp;
 
 /*
@@ -148,6 +166,10 @@ struct workqueue_struct {
 #endif
 };
 
+#define for_each_busy_worker(worker, i, pos, gcwq)			\
+	for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)			\
+		hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
+
 #ifdef CONFIG_DEBUG_OBJECTS_WORK
 
 static struct debug_obj_descr work_debug_descr;
@@ -546,6 +568,9 @@ static void worker_enter_idle(struct worker *worker)
 
 	/* idle_list is LIFO */
 	list_add(&worker->entry, &gcwq->idle_list);
+
+	if (unlikely(worker->flags & WORKER_ROGUE))
+		wake_up_all(&gcwq->trustee_wait);
 }
 
 /**
@@ -622,8 +647,15 @@ static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
 	if (IS_ERR(worker->task))
 		goto fail;
 
+	/*
+	 * A rogue worker will become a regular one if CPU comes
+	 * online later on.  Make sure every worker has
+	 * PF_THREAD_BOUND set.
+	 */
 	if (bind)
 		kthread_bind(worker->task, gcwq->cpu);
+	else
+		worker->task->flags |= PF_THREAD_BOUND;
 
 	return worker;
 fail:
@@ -882,10 +914,6 @@ static int worker_thread(void *__worker)
 	struct cpu_workqueue_struct *cwq = worker->cwq;
 
 woke_up:
-	if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
-				    get_cpu_mask(gcwq->cpu))))
-		set_cpus_allowed_ptr(worker->task, get_cpu_mask(gcwq->cpu));
-
 	spin_lock_irq(&gcwq->lock);
 
 	/* DIE can be set only while we're idle, checking here is enough */
@@ -895,7 +923,7 @@ woke_up:
 	}
 
 	worker_leave_idle(worker);
-
+recheck:
 	/*
 	 * ->scheduled list can only be filled while a worker is
 	 * preparing to process a work or actually processing it.
@@ -908,6 +936,22 @@ woke_up:
 			list_first_entry(&cwq->worklist,
 					 struct work_struct, entry);
 
+		/*
+		 * The following is a rather inefficient way to close
+		 * race window against cpu hotplug operations.  Will
+		 * be replaced soon.
+		 */
+		if (unlikely(!(worker->flags & WORKER_ROGUE) &&
+			     !cpumask_equal(&worker->task->cpus_allowed,
+					    get_cpu_mask(gcwq->cpu)))) {
+			spin_unlock_irq(&gcwq->lock);
+			set_cpus_allowed_ptr(worker->task,
+					     get_cpu_mask(gcwq->cpu));
+			cpu_relax();
+			spin_lock_irq(&gcwq->lock);
+			goto recheck;
+		}
+
 		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
 			/* optimization path, not strictly necessary */
 			process_one_work(worker, work);
@@ -1812,29 +1856,237 @@ void destroy_workqueue(struct workqueue_struct *wq)
 }
 EXPORT_SYMBOL_GPL(destroy_workqueue);
 
+/*
+ * CPU hotplug.
+ *
+ * CPU hotplug is implemented by allowing cwqs to be detached from
+ * CPU, running with unbound workers and allowing them to be
+ * reattached later if the cpu comes back online.  A separate thread
+ * is created to govern cwqs in such state and is called the trustee.
+ *
+ * Trustee states and their descriptions.
+ *
+ * START	Command state used on startup.  On CPU_DOWN_PREPARE, a
+ *		new trustee is started with this state.
+ *
+ * IN_CHARGE	Once started, trustee will enter this state after
+ *		making all existing workers rogue.  DOWN_PREPARE waits
+ *		for trustee to enter this state.  After reaching
+ *		IN_CHARGE, trustee tries to execute the pending
+ *		worklist until it's empty and the state is set to
+ *		BUTCHER, or the state is set to RELEASE.
+ *
+ * BUTCHER	Command state which is set by the cpu callback after
+ *		the cpu has went down.  Once this state is set trustee
+ *		knows that there will be no new works on the worklist
+ *		and once the worklist is empty it can proceed to
+ *		killing idle workers.
+ *
+ * RELEASE	Command state which is set by the cpu callback if the
+ *		cpu down has been canceled or it has come online
+ *		again.  After recognizing this state, trustee stops
+ *		trying to drain or butcher and transits to DONE.
+ *
+ * DONE		Trustee will enter this state after BUTCHER or RELEASE
+ *		is complete.
+ *
+ *          trustee                 CPU                draining
+ *         took over                down               complete
+ * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
+ *                        |                     |                  ^
+ *                        | CPU is back online  v   return workers |
+ *                         ----------------> RELEASE --------------
+ */
+
+/**
+ * trustee_wait_event_timeout - timed event wait for trustee
+ * @cond: condition to wait for
+ * @timeout: timeout in jiffies
+ *
+ * wait_event_timeout() for trustee to use.  Handles locking and
+ * checks for RELEASE request.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by trustee.
+ *
+ * RETURNS:
+ * Positive indicating left time if @cond is satisfied, 0 if timed
+ * out, -1 if canceled.
+ */
+#define trustee_wait_event_timeout(cond, timeout) ({			\
+	long __ret = (timeout);						\
+	while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) &&	\
+	       __ret) {							\
+		spin_unlock_irq(&gcwq->lock);				\
+		__wait_event_timeout(gcwq->trustee_wait, (cond) ||	\
+			(gcwq->trustee_state == TRUSTEE_RELEASE),	\
+			__ret);						\
+		spin_lock_irq(&gcwq->lock);				\
+	}								\
+	gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret);		\
+})
+
+/**
+ * trustee_wait_event - event wait for trustee
+ * @cond: condition to wait for
+ *
+ * wait_event() for trustee to use.  Automatically handles locking and
+ * checks for CANCEL request.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by trustee.
+ *
+ * RETURNS:
+ * 0 if @cond is satisfied, -1 if canceled.
+ */
+#define trustee_wait_event(cond) ({					\
+	long __ret1;							\
+	__ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
+	__ret1 < 0 ? -1 : 0;						\
+})
+
+static int __cpuinit trustee_thread(void *__gcwq)
+{
+	struct global_cwq *gcwq = __gcwq;
+	struct worker *worker;
+	struct hlist_node *pos;
+	int i;
+
+	BUG_ON(gcwq->cpu != smp_processor_id());
+
+	spin_lock_irq(&gcwq->lock);
+	/*
+	 * Make all multithread workers rogue.  Trustee must be bound
+	 * to the target cpu and can't be cancelled.
+	 */
+	BUG_ON(gcwq->cpu != smp_processor_id());
+
+	list_for_each_entry(worker, &gcwq->idle_list, entry)
+		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+			worker->flags |= WORKER_ROGUE;
+
+	for_each_busy_worker(worker, i, pos, gcwq)
+		if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+			worker->flags |= WORKER_ROGUE;
+
+	/*
+	 * We're now in charge.  Notify and proceed to drain.  We need
+	 * to keep the gcwq running during the whole CPU down
+	 * procedure as other cpu hotunplug callbacks may need to
+	 * flush currently running tasks.
+	 */
+	gcwq->trustee_state = TRUSTEE_IN_CHARGE;
+	wake_up_all(&gcwq->trustee_wait);
+
+	/*
+	 * The original cpu is in the process of dying and may go away
+	 * anytime now.  When that happens, we and all workers would
+	 * be migrated to other cpus.  Try draining any left work.
+	 * Note that if the gcwq is frozen, there may be frozen works
+	 * in freezeable cwqs.  Don't declare completion while frozen.
+	 */
+	while (gcwq->nr_workers != gcwq->nr_idle ||
+	       gcwq->flags & GCWQ_FREEZING ||
+	       gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
+		/* give a breather */
+		if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
+			break;
+	}
+
+	/* notify completion */
+	gcwq->trustee = NULL;
+	gcwq->trustee_state = TRUSTEE_DONE;
+	wake_up_all(&gcwq->trustee_wait);
+	spin_unlock_irq(&gcwq->lock);
+	return 0;
+}
+
+/**
+ * wait_trustee_state - wait for trustee to enter the specified state
+ * @gcwq: gcwq the trustee of interest belongs to
+ * @state: target state to wait for
+ *
+ * Wait for the trustee to reach @state.  DONE is already matched.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.  To be used by cpu_callback.
+ */
+static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
+{
+	if (!(gcwq->trustee_state == state ||
+	      gcwq->trustee_state == TRUSTEE_DONE)) {
+		spin_unlock_irq(&gcwq->lock);
+		__wait_event(gcwq->trustee_wait,
+			     gcwq->trustee_state == state ||
+			     gcwq->trustee_state == TRUSTEE_DONE);
+		spin_lock_irq(&gcwq->lock);
+	}
+}
+
 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 						unsigned long action,
 						void *hcpu)
 {
 	unsigned int cpu = (unsigned long)hcpu;
-	struct cpu_workqueue_struct *cwq;
-	struct workqueue_struct *wq;
+	struct global_cwq *gcwq = get_gcwq(cpu);
+	struct task_struct *new_trustee = NULL;
+	struct worker *worker;
+	struct hlist_node *pos;
+	unsigned long flags;
+	int i;
 
 	action &= ~CPU_TASKS_FROZEN;
 
-	list_for_each_entry(wq, &workqueues, list) {
-		if (wq->flags & WQ_SINGLE_THREAD)
-			continue;
+	switch (action) {
+	case CPU_DOWN_PREPARE:
+		new_trustee = kthread_create(trustee_thread, gcwq,
+					     "workqueue_trustee/%d\n", cpu);
+		if (IS_ERR(new_trustee))
+			return notifier_from_errno(PTR_ERR(new_trustee));
+		kthread_bind(new_trustee, cpu);
+	}
 
-		cwq = get_cwq(cpu, wq);
+	/* some are called w/ irq disabled, don't disturb irq status */
+	spin_lock_irqsave(&gcwq->lock, flags);
 
-		switch (action) {
-		case CPU_POST_DEAD:
-			flush_workqueue(wq);
-			break;
+	switch (action) {
+	case CPU_DOWN_PREPARE:
+		/* initialize trustee and tell it to acquire the gcwq */
+		BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
+		gcwq->trustee = new_trustee;
+		gcwq->trustee_state = TRUSTEE_START;
+		wake_up_process(gcwq->trustee);
+		wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+		break;
+
+	case CPU_POST_DEAD:
+		gcwq->trustee_state = TRUSTEE_BUTCHER;
+		break;
+
+	case CPU_DOWN_FAILED:
+	case CPU_ONLINE:
+		if (gcwq->trustee_state != TRUSTEE_DONE) {
+			gcwq->trustee_state = TRUSTEE_RELEASE;
+			wake_up_process(gcwq->trustee);
+			wait_trustee_state(gcwq, TRUSTEE_DONE);
 		}
+
+		/* clear ROGUE from all multithread workers */
+		list_for_each_entry(worker, &gcwq->idle_list, entry)
+			if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+				worker->flags &= ~WORKER_ROGUE;
+
+		for_each_busy_worker(worker, i, pos, gcwq)
+			if (!(worker->cwq->wq->flags & WQ_SINGLE_THREAD))
+				worker->flags &= ~WORKER_ROGUE;
+		break;
 	}
 
+	spin_unlock_irqrestore(&gcwq->lock, flags);
+
 	return notifier_from_errno(0);
 }
 
@@ -1912,6 +2164,9 @@ void freeze_workqueues_begin(void)
 
 		spin_lock_irq(&gcwq->lock);
 
+		BUG_ON(gcwq->flags & GCWQ_FREEZING);
+		gcwq->flags |= GCWQ_FREEZING;
+
 		list_for_each_entry(wq, &workqueues, list) {
 			struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
 
@@ -1995,6 +2250,9 @@ void thaw_workqueues(void)
 
 		spin_lock_irq(&gcwq->lock);
 
+		BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
+		gcwq->flags &= ~GCWQ_FREEZING;
+
 		list_for_each_entry(wq, &workqueues, list) {
 			struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
 
@@ -2026,7 +2284,7 @@ void __init init_workqueues(void)
 	int i;
 
 	singlethread_cpu = cpumask_first(cpu_possible_mask);
-	hotcpu_notifier(workqueue_cpu_callback, 0);
+	hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
 
 	/* initialize gcwqs */
 	for_each_possible_cpu(cpu) {
@@ -2040,6 +2298,9 @@ void __init init_workqueues(void)
 			INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
 
 		ida_init(&gcwq->worker_ida);
+
+		gcwq->trustee_state = TRUSTEE_DONE;
+		init_waitqueue_head(&gcwq->trustee_wait);
 	}
 
 	keventd_wq = create_workqueue("events");