summary refs log tree commit diff
path: root/kernel/padata.c
diff options
context:
space:
mode:
Diffstat (limited to 'kernel/padata.c')
-rw-r--r--kernel/padata.c307
1 files changed, 130 insertions, 177 deletions
diff --git a/kernel/padata.c b/kernel/padata.c
index 15a8ad63f4ff..c3fec1413295 100644
--- a/kernel/padata.c
+++ b/kernel/padata.c
@@ -46,18 +46,13 @@ static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
 	return target_cpu;
 }
 
-static int padata_cpu_hash(struct parallel_data *pd)
+static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr)
 {
-	unsigned int seq_nr;
-	int cpu_index;
-
 	/*
 	 * Hash the sequence numbers to the cpus by taking
 	 * seq_nr mod. number of cpus in use.
 	 */
-
-	seq_nr = atomic_inc_return(&pd->seq_nr);
-	cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
+	int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
 
 	return padata_index_to_cpu(pd, cpu_index);
 }
@@ -94,17 +89,19 @@ static void padata_parallel_worker(struct work_struct *parallel_work)
  *
  * @pinst: padata instance
  * @padata: object to be parallelized
- * @cb_cpu: cpu the serialization callback function will run on,
- *          must be in the serial cpumask of padata(i.e. cpumask.cbcpu).
+ * @cb_cpu: pointer to the CPU that the serialization callback function should
+ *          run on.  If it's not in the serial cpumask of @pinst
+ *          (i.e. cpumask.cbcpu), this function selects a fallback CPU and if
+ *          none found, returns -EINVAL.
  *
  * The parallelization callback function will run with BHs off.
  * Note: Every object which is parallelized by padata_do_parallel
  * must be seen by padata_do_serial.
  */
 int padata_do_parallel(struct padata_instance *pinst,
-		       struct padata_priv *padata, int cb_cpu)
+		       struct padata_priv *padata, int *cb_cpu)
 {
-	int target_cpu, err;
+	int i, cpu, cpu_index, target_cpu, err;
 	struct padata_parallel_queue *queue;
 	struct parallel_data *pd;
 
@@ -116,8 +113,19 @@ int padata_do_parallel(struct padata_instance *pinst,
 	if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)
 		goto out;
 
-	if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu))
-		goto out;
+	if (!cpumask_test_cpu(*cb_cpu, pd->cpumask.cbcpu)) {
+		if (!cpumask_weight(pd->cpumask.cbcpu))
+			goto out;
+
+		/* Select an alternate fallback CPU and notify the caller. */
+		cpu_index = *cb_cpu % cpumask_weight(pd->cpumask.cbcpu);
+
+		cpu = cpumask_first(pd->cpumask.cbcpu);
+		for (i = 0; i < cpu_index; i++)
+			cpu = cpumask_next(cpu, pd->cpumask.cbcpu);
+
+		*cb_cpu = cpu;
+	}
 
 	err =  -EBUSY;
 	if ((pinst->flags & PADATA_RESET))
@@ -129,9 +137,10 @@ int padata_do_parallel(struct padata_instance *pinst,
 	err = 0;
 	atomic_inc(&pd->refcnt);
 	padata->pd = pd;
-	padata->cb_cpu = cb_cpu;
+	padata->cb_cpu = *cb_cpu;
 
-	target_cpu = padata_cpu_hash(pd);
+	padata->seq_nr = atomic_inc_return(&pd->seq_nr);
+	target_cpu = padata_cpu_hash(pd, padata->seq_nr);
 	padata->cpu = target_cpu;
 	queue = per_cpu_ptr(pd->pqueue, target_cpu);
 
@@ -139,7 +148,7 @@ int padata_do_parallel(struct padata_instance *pinst,
 	list_add_tail(&padata->list, &queue->parallel.list);
 	spin_unlock(&queue->parallel.lock);
 
-	queue_work_on(target_cpu, pinst->wq, &queue->work);
+	queue_work(pinst->parallel_wq, &queue->work);
 
 out:
 	rcu_read_unlock_bh();
@@ -149,63 +158,53 @@ out:
 EXPORT_SYMBOL(padata_do_parallel);
 
 /*
- * padata_get_next - Get the next object that needs serialization.
+ * padata_find_next - Find the next object that needs serialization.
  *
  * Return values are:
  *
  * A pointer to the control struct of the next object that needs
  * serialization, if present in one of the percpu reorder queues.
  *
- * -EINPROGRESS, if the next object that needs serialization will
+ * NULL, if the next object that needs serialization will
  *  be parallel processed by another cpu and is not yet present in
  *  the cpu's reorder queue.
- *
- * -ENODATA, if this cpu has to do the parallel processing for
- *  the next object.
  */
-static struct padata_priv *padata_get_next(struct parallel_data *pd)
+static struct padata_priv *padata_find_next(struct parallel_data *pd,
+					    bool remove_object)
 {
-	int cpu, num_cpus;
-	unsigned int next_nr, next_index;
 	struct padata_parallel_queue *next_queue;
 	struct padata_priv *padata;
 	struct padata_list *reorder;
+	int cpu = pd->cpu;
 
-	num_cpus = cpumask_weight(pd->cpumask.pcpu);
-
-	/*
-	 * Calculate the percpu reorder queue and the sequence
-	 * number of the next object.
-	 */
-	next_nr = pd->processed;
-	next_index = next_nr % num_cpus;
-	cpu = padata_index_to_cpu(pd, next_index);
 	next_queue = per_cpu_ptr(pd->pqueue, cpu);
-
 	reorder = &next_queue->reorder;
 
 	spin_lock(&reorder->lock);
-	if (!list_empty(&reorder->list)) {
-		padata = list_entry(reorder->list.next,
-				    struct padata_priv, list);
-
-		list_del_init(&padata->list);
-		atomic_dec(&pd->reorder_objects);
+	if (list_empty(&reorder->list)) {
+		spin_unlock(&reorder->lock);
+		return NULL;
+	}
 
-		pd->processed++;
+	padata = list_entry(reorder->list.next, struct padata_priv, list);
 
+	/*
+	 * Checks the rare case where two or more parallel jobs have hashed to
+	 * the same CPU and one of the later ones finishes first.
+	 */
+	if (padata->seq_nr != pd->processed) {
 		spin_unlock(&reorder->lock);
-		goto out;
+		return NULL;
 	}
-	spin_unlock(&reorder->lock);
 
-	if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) {
-		padata = ERR_PTR(-ENODATA);
-		goto out;
+	if (remove_object) {
+		list_del_init(&padata->list);
+		atomic_dec(&pd->reorder_objects);
+		++pd->processed;
+		pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1, false);
 	}
 
-	padata = ERR_PTR(-EINPROGRESS);
-out:
+	spin_unlock(&reorder->lock);
 	return padata;
 }
 
@@ -215,6 +214,7 @@ static void padata_reorder(struct parallel_data *pd)
 	struct padata_priv *padata;
 	struct padata_serial_queue *squeue;
 	struct padata_instance *pinst = pd->pinst;
+	struct padata_parallel_queue *next_queue;
 
 	/*
 	 * We need to ensure that only one cpu can work on dequeueing of
@@ -230,27 +230,16 @@ static void padata_reorder(struct parallel_data *pd)
 		return;
 
 	while (1) {
-		padata = padata_get_next(pd);
+		padata = padata_find_next(pd, true);
 
 		/*
 		 * If the next object that needs serialization is parallel
 		 * processed by another cpu and is still on it's way to the
 		 * cpu's reorder queue, nothing to do for now.
 		 */
-		if (PTR_ERR(padata) == -EINPROGRESS)
+		if (!padata)
 			break;
 
-		/*
-		 * This cpu has to do the parallel processing of the next
-		 * object. It's waiting in the cpu's parallelization queue,
-		 * so exit immediately.
-		 */
-		if (PTR_ERR(padata) == -ENODATA) {
-			del_timer(&pd->timer);
-			spin_unlock_bh(&pd->lock);
-			return;
-		}
-
 		cb_cpu = padata->cb_cpu;
 		squeue = per_cpu_ptr(pd->squeue, cb_cpu);
 
@@ -258,77 +247,37 @@ static void padata_reorder(struct parallel_data *pd)
 		list_add_tail(&padata->list, &squeue->serial.list);
 		spin_unlock(&squeue->serial.lock);
 
-		queue_work_on(cb_cpu, pinst->wq, &squeue->work);
+		queue_work_on(cb_cpu, pinst->serial_wq, &squeue->work);
 	}
 
 	spin_unlock_bh(&pd->lock);
 
 	/*
 	 * The next object that needs serialization might have arrived to
-	 * the reorder queues in the meantime, we will be called again
-	 * from the timer function if no one else cares for it.
+	 * the reorder queues in the meantime.
 	 *
-	 * Ensure reorder_objects is read after pd->lock is dropped so we see
-	 * an increment from another task in padata_do_serial.  Pairs with
+	 * Ensure reorder queue is read after pd->lock is dropped so we see
+	 * new objects from another task in padata_do_serial.  Pairs with
 	 * smp_mb__after_atomic in padata_do_serial.
 	 */
 	smp_mb();
-	if (atomic_read(&pd->reorder_objects)
-			&& !(pinst->flags & PADATA_RESET))
-		mod_timer(&pd->timer, jiffies + HZ);
-	else
-		del_timer(&pd->timer);
 
-	return;
+	next_queue = per_cpu_ptr(pd->pqueue, pd->cpu);
+	if (!list_empty(&next_queue->reorder.list) &&
+	    padata_find_next(pd, false))
+		queue_work(pinst->serial_wq, &pd->reorder_work);
 }
 
 static void invoke_padata_reorder(struct work_struct *work)
 {
-	struct padata_parallel_queue *pqueue;
 	struct parallel_data *pd;
 
 	local_bh_disable();
-	pqueue = container_of(work, struct padata_parallel_queue, reorder_work);
-	pd = pqueue->pd;
+	pd = container_of(work, struct parallel_data, reorder_work);
 	padata_reorder(pd);
 	local_bh_enable();
 }
 
-static void padata_reorder_timer(struct timer_list *t)
-{
-	struct parallel_data *pd = from_timer(pd, t, timer);
-	unsigned int weight;
-	int target_cpu, cpu;
-
-	cpu = get_cpu();
-
-	/* We don't lock pd here to not interfere with parallel processing
-	 * padata_reorder() calls on other CPUs. We just need any CPU out of
-	 * the cpumask.pcpu set. It would be nice if it's the right one but
-	 * it doesn't matter if we're off to the next one by using an outdated
-	 * pd->processed value.
-	 */
-	weight = cpumask_weight(pd->cpumask.pcpu);
-	target_cpu = padata_index_to_cpu(pd, pd->processed % weight);
-
-	/* ensure to call the reorder callback on the correct CPU */
-	if (cpu != target_cpu) {
-		struct padata_parallel_queue *pqueue;
-		struct padata_instance *pinst;
-
-		/* The timer function is serialized wrt itself -- no locking
-		 * needed.
-		 */
-		pinst = pd->pinst;
-		pqueue = per_cpu_ptr(pd->pqueue, target_cpu);
-		queue_work_on(target_cpu, pinst->wq, &pqueue->reorder_work);
-	} else {
-		padata_reorder(pd);
-	}
-
-	put_cpu();
-}
-
 static void padata_serial_worker(struct work_struct *serial_work)
 {
 	struct padata_serial_queue *squeue;
@@ -367,47 +316,28 @@ static void padata_serial_worker(struct work_struct *serial_work)
  */
 void padata_do_serial(struct padata_priv *padata)
 {
-	int cpu;
-	struct padata_parallel_queue *pqueue;
-	struct parallel_data *pd;
-	int reorder_via_wq = 0;
-
-	pd = padata->pd;
-
-	cpu = get_cpu();
-
-	/* We need to run on the same CPU padata_do_parallel(.., padata, ..)
-	 * was called on -- or, at least, enqueue the padata object into the
-	 * correct per-cpu queue.
-	 */
-	if (cpu != padata->cpu) {
-		reorder_via_wq = 1;
-		cpu = padata->cpu;
-	}
-
-	pqueue = per_cpu_ptr(pd->pqueue, cpu);
+	struct parallel_data *pd = padata->pd;
+	struct padata_parallel_queue *pqueue = per_cpu_ptr(pd->pqueue,
+							   padata->cpu);
+	struct padata_priv *cur;
 
 	spin_lock(&pqueue->reorder.lock);
+	/* Sort in ascending order of sequence number. */
+	list_for_each_entry_reverse(cur, &pqueue->reorder.list, list)
+		if (cur->seq_nr < padata->seq_nr)
+			break;
+	list_add(&padata->list, &cur->list);
 	atomic_inc(&pd->reorder_objects);
-	list_add_tail(&padata->list, &pqueue->reorder.list);
 	spin_unlock(&pqueue->reorder.lock);
 
 	/*
-	 * Ensure the atomic_inc of reorder_objects above is ordered correctly
+	 * Ensure the addition to the reorder list is ordered correctly
 	 * with the trylock of pd->lock in padata_reorder.  Pairs with smp_mb
 	 * in padata_reorder.
 	 */
 	smp_mb__after_atomic();
 
-	put_cpu();
-
-	/* If we're running on the wrong CPU, call padata_reorder() via a
-	 * kernel worker.
-	 */
-	if (reorder_via_wq)
-		queue_work_on(cpu, pd->pinst->wq, &pqueue->reorder_work);
-	else
-		padata_reorder(pd);
+	padata_reorder(pd);
 }
 EXPORT_SYMBOL(padata_do_serial);
 
@@ -415,17 +345,36 @@ static int padata_setup_cpumasks(struct parallel_data *pd,
 				 const struct cpumask *pcpumask,
 				 const struct cpumask *cbcpumask)
 {
-	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
-		return -ENOMEM;
+	struct workqueue_attrs *attrs;
+	int err = -ENOMEM;
 
+	if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
+		goto out;
 	cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask);
-	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
-		free_cpumask_var(pd->cpumask.pcpu);
-		return -ENOMEM;
-	}
 
+	if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL))
+		goto free_pcpu_mask;
 	cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask);
+
+	attrs = alloc_workqueue_attrs();
+	if (!attrs)
+		goto free_cbcpu_mask;
+
+	/* Restrict parallel_wq workers to pd->cpumask.pcpu. */
+	cpumask_copy(attrs->cpumask, pd->cpumask.pcpu);
+	err = apply_workqueue_attrs(pd->pinst->parallel_wq, attrs);
+	free_workqueue_attrs(attrs);
+	if (err < 0)
+		goto free_cbcpu_mask;
+
 	return 0;
+
+free_cbcpu_mask:
+	free_cpumask_var(pd->cpumask.cbcpu);
+free_pcpu_mask:
+	free_cpumask_var(pd->cpumask.pcpu);
+out:
+	return err;
 }
 
 static void __padata_list_init(struct padata_list *pd_list)
@@ -451,26 +400,15 @@ static void padata_init_squeues(struct parallel_data *pd)
 /* Initialize all percpu queues used by parallel workers */
 static void padata_init_pqueues(struct parallel_data *pd)
 {
-	int cpu_index, cpu;
+	int cpu;
 	struct padata_parallel_queue *pqueue;
 
-	cpu_index = 0;
-	for_each_possible_cpu(cpu) {
+	for_each_cpu(cpu, pd->cpumask.pcpu) {
 		pqueue = per_cpu_ptr(pd->pqueue, cpu);
 
-		if (!cpumask_test_cpu(cpu, pd->cpumask.pcpu)) {
-			pqueue->cpu_index = -1;
-			continue;
-		}
-
-		pqueue->pd = pd;
-		pqueue->cpu_index = cpu_index;
-		cpu_index++;
-
 		__padata_list_init(&pqueue->reorder);
 		__padata_list_init(&pqueue->parallel);
 		INIT_WORK(&pqueue->work, padata_parallel_worker);
-		INIT_WORK(&pqueue->reorder_work, invoke_padata_reorder);
 		atomic_set(&pqueue->num_obj, 0);
 	}
 }
@@ -493,17 +431,19 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
 	pd->squeue = alloc_percpu(struct padata_serial_queue);
 	if (!pd->squeue)
 		goto err_free_pqueue;
+
+	pd->pinst = pinst;
 	if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)
 		goto err_free_squeue;
 
 	padata_init_pqueues(pd);
 	padata_init_squeues(pd);
-	timer_setup(&pd->timer, padata_reorder_timer, 0);
 	atomic_set(&pd->seq_nr, -1);
 	atomic_set(&pd->reorder_objects, 0);
 	atomic_set(&pd->refcnt, 0);
-	pd->pinst = pinst;
 	spin_lock_init(&pd->lock);
+	pd->cpu = cpumask_first(pd->cpumask.pcpu);
+	INIT_WORK(&pd->reorder_work, invoke_padata_reorder);
 
 	return pd;
 
@@ -538,8 +478,6 @@ static void padata_flush_queues(struct parallel_data *pd)
 		flush_work(&pqueue->work);
 	}
 
-	del_timer_sync(&pd->timer);
-
 	if (atomic_read(&pd->reorder_objects))
 		padata_reorder(pd);
 
@@ -883,6 +821,8 @@ static void __padata_free(struct padata_instance *pinst)
 	padata_free_pd(pinst->pd);
 	free_cpumask_var(pinst->cpumask.pcpu);
 	free_cpumask_var(pinst->cpumask.cbcpu);
+	destroy_workqueue(pinst->serial_wq);
+	destroy_workqueue(pinst->parallel_wq);
 	kfree(pinst);
 }
 
@@ -1016,13 +956,11 @@ static struct kobj_type padata_attr_type = {
  * padata_alloc - allocate and initialize a padata instance and specify
  *                cpumasks for serial and parallel workers.
  *
- * @wq: workqueue to use for the allocated padata instance
+ * @name: used to identify the instance
  * @pcpumask: cpumask that will be used for padata parallelization
  * @cbcpumask: cpumask that will be used for padata serialization
- *
- * Must be called from a cpus_read_lock() protected region
  */
-static struct padata_instance *padata_alloc(struct workqueue_struct *wq,
+static struct padata_instance *padata_alloc(const char *name,
 					    const struct cpumask *pcpumask,
 					    const struct cpumask *cbcpumask)
 {
@@ -1033,11 +971,23 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq,
 	if (!pinst)
 		goto err;
 
-	if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
+	pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0,
+					     name);
+	if (!pinst->parallel_wq)
 		goto err_free_inst;
+
+	get_online_cpus();
+
+	pinst->serial_wq = alloc_workqueue("%s_serial", WQ_MEM_RECLAIM |
+					   WQ_CPU_INTENSIVE, 1, name);
+	if (!pinst->serial_wq)
+		goto err_put_cpus;
+
+	if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
+		goto err_free_serial_wq;
 	if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
 		free_cpumask_var(pinst->cpumask.pcpu);
-		goto err_free_inst;
+		goto err_free_serial_wq;
 	}
 	if (!padata_validate_cpumask(pinst, pcpumask) ||
 	    !padata_validate_cpumask(pinst, cbcpumask))
@@ -1049,8 +999,6 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq,
 
 	rcu_assign_pointer(pinst->pd, pd);
 
-	pinst->wq = wq;
-
 	cpumask_copy(pinst->cpumask.pcpu, pcpumask);
 	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
 
@@ -1063,11 +1011,19 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq,
 #ifdef CONFIG_HOTPLUG_CPU
 	cpuhp_state_add_instance_nocalls_cpuslocked(hp_online, &pinst->node);
 #endif
+
+	put_online_cpus();
+
 	return pinst;
 
 err_free_masks:
 	free_cpumask_var(pinst->cpumask.pcpu);
 	free_cpumask_var(pinst->cpumask.cbcpu);
+err_free_serial_wq:
+	destroy_workqueue(pinst->serial_wq);
+err_put_cpus:
+	put_online_cpus();
+	destroy_workqueue(pinst->parallel_wq);
 err_free_inst:
 	kfree(pinst);
 err:
@@ -1079,14 +1035,11 @@ err:
  *                         Use the cpu_possible_mask for serial and
  *                         parallel workers.
  *
- * @wq: workqueue to use for the allocated padata instance
- *
- * Must be called from a cpus_read_lock() protected region
+ * @name: used to identify the instance
  */
-struct padata_instance *padata_alloc_possible(struct workqueue_struct *wq)
+struct padata_instance *padata_alloc_possible(const char *name)
 {
-	lockdep_assert_cpus_held();
-	return padata_alloc(wq, cpu_possible_mask, cpu_possible_mask);
+	return padata_alloc(name, cpu_possible_mask, cpu_possible_mask);
 }
 EXPORT_SYMBOL(padata_alloc_possible);