Diffstat (limited to 'kernel')
-rw-r--r--  kernel/Makefile           |   1
-rw-r--r--  kernel/context_tracking.c |  83
-rw-r--r--  kernel/ksysfs.c           |  18
-rw-r--r--  kernel/rcu.h              |   2
-rw-r--r--  kernel/rcupdate.c         |   3
-rw-r--r--  kernel/rcutiny.c          |   2
-rw-r--r--  kernel/rcutiny_plugin.h   |   5
-rw-r--r--  kernel/rcutorture.c       |  54
-rw-r--r--  kernel/rcutree.c          | 347
-rw-r--r--  kernel/rcutree.h          |  67
-rw-r--r--  kernel/rcutree_plugin.h   | 415
-rw-r--r--  kernel/rcutree_trace.c    | 330
-rw-r--r--  kernel/sched/core.c       |  23
-rw-r--r--  kernel/srcu.c             |  16
14 files changed, 1014 insertions(+), 352 deletions(-)
diff --git a/kernel/Makefile b/kernel/Makefile
index 86e3285ae7e5..ac0d533eb7de 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -110,6 +110,7 @@ obj-$(CONFIG_USER_RETURN_NOTIFIER) += user-return-notifier.o
 obj-$(CONFIG_PADATA) += padata.o
 obj-$(CONFIG_CRASH_DUMP) += crash_dump.o
 obj-$(CONFIG_JUMP_LABEL) += jump_label.o
+obj-$(CONFIG_CONTEXT_TRACKING) += context_tracking.o
 
 $(obj)/configs.o: $(obj)/config_data.h
 
diff --git a/kernel/context_tracking.c b/kernel/context_tracking.c
new file mode 100644
index 000000000000..e0e07fd55508
--- /dev/null
+++ b/kernel/context_tracking.c
@@ -0,0 +1,83 @@
+#include <linux/context_tracking.h>
+#include <linux/rcupdate.h>
+#include <linux/sched.h>
+#include <linux/percpu.h>
+#include <linux/hardirq.h>
+
+struct context_tracking {
+	/*
+	 * When active is false, hooks are not set, in order to
+	 * minimize overhead: TIF flags are cleared
+	 * and calls to user_enter/exit are ignored. This
+	 * may be further optimized using static keys.
+	 */
+	bool active;
+	enum {
+		IN_KERNEL = 0,
+		IN_USER,
+	} state;
+};
+
+static DEFINE_PER_CPU(struct context_tracking, context_tracking) = {
+#ifdef CONFIG_CONTEXT_TRACKING_FORCE
+	.active = true,
+#endif
+};
+
+void user_enter(void)
+{
+	unsigned long flags;
+
+	/*
+	 * Some contexts may involve an exception occurring in an irq,
+	 * leading to that nesting:
+	 * rcu_irq_enter() rcu_user_exit() rcu_user_exit() rcu_irq_exit()
+	 * This would mess up the dynticks_nesting count though. And rcu_irq_*()
+	 * helpers are enough to protect RCU uses inside the exception. So
+	 * just return immediately if we detect we are in an IRQ.
+	 */
+	if (in_interrupt())
+		return;
+
+	WARN_ON_ONCE(!current->mm);
+
+	local_irq_save(flags);
+	if (__this_cpu_read(context_tracking.active) &&
+	    __this_cpu_read(context_tracking.state) != IN_USER) {
+		__this_cpu_write(context_tracking.state, IN_USER);
+		rcu_user_enter();
+	}
+	local_irq_restore(flags);
+}
+
+void user_exit(void)
+{
+	unsigned long flags;
+
+	/*
+	 * Some contexts may involve an exception occurring in an irq,
+	 * leading to that nesting:
+	 * rcu_irq_enter() rcu_user_exit() rcu_user_exit() rcu_irq_exit()
+	 * This would mess up the dynticks_nesting count though. And rcu_irq_*()
+	 * helpers are enough to protect RCU uses inside the exception. So
+	 * just return immediately if we detect we are in an IRQ.
+	 */
+	if (in_interrupt())
+		return;
+
+	local_irq_save(flags);
+	if (__this_cpu_read(context_tracking.state) == IN_USER) {
+		__this_cpu_write(context_tracking.state, IN_KERNEL);
+		rcu_user_exit();
+	}
+	local_irq_restore(flags);
+}
+
+void context_tracking_task_switch(struct task_struct *prev,
+			     struct task_struct *next)
+{
+	if (__this_cpu_read(context_tracking.active)) {
+		clear_tsk_thread_flag(prev, TIF_NOHZ);
+		set_tsk_thread_flag(next, TIF_NOHZ);
+	}
+}
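
For orientation, user_enter()/user_exit() are meant to be driven from the
architecture's kernel entry and exit paths once TIF_NOHZ is set. A minimal
sketch of such glue, assuming hypothetical syscall-tracing hooks rather than
any real arch code:

	/* Hypothetical arch glue -- a sketch, not actual entry code. */
	asmlinkage void syscall_trace_enter(struct pt_regs *regs)
	{
		user_exit();	/* entering the kernel: RCU must watch this CPU */
		/* ... syscall tracing, audit, seccomp ... */
	}

	asmlinkage void syscall_trace_leave(struct pt_regs *regs)
	{
		/* ... tracing on the way out ... */
		user_enter();	/* back to user mode: CPU may be RCU-quiescent */
	}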
diff --git a/kernel/ksysfs.c b/kernel/ksysfs.c
index c01eac66c0cc..6ada93c23a9a 100644
--- a/kernel/ksysfs.c
+++ b/kernel/ksysfs.c
@@ -140,6 +140,23 @@ static ssize_t fscaps_show(struct kobject *kobj,
 }
 KERNEL_ATTR_RO(fscaps);
 
+int rcu_expedited;
+static ssize_t rcu_expedited_show(struct kobject *kobj,
+				  struct kobj_attribute *attr, char *buf)
+{
+	return sprintf(buf, "%d\n", rcu_expedited);
+}
+static ssize_t rcu_expedited_store(struct kobject *kobj,
+				   struct kobj_attribute *attr,
+				   const char *buf, size_t count)
+{
+	if (kstrtoint(buf, 0, &rcu_expedited))
+		return -EINVAL;
+
+	return count;
+}
+KERNEL_ATTR_RW(rcu_expedited);
+
 /*
  * Make /sys/kernel/notes give the raw contents of our kernel .notes section.
  */
@@ -179,6 +196,7 @@ static struct attribute * kernel_attrs[] = {
 	&kexec_crash_size_attr.attr,
 	&vmcoreinfo_attr.attr,
 #endif
+	&rcu_expedited_attr.attr,
 	NULL
 };
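
With the attribute registered above, rcu_expedited can be flipped at runtime,
e.g. by writing 1 to /sys/kernel/rcu_expedited, after which the
synchronize_rcu() family (see the rcutiny_plugin.h and rcutree.c hunks below)
takes the expedited path. It can likewise be set at boot through the module
parameter added in rcupdate.c below; the exact command-line spelling (likely
rcupdate.rcu_expedited=1) depends on the parameter's module prefix and is an
assumption here.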
 
diff --git a/kernel/rcu.h b/kernel/rcu.h
index 8ba99cdc6515..20dfba576c2b 100644
--- a/kernel/rcu.h
+++ b/kernel/rcu.h
@@ -109,4 +109,6 @@ static inline bool __rcu_reclaim(char *rn, struct rcu_head *head)
 	}
 }
 
+extern int rcu_expedited;
+
 #endif /* __LINUX_RCU_H */
diff --git a/kernel/rcupdate.c b/kernel/rcupdate.c
index 29ca1c6da594..a2cf76177b44 100644
--- a/kernel/rcupdate.c
+++ b/kernel/rcupdate.c
@@ -46,12 +46,15 @@
 #include <linux/export.h>
 #include <linux/hardirq.h>
 #include <linux/delay.h>
+#include <linux/module.h>
 
 #define CREATE_TRACE_POINTS
 #include <trace/events/rcu.h>
 
 #include "rcu.h"
 
+module_param(rcu_expedited, int, 0);
+
 #ifdef CONFIG_PREEMPT_RCU
 
 /*
diff --git a/kernel/rcutiny.c b/kernel/rcutiny.c
index e4c6a598d6f7..e7dce58f9c2a 100644
--- a/kernel/rcutiny.c
+++ b/kernel/rcutiny.c
@@ -195,7 +195,7 @@ EXPORT_SYMBOL(rcu_is_cpu_idle);
  */
 int rcu_is_cpu_rrupt_from_idle(void)
 {
-	return rcu_dynticks_nesting <= 0;
+	return rcu_dynticks_nesting <= 1;
 }
 
 /*
diff --git a/kernel/rcutiny_plugin.h b/kernel/rcutiny_plugin.h
index 3d0190282204..f85016a2309b 100644
--- a/kernel/rcutiny_plugin.h
+++ b/kernel/rcutiny_plugin.h
@@ -706,7 +706,10 @@ void synchronize_rcu(void)
 		return;
 
 	/* Once we get past the fastpath checks, same code as rcu_barrier(). */
-	rcu_barrier();
+	if (rcu_expedited)
+		synchronize_rcu_expedited();
+	else
+		rcu_barrier();
 }
 EXPORT_SYMBOL_GPL(synchronize_rcu);
 
diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c
index aaa7b9f3532a..31dea01c85fd 100644
--- a/kernel/rcutorture.c
+++ b/kernel/rcutorture.c
@@ -339,7 +339,6 @@ rcu_stutter_wait(char *title)
 
 struct rcu_torture_ops {
 	void (*init)(void);
-	void (*cleanup)(void);
 	int (*readlock)(void);
 	void (*read_delay)(struct rcu_random_state *rrsp);
 	void (*readunlock)(int idx);
@@ -431,7 +430,6 @@ static void rcu_torture_deferred_free(struct rcu_torture *p)
 
 static struct rcu_torture_ops rcu_ops = {
 	.init		= NULL,
-	.cleanup	= NULL,
 	.readlock	= rcu_torture_read_lock,
 	.read_delay	= rcu_read_delay,
 	.readunlock	= rcu_torture_read_unlock,
@@ -475,7 +473,6 @@ static void rcu_sync_torture_init(void)
 
 static struct rcu_torture_ops rcu_sync_ops = {
 	.init		= rcu_sync_torture_init,
-	.cleanup	= NULL,
 	.readlock	= rcu_torture_read_lock,
 	.read_delay	= rcu_read_delay,
 	.readunlock	= rcu_torture_read_unlock,
@@ -493,7 +490,6 @@ static struct rcu_torture_ops rcu_sync_ops = {
 
 static struct rcu_torture_ops rcu_expedited_ops = {
 	.init		= rcu_sync_torture_init,
-	.cleanup	= NULL,
 	.readlock	= rcu_torture_read_lock,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= rcu_torture_read_unlock,
@@ -536,7 +532,6 @@ static void rcu_bh_torture_deferred_free(struct rcu_torture *p)
 
 static struct rcu_torture_ops rcu_bh_ops = {
 	.init		= NULL,
-	.cleanup	= NULL,
 	.readlock	= rcu_bh_torture_read_lock,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= rcu_bh_torture_read_unlock,
@@ -553,7 +548,6 @@ static struct rcu_torture_ops rcu_bh_ops = {
 
 static struct rcu_torture_ops rcu_bh_sync_ops = {
 	.init		= rcu_sync_torture_init,
-	.cleanup	= NULL,
 	.readlock	= rcu_bh_torture_read_lock,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= rcu_bh_torture_read_unlock,
@@ -570,7 +564,6 @@ static struct rcu_torture_ops rcu_bh_sync_ops = {
 
 static struct rcu_torture_ops rcu_bh_expedited_ops = {
 	.init		= rcu_sync_torture_init,
-	.cleanup	= NULL,
 	.readlock	= rcu_bh_torture_read_lock,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= rcu_bh_torture_read_unlock,
@@ -589,19 +582,7 @@ static struct rcu_torture_ops rcu_bh_expedited_ops = {
  * Definitions for srcu torture testing.
  */
 
-static struct srcu_struct srcu_ctl;
-
-static void srcu_torture_init(void)
-{
-	init_srcu_struct(&srcu_ctl);
-	rcu_sync_torture_init();
-}
-
-static void srcu_torture_cleanup(void)
-{
-	synchronize_srcu(&srcu_ctl);
-	cleanup_srcu_struct(&srcu_ctl);
-}
+DEFINE_STATIC_SRCU(srcu_ctl);
 
 static int srcu_torture_read_lock(void) __acquires(&srcu_ctl)
 {
@@ -672,8 +653,7 @@ static int srcu_torture_stats(char *page)
 }
 
 static struct rcu_torture_ops srcu_ops = {
-	.init		= srcu_torture_init,
-	.cleanup	= srcu_torture_cleanup,
+	.init		= rcu_sync_torture_init,
 	.readlock	= srcu_torture_read_lock,
 	.read_delay	= srcu_read_delay,
 	.readunlock	= srcu_torture_read_unlock,
@@ -687,8 +667,7 @@ static struct rcu_torture_ops srcu_ops = {
 };
 
 static struct rcu_torture_ops srcu_sync_ops = {
-	.init		= srcu_torture_init,
-	.cleanup	= srcu_torture_cleanup,
+	.init		= rcu_sync_torture_init,
 	.readlock	= srcu_torture_read_lock,
 	.read_delay	= srcu_read_delay,
 	.readunlock	= srcu_torture_read_unlock,
@@ -712,8 +691,7 @@ static void srcu_torture_read_unlock_raw(int idx) __releases(&srcu_ctl)
 }
 
 static struct rcu_torture_ops srcu_raw_ops = {
-	.init		= srcu_torture_init,
-	.cleanup	= srcu_torture_cleanup,
+	.init		= rcu_sync_torture_init,
 	.readlock	= srcu_torture_read_lock_raw,
 	.read_delay	= srcu_read_delay,
 	.readunlock	= srcu_torture_read_unlock_raw,
@@ -727,8 +705,7 @@ static struct rcu_torture_ops srcu_raw_ops = {
 };
 
 static struct rcu_torture_ops srcu_raw_sync_ops = {
-	.init		= srcu_torture_init,
-	.cleanup	= srcu_torture_cleanup,
+	.init		= rcu_sync_torture_init,
 	.readlock	= srcu_torture_read_lock_raw,
 	.read_delay	= srcu_read_delay,
 	.readunlock	= srcu_torture_read_unlock_raw,
@@ -747,8 +724,7 @@ static void srcu_torture_synchronize_expedited(void)
 }
 
 static struct rcu_torture_ops srcu_expedited_ops = {
-	.init		= srcu_torture_init,
-	.cleanup	= srcu_torture_cleanup,
+	.init		= rcu_sync_torture_init,
 	.readlock	= srcu_torture_read_lock,
 	.read_delay	= srcu_read_delay,
 	.readunlock	= srcu_torture_read_unlock,
@@ -783,7 +759,6 @@ static void rcu_sched_torture_deferred_free(struct rcu_torture *p)
 
 static struct rcu_torture_ops sched_ops = {
 	.init		= rcu_sync_torture_init,
-	.cleanup	= NULL,
 	.readlock	= sched_torture_read_lock,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= sched_torture_read_unlock,
@@ -799,7 +774,6 @@ static struct rcu_torture_ops sched_ops = {
 
 static struct rcu_torture_ops sched_sync_ops = {
 	.init		= rcu_sync_torture_init,
-	.cleanup	= NULL,
 	.readlock	= sched_torture_read_lock,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= sched_torture_read_unlock,
@@ -814,7 +788,6 @@ static struct rcu_torture_ops sched_sync_ops = {
 
 static struct rcu_torture_ops sched_expedited_ops = {
 	.init		= rcu_sync_torture_init,
-	.cleanup	= NULL,
 	.readlock	= sched_torture_read_lock,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= sched_torture_read_unlock,
@@ -1396,12 +1369,16 @@ rcu_torture_print_module_parms(struct rcu_torture_ops *cur_ops, char *tag)
 		 "fqs_duration=%d fqs_holdoff=%d fqs_stutter=%d "
 		 "test_boost=%d/%d test_boost_interval=%d "
 		 "test_boost_duration=%d shutdown_secs=%d "
+		 "stall_cpu=%d stall_cpu_holdoff=%d "
+		 "n_barrier_cbs=%d "
 		 "onoff_interval=%d onoff_holdoff=%d\n",
 		 torture_type, tag, nrealreaders, nfakewriters,
 		 stat_interval, verbose, test_no_idle_hz, shuffle_interval,
 		 stutter, irqreader, fqs_duration, fqs_holdoff, fqs_stutter,
 		 test_boost, cur_ops->can_boost,
 		 test_boost_interval, test_boost_duration, shutdown_secs,
+		 stall_cpu, stall_cpu_holdoff,
+		 n_barrier_cbs,
 		 onoff_interval, onoff_holdoff);
 }
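
The expanded banner above merely reports module parameters that already exist
in this file; an illustrative invocation exercising them might be
"modprobe rcutorture stall_cpu=30 stall_cpu_holdoff=60 n_barrier_cbs=4"
(values chosen arbitrarily for the example).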
 
@@ -1502,6 +1479,7 @@ rcu_torture_onoff(void *arg)
 	unsigned long delta;
 	int maxcpu = -1;
 	DEFINE_RCU_RANDOM(rand);
+	int ret;
 	unsigned long starttime;
 
 	VERBOSE_PRINTK_STRING("rcu_torture_onoff task started");
@@ -1522,7 +1500,13 @@ rcu_torture_onoff(void *arg)
 					 torture_type, cpu);
 			starttime = jiffies;
 			n_offline_attempts++;
-			if (cpu_down(cpu) == 0) {
+			ret = cpu_down(cpu);
+			if (ret) {
+				if (verbose)
+					pr_alert("%s" TORTURE_FLAG
+						 "rcu_torture_onoff task: offline %d failed: errno %d\n",
+						 torture_type, cpu, ret);
+			} else {
 				if (verbose)
 					pr_alert("%s" TORTURE_FLAG
 						 "rcu_torture_onoff task: offlined %d\n",
@@ -1936,8 +1920,6 @@ rcu_torture_cleanup(void)
 
 	rcu_torture_stats_print();  /* -After- the stats thread is stopped! */
 
-	if (cur_ops->cleanup)
-		cur_ops->cleanup();
 	if (atomic_read(&n_rcu_torture_error) || n_rcu_torture_barrier_error)
 		rcu_torture_print_module_parms(cur_ops, "End of test: FAILURE");
 	else if (n_online_successes != n_online_attempts ||
diff --git a/kernel/rcutree.c b/kernel/rcutree.c
index 74df86bd9204..e441b77b614e 100644
--- a/kernel/rcutree.c
+++ b/kernel/rcutree.c
@@ -68,9 +68,9 @@ static struct lock_class_key rcu_fqs_class[RCU_NUM_LVLS];
 	.level = { &sname##_state.node[0] }, \
 	.call = cr, \
 	.fqs_state = RCU_GP_IDLE, \
-	.gpnum = -300, \
-	.completed = -300, \
-	.onofflock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.onofflock), \
+	.gpnum = 0UL - 300UL, \
+	.completed = 0UL - 300UL, \
+	.orphan_lock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.orphan_lock), \
 	.orphan_nxttail = &sname##_state.orphan_nxtlist, \
 	.orphan_donetail = &sname##_state.orphan_donelist, \
 	.barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
@@ -207,18 +207,15 @@ EXPORT_SYMBOL_GPL(rcu_note_context_switch);
 DEFINE_PER_CPU(struct rcu_dynticks, rcu_dynticks) = {
 	.dynticks_nesting = DYNTICK_TASK_EXIT_IDLE,
 	.dynticks = ATOMIC_INIT(1),
-#if defined(CONFIG_RCU_USER_QS) && !defined(CONFIG_RCU_USER_QS_FORCE)
-	.ignore_user_qs = true,
-#endif
 };
 
-static int blimit = 10;		/* Maximum callbacks per rcu_do_batch. */
-static int qhimark = 10000;	/* If this many pending, ignore blimit. */
-static int qlowmark = 100;	/* Once only this many pending, use blimit. */
+static long blimit = 10;	/* Maximum callbacks per rcu_do_batch. */
+static long qhimark = 10000;	/* If this many pending, ignore blimit. */
+static long qlowmark = 100;	/* Once only this many pending, use blimit. */
 
-module_param(blimit, int, 0444);
-module_param(qhimark, int, 0444);
-module_param(qlowmark, int, 0444);
+module_param(blimit, long, 0444);
+module_param(qhimark, long, 0444);
+module_param(qlowmark, long, 0444);
 
 int rcu_cpu_stall_suppress __read_mostly; /* 1 = suppress stall warnings. */
 int rcu_cpu_stall_timeout __read_mostly = CONFIG_RCU_CPU_STALL_TIMEOUT;
@@ -303,7 +300,8 @@ EXPORT_SYMBOL_GPL(rcu_sched_force_quiescent_state);
 static int
 cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 {
-	return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL];
+	return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
+	       rdp->nxttail[RCU_DONE_TAIL] != NULL;
 }
 
 /*
@@ -312,8 +310,11 @@ cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 static int
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-	return *rdp->nxttail[RCU_DONE_TAIL +
-			     ACCESS_ONCE(rsp->completed) != rdp->completed] &&
+	struct rcu_head **ntp;
+
+	ntp = rdp->nxttail[RCU_DONE_TAIL +
+			   (ACCESS_ONCE(rsp->completed) != rdp->completed)];
+	return rdp->nxttail[RCU_DONE_TAIL] && ntp && *ntp &&
 	       !rcu_gp_in_progress(rsp);
 }
 
@@ -416,29 +417,7 @@ EXPORT_SYMBOL_GPL(rcu_idle_enter);
  */
 void rcu_user_enter(void)
 {
-	unsigned long flags;
-	struct rcu_dynticks *rdtp;
-
-	/*
-	 * Some contexts may involve an exception occuring in an irq,
-	 * leading to that nesting:
-	 * rcu_irq_enter() rcu_user_exit() rcu_user_exit() rcu_irq_exit()
-	 * This would mess up the dyntick_nesting count though. And rcu_irq_*()
-	 * helpers are enough to protect RCU uses inside the exception. So
-	 * just return immediately if we detect we are in an IRQ.
-	 */
-	if (in_interrupt())
-		return;
-
-	WARN_ON_ONCE(!current->mm);
-
-	local_irq_save(flags);
-	rdtp = &__get_cpu_var(rcu_dynticks);
-	if (!rdtp->ignore_user_qs && !rdtp->in_user) {
-		rdtp->in_user = true;
-		rcu_eqs_enter(true);
-	}
-	local_irq_restore(flags);
+	rcu_eqs_enter(1);
 }
 
 /**
@@ -575,27 +554,7 @@ EXPORT_SYMBOL_GPL(rcu_idle_exit);
  */
 void rcu_user_exit(void)
 {
-	unsigned long flags;
-	struct rcu_dynticks *rdtp;
-
-	/*
-	 * Some contexts may involve an exception occuring in an irq,
-	 * leading to that nesting:
-	 * rcu_irq_enter() rcu_user_exit() rcu_user_exit() rcu_irq_exit()
-	 * This would mess up the dyntick_nesting count though. And rcu_irq_*()
-	 * helpers are enough to protect RCU uses inside the exception. So
-	 * just return immediately if we detect we are in an IRQ.
-	 */
-	if (in_interrupt())
-		return;
-
-	local_irq_save(flags);
-	rdtp = &__get_cpu_var(rcu_dynticks);
-	if (rdtp->in_user) {
-		rdtp->in_user = false;
-		rcu_eqs_exit(true);
-	}
-	local_irq_restore(flags);
+	rcu_eqs_exit(1);
 }
 
 /**
@@ -718,21 +677,6 @@ int rcu_is_cpu_idle(void)
 }
 EXPORT_SYMBOL(rcu_is_cpu_idle);
 
-#ifdef CONFIG_RCU_USER_QS
-void rcu_user_hooks_switch(struct task_struct *prev,
-			   struct task_struct *next)
-{
-	struct rcu_dynticks *rdtp;
-
-	/* Interrupts are disabled in context switch */
-	rdtp = &__get_cpu_var(rcu_dynticks);
-	if (!rdtp->ignore_user_qs) {
-		clear_tsk_thread_flag(prev, TIF_NOHZ);
-		set_tsk_thread_flag(next, TIF_NOHZ);
-	}
-}
-#endif /* #ifdef CONFIG_RCU_USER_QS */
-
 #if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU)
 
 /*
@@ -873,6 +817,29 @@ static void record_gp_stall_check_time(struct rcu_state *rsp)
 	rsp->jiffies_stall = jiffies + jiffies_till_stall_check();
 }
 
+/*
+ * Dump stacks of all tasks running on stalled CPUs.  This is a fallback
+ * for architectures that do not implement trigger_all_cpu_backtrace().
+ * The NMI-triggered stack traces are more accurate because they are
+ * printed by the target CPU.
+ */
+static void rcu_dump_cpu_stacks(struct rcu_state *rsp)
+{
+	int cpu;
+	unsigned long flags;
+	struct rcu_node *rnp;
+
+	rcu_for_each_leaf_node(rsp, rnp) {
+		raw_spin_lock_irqsave(&rnp->lock, flags);
+		if (rnp->qsmask != 0) {
+			for (cpu = 0; cpu <= rnp->grphi - rnp->grplo; cpu++)
+				if (rnp->qsmask & (1UL << cpu))
+					dump_cpu_task(rnp->grplo + cpu);
+		}
+		raw_spin_unlock_irqrestore(&rnp->lock, flags);
+	}
+}
+
 static void print_other_cpu_stall(struct rcu_state *rsp)
 {
 	int cpu;
@@ -880,6 +847,7 @@ static void print_other_cpu_stall(struct rcu_state *rsp)
 	unsigned long flags;
 	int ndetected = 0;
 	struct rcu_node *rnp = rcu_get_root(rsp);
+	long totqlen = 0;
 
 	/* Only let one CPU complain about others per time interval. */
 
@@ -924,12 +892,15 @@ static void print_other_cpu_stall(struct rcu_state *rsp)
 	raw_spin_unlock_irqrestore(&rnp->lock, flags);
 
 	print_cpu_stall_info_end();
-	printk(KERN_CONT "(detected by %d, t=%ld jiffies)\n",
-	       smp_processor_id(), (long)(jiffies - rsp->gp_start));
+	for_each_possible_cpu(cpu)
+		totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+	pr_cont("(detected by %d, t=%ld jiffies, g=%lu, c=%lu, q=%lu)\n",
+	       smp_processor_id(), (long)(jiffies - rsp->gp_start),
+	       rsp->gpnum, rsp->completed, totqlen);
 	if (ndetected == 0)
 		printk(KERN_ERR "INFO: Stall ended before state dump start\n");
 	else if (!trigger_all_cpu_backtrace())
-		dump_stack();
+		rcu_dump_cpu_stacks(rsp);
 
 	/* Complain about tasks blocking the grace period. */
 
@@ -940,8 +911,10 @@ static void print_other_cpu_stall(struct rcu_state *rsp)
 
 static void print_cpu_stall(struct rcu_state *rsp)
 {
+	int cpu;
 	unsigned long flags;
 	struct rcu_node *rnp = rcu_get_root(rsp);
+	long totqlen = 0;
 
 	/*
 	 * OK, time to rat on ourselves...
@@ -952,7 +925,10 @@ static void print_cpu_stall(struct rcu_state *rsp)
 	print_cpu_stall_info_begin();
 	print_cpu_stall_info(rsp, smp_processor_id());
 	print_cpu_stall_info_end();
-	printk(KERN_CONT " (t=%lu jiffies)\n", jiffies - rsp->gp_start);
+	for_each_possible_cpu(cpu)
+		totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+	pr_cont(" (t=%lu jiffies g=%lu c=%lu q=%lu)\n",
+		jiffies - rsp->gp_start, rsp->gpnum, rsp->completed, totqlen);
 	if (!trigger_all_cpu_backtrace())
 		dump_stack();
 
@@ -1091,6 +1067,7 @@ static void init_callback_list(struct rcu_data *rdp)
 	rdp->nxtlist = NULL;
 	for (i = 0; i < RCU_NEXT_SIZE; i++)
 		rdp->nxttail[i] = &rdp->nxtlist;
+	init_nocb_callback_list(rdp);
 }
 
 /*
@@ -1404,15 +1381,37 @@ rcu_start_gp(struct rcu_state *rsp, unsigned long flags)
 	    !cpu_needs_another_gp(rsp, rdp)) {
 		/*
 		 * Either we have not yet spawned the grace-period
-		 * task or this CPU does not need another grace period.
+		 * task, this CPU does not need another grace period,
+		 * or a grace period is already in progress.
 		 * Either way, don't start a new grace period.
 		 */
 		raw_spin_unlock_irqrestore(&rnp->lock, flags);
 		return;
 	}
 
+	/*
+	 * Because there is no grace period in progress right now,
+	 * any callbacks we have up to this point will be satisfied
+	 * by the next grace period.  So promote all callbacks to be
+	 * handled after the end of the next grace period.  If the
+	 * CPU is not yet aware of the end of the previous grace period,
+	 * we need to allow for the callback advancement that will
+	 * occur when it does become aware.  Deadlock prevents us from
+	 * making it aware at this point: We cannot acquire a leaf
+	 * rcu_node ->lock while holding the root rcu_node ->lock.
+	 */
+	rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+	if (rdp->completed == rsp->completed)
+		rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+
 	rsp->gp_flags = RCU_GP_FLAG_INIT;
-	raw_spin_unlock_irqrestore(&rnp->lock, flags);
+	raw_spin_unlock(&rnp->lock); /* Interrupts remain disabled. */
+
+	/* Ensure that CPU is aware of completion of last grace period. */
+	rcu_process_gp_end(rsp, rdp);
+	local_irq_restore(flags);
+
+	/* Wake up rcu_gp_kthread() to start the grace period. */
 	wake_up(&rsp->gp_wq);
 }
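
The tail-pointer assignments above operate on the four-segment callback list
carried by each rcu_data structure; as a reading aid, the invariants look
roughly like this (segment contents illustrative):

	nxtlist --> DONE cbs --> WAIT cbs --> NEXT_READY cbs --> NEXT cbs --> NULL
	nxttail[RCU_DONE_TAIL]        ends the DONE segment
	nxttail[RCU_WAIT_TAIL]        ends the WAIT segment
	nxttail[RCU_NEXT_READY_TAIL]  ends the NEXT_READY segment
	nxttail[RCU_NEXT_TAIL]        points to the final ->next pointer

Setting nxttail[RCU_NEXT_READY_TAIL] = nxttail[RCU_NEXT_TAIL] therefore
promotes every NEXT callback into NEXT_READY in O(1), and likewise for the
WAIT promotion when ->completed matches.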
 
@@ -1573,16 +1572,20 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 /*
  * Send the specified CPU's RCU callbacks to the orphanage.  The
  * specified CPU must be offline, and the caller must hold the
- * ->onofflock.
+ * ->orphan_lock.
  */
 static void
 rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 			  struct rcu_node *rnp, struct rcu_data *rdp)
 {
+	/* No-CBs CPUs do not have orphanable callbacks. */
+	if (is_nocb_cpu(rdp->cpu))
+		return;
+
 	/*
 	 * Orphan the callbacks.  First adjust the counts.  This is safe
-	 * because ->onofflock excludes _rcu_barrier()'s adoption of
-	 * the callbacks, thus no memory barrier is required.
+	 * because _rcu_barrier() excludes CPU-hotplug operations, so it
+	 * cannot be running now.  Thus no memory barrier is required.
 	 */
 	if (rdp->nxtlist != NULL) {
 		rsp->qlen_lazy += rdp->qlen_lazy;
@@ -1623,13 +1626,17 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 
 /*
  * Adopt the RCU callbacks from the specified rcu_state structure's
- * orphanage.  The caller must hold the ->onofflock.
+ * orphanage.  The caller must hold the ->orphan_lock.
  */
 static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
 {
 	int i;
 	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
+	/* No-CBs CPUs are handled specially. */
+	if (rcu_nocb_adopt_orphan_cbs(rsp, rdp))
+		return;
+
 	/* Do the accounting first. */
 	rdp->qlen_lazy += rsp->qlen_lazy;
 	rdp->qlen += rsp->qlen;
@@ -1702,7 +1709,7 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 
 	/* Exclude any attempts to start a new grace period. */
 	mutex_lock(&rsp->onoff_mutex);
-	raw_spin_lock_irqsave(&rsp->onofflock, flags);
+	raw_spin_lock_irqsave(&rsp->orphan_lock, flags);
 
 	/* Orphan the dead CPU's callbacks, and adopt them if appropriate. */
 	rcu_send_cbs_to_orphanage(cpu, rsp, rnp, rdp);
@@ -1729,10 +1736,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 	/*
 	 * We still hold the leaf rcu_node structure lock here, and
 	 * irqs are still disabled.  The reason for this subterfuge is
-	 * because invoking rcu_report_unblock_qs_rnp() with ->onofflock
+	 * because invoking rcu_report_unblock_qs_rnp() with ->orphan_lock
 	 * held leads to deadlock.
 	 */
-	raw_spin_unlock(&rsp->onofflock); /* irqs remain disabled. */
+	raw_spin_unlock(&rsp->orphan_lock); /* irqs remain disabled. */
 	rnp = rdp->mynode;
 	if (need_report & RCU_OFL_TASKS_NORM_GP)
 		rcu_report_unblock_qs_rnp(rnp, flags);
@@ -1769,7 +1776,8 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 {
 	unsigned long flags;
 	struct rcu_head *next, *list, **tail;
-	int bl, count, count_lazy, i;
+	long bl, count, count_lazy;
+	int i;
 
 	/* If no callbacks are ready, just return.*/
 	if (!cpu_has_callbacks_ready_to_invoke(rdp)) {
@@ -2107,9 +2115,15 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
 	}
 }
 
+/*
+ * Helper function for call_rcu() and friends.  The cpu argument will
+ * normally be -1, indicating "currently running CPU".  It may specify
+ * a CPU only if that CPU is a no-CBs CPU.  Currently, only _rcu_barrier()
+ * is expected to specify a CPU.
+ */
 static void
 __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
-	   struct rcu_state *rsp, bool lazy)
+	   struct rcu_state *rsp, int cpu, bool lazy)
 {
 	unsigned long flags;
 	struct rcu_data *rdp;
@@ -2129,9 +2143,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
 	rdp = this_cpu_ptr(rsp->rda);
 
 	/* Add the callback to our list. */
-	if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL)) {
+	if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+		int offline;
+
+		if (cpu != -1)
+			rdp = per_cpu_ptr(rsp->rda, cpu);
+		offline = !__call_rcu_nocb(rdp, head, lazy);
+		WARN_ON_ONCE(offline);
 		/* _call_rcu() is illegal on offline CPU; leak the callback. */
-		WARN_ON_ONCE(1);
 		local_irq_restore(flags);
 		return;
 	}
@@ -2160,7 +2179,7 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
  */
 void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_sched_state, 0);
+	__call_rcu(head, func, &rcu_sched_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_sched);
 
@@ -2169,7 +2188,7 @@ EXPORT_SYMBOL_GPL(call_rcu_sched);
  */
 void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_bh_state, 0);
+	__call_rcu(head, func, &rcu_bh_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
@@ -2205,10 +2224,28 @@ static inline int rcu_blocking_is_gp(void)
  * rcu_read_lock_sched().
  *
  * This means that all preempt_disable code sequences, including NMI and
- * hardware-interrupt handlers, in progress on entry will have completed
- * before this primitive returns.  However, this does not guarantee that
- * softirq handlers will have completed, since in some kernels, these
- * handlers can run in process context, and can block.
+ * non-threaded hardware-interrupt handlers, in progress on entry will
+ * have completed before this primitive returns.  However, this does not
+ * guarantee that softirq handlers will have completed, since in some
+ * kernels, these handlers can run in process context, and can block.
+ *
+ * Note that this guarantee implies further memory-ordering guarantees.
+ * On systems with more than one CPU, when synchronize_sched() returns,
+ * each CPU is guaranteed to have executed a full memory barrier since the
+ * end of its last RCU-sched read-side critical section whose beginning
+ * preceded the call to synchronize_sched().  In addition, each CPU having
+ * an RCU read-side critical section that extends beyond the return from
+ * synchronize_sched() is guaranteed to have executed a full memory barrier
+ * after the beginning of synchronize_sched() and before the beginning of
+ * that RCU read-side critical section.  Note that these guarantees include
+ * CPUs that are offline, idle, or executing in user mode, as well as CPUs
+ * that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked synchronize_sched(), which returned
+ * to its caller on CPU B, then both CPU A and CPU B are guaranteed
+ * to have executed a full memory barrier during the execution of
+ * synchronize_sched() -- even if CPU A and CPU B are the same CPU (but
+ * again only if the system has more than one CPU).
  *
  * This primitive provides the guarantees made by the (now removed)
  * synchronize_kernel() API.  In contrast, synchronize_rcu() only
@@ -2224,7 +2261,10 @@ void synchronize_sched(void)
 			   "Illegal synchronize_sched() in RCU-sched read-side critical section");
 	if (rcu_blocking_is_gp())
 		return;
-	wait_rcu_gp(call_rcu_sched);
+	if (rcu_expedited)
+		synchronize_sched_expedited();
+	else
+		wait_rcu_gp(call_rcu_sched);
 }
 EXPORT_SYMBOL_GPL(synchronize_sched);
 
@@ -2236,6 +2276,9 @@ EXPORT_SYMBOL_GPL(synchronize_sched);
  * read-side critical sections have completed.  RCU read-side critical
  * sections are delimited by rcu_read_lock_bh() and rcu_read_unlock_bh(),
  * and may be nested.
+ *
+ * See the description of synchronize_sched() for more detailed information
+ * on memory ordering guarantees.
  */
 void synchronize_rcu_bh(void)
 {
@@ -2245,13 +2288,13 @@ void synchronize_rcu_bh(void)
 			   "Illegal synchronize_rcu_bh() in RCU-bh read-side critical section");
 	if (rcu_blocking_is_gp())
 		return;
-	wait_rcu_gp(call_rcu_bh);
+	if (rcu_expedited)
+		synchronize_rcu_bh_expedited();
+	else
+		wait_rcu_gp(call_rcu_bh);
 }
 EXPORT_SYMBOL_GPL(synchronize_rcu_bh);
 
-static atomic_t sync_sched_expedited_started = ATOMIC_INIT(0);
-static atomic_t sync_sched_expedited_done = ATOMIC_INIT(0);
-
 static int synchronize_sched_expedited_cpu_stop(void *data)
 {
 	/*
@@ -2308,10 +2351,32 @@ static int synchronize_sched_expedited_cpu_stop(void *data)
  */
 void synchronize_sched_expedited(void)
 {
-	int firstsnap, s, snap, trycount = 0;
+	long firstsnap, s, snap;
+	int trycount = 0;
+	struct rcu_state *rsp = &rcu_sched_state;
+
+	/*
+	 * If we are in danger of counter wrap, just do synchronize_sched().
+	 * By allowing ->expedited_start to advance no more than
+	 * ULONG_MAX/8 ahead of ->expedited_done, we are ensuring
+	 * that more than 3.5 billion CPUs would be required to force a
+	 * counter wrap on a 32-bit system.  Quite a few more CPUs would of
+	 * course be required on a 64-bit system.
+	 */
+	if (ULONG_CMP_GE((ulong)atomic_long_read(&rsp->expedited_start),
+			 (ulong)atomic_long_read(&rsp->expedited_done) +
+			 ULONG_MAX / 8)) {
+		synchronize_sched();
+		atomic_long_inc(&rsp->expedited_wrap);
+		return;
+	}
 
-	/* Note that atomic_inc_return() implies full memory barrier. */
-	firstsnap = snap = atomic_inc_return(&sync_sched_expedited_started);
+	/*
+	 * Take a ticket.  Note that atomic_inc_return() implies a
+	 * full memory barrier.
+	 */
+	snap = atomic_long_inc_return(&rsp->expedited_start);
+	firstsnap = snap;
 	get_online_cpus();
 	WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
 
@@ -2323,48 +2388,65 @@ void synchronize_sched_expedited(void)
 			     synchronize_sched_expedited_cpu_stop,
 			     NULL) == -EAGAIN) {
 		put_online_cpus();
+		atomic_long_inc(&rsp->expedited_tryfail);
+
+		/* Check to see if someone else did our work for us. */
+		s = atomic_long_read(&rsp->expedited_done);
+		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
+			/* ensure test happens before caller kfree */
+			smp_mb__before_atomic_inc(); /* ^^^ */
+			atomic_long_inc(&rsp->expedited_workdone1);
+			return;
+		}
 
 		/* No joy, try again later.  Or just synchronize_sched(). */
 		if (trycount++ < 10) {
 			udelay(trycount * num_online_cpus());
 		} else {
-			synchronize_sched();
+			wait_rcu_gp(call_rcu_sched);
+			atomic_long_inc(&rsp->expedited_normal);
 			return;
 		}
 
-		/* Check to see if someone else did our work for us. */
-		s = atomic_read(&sync_sched_expedited_done);
-		if (UINT_CMP_GE((unsigned)s, (unsigned)firstsnap)) {
-			smp_mb(); /* ensure test happens before caller kfree */
+		/* Recheck to see if someone else did our work for us. */
+		s = atomic_long_read(&rsp->expedited_done);
+		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
+			/* ensure test happens before caller kfree */
+			smp_mb__before_atomic_inc(); /* ^^^ */
+			atomic_long_inc(&rsp->expedited_workdone2);
 			return;
 		}
 
 		/*
 		 * Refetching sync_sched_expedited_started allows later
-		 * callers to piggyback on our grace period.  We subtract
-		 * 1 to get the same token that the last incrementer got.
-		 * We retry after they started, so our grace period works
-		 * for them, and they started after our first try, so their
-		 * grace period works for us.
+		 * callers to piggyback on our grace period.  We retry
+		 * after they started, so our grace period works for them,
+		 * and they started after our first try, so their grace
+		 * period works for us.
 		 */
 		get_online_cpus();
-		snap = atomic_read(&sync_sched_expedited_started);
+		snap = atomic_long_read(&rsp->expedited_start);
 		smp_mb(); /* ensure read is before try_stop_cpus(). */
 	}
+	atomic_long_inc(&rsp->expedited_stoppedcpus);
 
 	/*
 	 * Everyone up to our most recent fetch is covered by our grace
 	 * period.  Update the counter, but only if our work is still
 	 * relevant -- which it won't be if someone who started later
-	 * than we did beat us to the punch.
+	 * than we did already did their update.
 	 */
 	do {
-		s = atomic_read(&sync_sched_expedited_done);
-		if (UINT_CMP_GE((unsigned)s, (unsigned)snap)) {
-			smp_mb(); /* ensure test happens before caller kfree */
+		atomic_long_inc(&rsp->expedited_done_tries);
+		s = atomic_long_read(&rsp->expedited_done);
+		if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
+			/* ensure test happens before caller kfree */
+			smp_mb__before_atomic_inc(); /* ^^^ */
+			atomic_long_inc(&rsp->expedited_done_lost);
 			break;
 		}
-	} while (atomic_cmpxchg(&sync_sched_expedited_done, s, snap) != s);
+	} while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
+	atomic_long_inc(&rsp->expedited_done_exit);
 
 	put_online_cpus();
 }
@@ -2558,9 +2640,17 @@ static void _rcu_barrier(struct rcu_state *rsp)
 	 * When that callback is invoked, we will know that all of the
 	 * corresponding CPU's preceding callbacks have been invoked.
 	 */
-	for_each_online_cpu(cpu) {
+	for_each_possible_cpu(cpu) {
+		if (!cpu_online(cpu) && !is_nocb_cpu(cpu))
+			continue;
 		rdp = per_cpu_ptr(rsp->rda, cpu);
-		if (ACCESS_ONCE(rdp->qlen)) {
+		if (is_nocb_cpu(cpu)) {
+			_rcu_barrier_trace(rsp, "OnlineNoCB", cpu,
+					   rsp->n_barrier_done);
+			atomic_inc(&rsp->barrier_cpu_count);
+			__call_rcu(&rdp->barrier_head, rcu_barrier_callback,
+				   rsp, cpu, 0);
+		} else if (ACCESS_ONCE(rdp->qlen)) {
 			_rcu_barrier_trace(rsp, "OnlineQ", cpu,
 					   rsp->n_barrier_done);
 			smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -2634,6 +2724,7 @@ rcu_boot_init_percpu_data(int cpu, struct rcu_state *rsp)
 #endif
 	rdp->cpu = cpu;
 	rdp->rsp = rsp;
+	rcu_boot_init_nocb_percpu_data(rdp);
 	raw_spin_unlock_irqrestore(&rnp->lock, flags);
 }
 
@@ -2715,6 +2806,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	struct rcu_data *rdp = per_cpu_ptr(rcu_state->rda, cpu);
 	struct rcu_node *rnp = rdp->mynode;
 	struct rcu_state *rsp;
+	int ret = NOTIFY_OK;
 
 	trace_rcu_utilization("Start CPU hotplug");
 	switch (action) {
@@ -2728,7 +2820,10 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 		rcu_boost_kthread_setaffinity(rnp, -1);
 		break;
 	case CPU_DOWN_PREPARE:
-		rcu_boost_kthread_setaffinity(rnp, cpu);
+		if (nocb_cpu_expendable(cpu))
+			rcu_boost_kthread_setaffinity(rnp, cpu);
+		else
+			ret = NOTIFY_BAD;
 		break;
 	case CPU_DYING:
 	case CPU_DYING_FROZEN:
@@ -2752,7 +2847,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 		break;
 	}
 	trace_rcu_utilization("End CPU hotplug");
-	return NOTIFY_OK;
+	return ret;
 }
 
 /*
@@ -2772,6 +2867,7 @@ static int __init rcu_spawn_gp_kthread(void)
 		raw_spin_lock_irqsave(&rnp->lock, flags);
 		rsp->gp_kthread = t;
 		raw_spin_unlock_irqrestore(&rnp->lock, flags);
+		rcu_spawn_nocb_kthreads(rsp);
 	}
 	return 0;
 }
@@ -2967,6 +3063,7 @@ void __init rcu_init(void)
 	rcu_init_one(&rcu_sched_state, &rcu_sched_data);
 	rcu_init_one(&rcu_bh_state, &rcu_bh_data);
 	__rcu_init_preempt();
+	rcu_init_nocb();
 	 open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
 
 	/*
diff --git a/kernel/rcutree.h b/kernel/rcutree.h
index a240f032848e..4b69291b093d 100644
--- a/kernel/rcutree.h
+++ b/kernel/rcutree.h
@@ -287,6 +287,7 @@ struct rcu_data {
 	long		qlen_last_fqs_check;
 					/* qlen at last check for QS forcing */
 	unsigned long	n_cbs_invoked;	/* count of RCU cbs invoked. */
+	unsigned long	n_nocbs_invoked; /* count of no-CBs RCU cbs invoked. */
 	unsigned long   n_cbs_orphaned; /* RCU cbs orphaned by dying CPU */
 	unsigned long   n_cbs_adopted;  /* RCU cbs adopted from dying CPU */
 	unsigned long	n_force_qs_snap;
@@ -317,6 +318,18 @@ struct rcu_data {
 	struct rcu_head oom_head;
 #endif /* #ifdef CONFIG_RCU_FAST_NO_HZ */
 
+	/* 7) Callback offloading. */
+#ifdef CONFIG_RCU_NOCB_CPU
+	struct rcu_head *nocb_head;	/* CBs waiting for kthread. */
+	struct rcu_head **nocb_tail;
+	atomic_long_t nocb_q_count;	/* # CBs waiting for kthread */
+	atomic_long_t nocb_q_count_lazy; /*  (approximate). */
+	int nocb_p_count;		/* # CBs being invoked by kthread */
+	int nocb_p_count_lazy;		/*  (approximate). */
+	wait_queue_head_t nocb_wq;	/* For nocb kthreads to sleep on. */
+	struct task_struct *nocb_kthread;
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
 	int cpu;
 	struct rcu_state *rsp;
 };
@@ -369,6 +382,12 @@ struct rcu_state {
 	struct rcu_data __percpu *rda;		/* pointer of percu rcu_data. */
 	void (*call)(struct rcu_head *head,	/* call_rcu() flavor. */
 		     void (*func)(struct rcu_head *head));
+#ifdef CONFIG_RCU_NOCB_CPU
+	void (*call_remote)(struct rcu_head *head,
+		     void (*func)(struct rcu_head *head));
+						/* call_rcu() flavor, but for */
+						/*  placing on remote CPU. */
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 
 	/* The following fields are guarded by the root rcu_node's lock. */
 
@@ -383,9 +402,8 @@ struct rcu_state {
 
 	/* End of fields guarded by root rcu_node's lock. */
 
-	raw_spinlock_t onofflock ____cacheline_internodealigned_in_smp;
-						/* exclude on/offline and */
-						/*  starting new GP. */
+	raw_spinlock_t orphan_lock ____cacheline_internodealigned_in_smp;
+						/* Protect following fields. */
 	struct rcu_head *orphan_nxtlist;	/* Orphaned callbacks that */
 						/*  need a grace period. */
 	struct rcu_head **orphan_nxttail;	/* Tail of above. */
@@ -394,7 +412,7 @@ struct rcu_state {
 	struct rcu_head **orphan_donetail;	/* Tail of above. */
 	long qlen_lazy;				/* Number of lazy callbacks. */
 	long qlen;				/* Total number of callbacks. */
-	/* End of fields guarded by onofflock. */
+	/* End of fields guarded by orphan_lock. */
 
 	struct mutex onoff_mutex;		/* Coordinate hotplug & GPs. */
 
@@ -405,6 +423,18 @@ struct rcu_state {
 						/*  _rcu_barrier(). */
 	/* End of fields guarded by barrier_mutex. */
 
+	atomic_long_t expedited_start;		/* Starting ticket. */
+	atomic_long_t expedited_done;		/* Done ticket. */
+	atomic_long_t expedited_wrap;		/* # near-wrap incidents. */
+	atomic_long_t expedited_tryfail;	/* # acquisition failures. */
+	atomic_long_t expedited_workdone1;	/* # done by others #1. */
+	atomic_long_t expedited_workdone2;	/* # done by others #2. */
+	atomic_long_t expedited_normal;		/* # fallbacks to normal. */
+	atomic_long_t expedited_stoppedcpus;	/* # successful stop_cpus. */
+	atomic_long_t expedited_done_tries;	/* # tries to update _done. */
+	atomic_long_t expedited_done_lost;	/* # times beaten to _done. */
+	atomic_long_t expedited_done_exit;	/* # times exited _done loop. */
+
 	unsigned long jiffies_force_qs;		/* Time at which to invoke */
 						/*  force_quiescent_state(). */
 	unsigned long n_force_qs;		/* Number of calls to */
@@ -428,6 +458,8 @@ struct rcu_state {
 #define RCU_GP_FLAG_FQS  0x2	/* Need grace-period quiescent-state forcing. */
 
 extern struct list_head rcu_struct_flavors;
+
+/* Sequence through rcu_state structures for each RCU flavor. */
 #define for_each_rcu_flavor(rsp) \
 	list_for_each_entry((rsp), &rcu_struct_flavors, flavors)
 
@@ -504,5 +536,32 @@ static void print_cpu_stall_info(struct rcu_state *rsp, int cpu);
 static void print_cpu_stall_info_end(void);
 static void zero_cpu_stall_ticks(struct rcu_data *rdp);
 static void increment_cpu_stall_ticks(void);
+static bool is_nocb_cpu(int cpu);
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+			    bool lazy);
+static bool rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+				      struct rcu_data *rdp);
+static bool nocb_cpu_expendable(int cpu);
+static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
+static void rcu_spawn_nocb_kthreads(struct rcu_state *rsp);
+static void init_nocb_callback_list(struct rcu_data *rdp);
+static void __init rcu_init_nocb(void);
 
 #endif /* #ifndef RCU_TREE_NONCORE */
+
+#ifdef CONFIG_RCU_TRACE
+#ifdef CONFIG_RCU_NOCB_CPU
+/* Sum up queue lengths for tracing. */
+static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
+{
+	*ql = atomic_long_read(&rdp->nocb_q_count) + rdp->nocb_p_count;
+	*qll = atomic_long_read(&rdp->nocb_q_count_lazy) + rdp->nocb_p_count_lazy;
+}
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
+{
+	*ql = 0;
+	*qll = 0;
+}
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
+#endif /* #ifdef CONFIG_RCU_TRACE */
diff --git a/kernel/rcutree_plugin.h b/kernel/rcutree_plugin.h
index f92115488187..f6e5ec2932b4 100644
--- a/kernel/rcutree_plugin.h
+++ b/kernel/rcutree_plugin.h
@@ -25,6 +25,7 @@
  */
 
 #include <linux/delay.h>
+#include <linux/gfp.h>
 #include <linux/oom.h>
 #include <linux/smpboot.h>
 
@@ -36,6 +37,14 @@
 #define RCU_BOOST_PRIO RCU_KTHREAD_PRIO
 #endif
 
+#ifdef CONFIG_RCU_NOCB_CPU
+static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
+static bool have_rcu_nocb_mask;	    /* Was rcu_nocb_mask allocated? */
+static bool rcu_nocb_poll;	    /* Offload kthreads are to poll. */
+module_param(rcu_nocb_poll, bool, 0444);
+static char __initdata nocb_buf[NR_CPUS * 5];
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
 /*
  * Check the RCU kernel configuration parameters and print informative
  * messages about anything out of the ordinary.  If you like #ifdef, you
@@ -76,6 +85,18 @@ static void __init rcu_bootup_announce_oddness(void)
 		printk(KERN_INFO "\tExperimental boot-time adjustment of leaf fanout to %d.\n", rcu_fanout_leaf);
 	if (nr_cpu_ids != NR_CPUS)
 		printk(KERN_INFO "\tRCU restricting CPUs from NR_CPUS=%d to nr_cpu_ids=%d.\n", NR_CPUS, nr_cpu_ids);
+#ifdef CONFIG_RCU_NOCB_CPU
+	if (have_rcu_nocb_mask) {
+		if (cpumask_test_cpu(0, rcu_nocb_mask)) {
+			cpumask_clear_cpu(0, rcu_nocb_mask);
+			pr_info("\tCPU 0: illegal no-CBs CPU (cleared).\n");
+		}
+		cpulist_scnprintf(nocb_buf, sizeof(nocb_buf), rcu_nocb_mask);
+		pr_info("\tExperimental no-CBs CPUs: %s.\n", nocb_buf);
+		if (rcu_nocb_poll)
+			pr_info("\tExperimental polled no-CBs CPUs.\n");
+	}
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 }
 
 #ifdef CONFIG_TREE_PREEMPT_RCU
@@ -642,7 +663,7 @@ static void rcu_preempt_do_callbacks(void)
  */
 void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_preempt_state, 0);
+	__call_rcu(head, func, &rcu_preempt_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
@@ -656,7 +677,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
 void kfree_call_rcu(struct rcu_head *head,
 		    void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_preempt_state, 1);
+	__call_rcu(head, func, &rcu_preempt_state, -1, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
@@ -670,6 +691,9 @@ EXPORT_SYMBOL_GPL(kfree_call_rcu);
  * concurrently with new RCU read-side critical sections that began while
  * synchronize_rcu() was waiting.  RCU read-side critical sections are
  * delimited by rcu_read_lock() and rcu_read_unlock(), and may be nested.
+ *
+ * See the description of synchronize_sched() for more detailed information
+ * on memory ordering guarantees.
  */
 void synchronize_rcu(void)
 {
@@ -679,7 +703,10 @@ void synchronize_rcu(void)
 			   "Illegal synchronize_rcu() in RCU read-side critical section");
 	if (!rcu_scheduler_active)
 		return;
-	wait_rcu_gp(call_rcu);
+	if (rcu_expedited)
+		synchronize_rcu_expedited();
+	else
+		wait_rcu_gp(call_rcu);
 }
 EXPORT_SYMBOL_GPL(synchronize_rcu);
 
@@ -757,7 +784,8 @@ static void rcu_report_exp_rnp(struct rcu_state *rsp, struct rcu_node *rnp,
  * grace period for the specified rcu_node structure.  If there are no such
  * tasks, report it up the rcu_node hierarchy.
  *
- * Caller must hold sync_rcu_preempt_exp_mutex and rsp->onofflock.
+ * Caller must hold sync_rcu_preempt_exp_mutex and must exclude
+ * CPU hotplug operations.
  */
 static void
 sync_rcu_preempt_exp_init(struct rcu_state *rsp, struct rcu_node *rnp)
@@ -831,7 +859,7 @@ void synchronize_rcu_expedited(void)
 			udelay(trycount * num_online_cpus());
 		} else {
 			put_online_cpus();
-			synchronize_rcu();
+			wait_rcu_gp(call_rcu);
 			return;
 		}
 	}
@@ -875,6 +903,11 @@ EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);
 
 /**
  * rcu_barrier - Wait until all in-flight call_rcu() callbacks complete.
+ *
+ * Note that this primitive does not necessarily wait for an RCU grace period
+ * to complete.  For example, if there are no RCU callbacks queued anywhere
+ * in the system, then rcu_barrier() is within its rights to return
+ * immediately, without waiting for anything, much less an RCU grace period.
  */
 void rcu_barrier(void)
 {
@@ -1013,7 +1046,7 @@ static void rcu_preempt_check_callbacks(int cpu)
 void kfree_call_rcu(struct rcu_head *head,
 		    void (*func)(struct rcu_head *rcu))
 {
-	__call_rcu(head, func, &rcu_sched_state, 1);
+	__call_rcu(head, func, &rcu_sched_state, -1, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
@@ -2092,3 +2125,373 @@ static void increment_cpu_stall_ticks(void)
 }
 
 #endif /* #else #ifdef CONFIG_RCU_CPU_STALL_INFO */
+
+#ifdef CONFIG_RCU_NOCB_CPU
+
+/*
+ * Offload callback processing from the boot-time-specified set of CPUs
+ * specified by rcu_nocb_mask.  For each CPU in the set, there is a
+ * kthread created that pulls the callbacks from the corresponding CPU,
+ * waits for a grace period to elapse, and invokes the callbacks.
+ * The no-CBs CPUs do a wake_up() on their kthread when they insert
+ * a callback into any empty list, unless the rcu_nocb_poll boot parameter
+ * has been specified, in which case each kthread actively polls its
+ * CPU.  (Which isn't so great for energy efficiency, but which does
+ * reduce RCU's overhead on that CPU.)
+ *
+ * This is intended to be used in conjunction with Frederic Weisbecker's
+ * adaptive-idle work, which would seriously reduce OS jitter on CPUs
+ * running CPU-bound user-mode computations.
+ *
+ * Offloading of callback processing could also in theory be used as
+ * an energy-efficiency measure because CPUs with no RCU callbacks
+ * queued are more aggressive about entering dyntick-idle mode.
+ */
+
+
+/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
+static int __init rcu_nocb_setup(char *str)
+{
+	alloc_bootmem_cpumask_var(&rcu_nocb_mask);
+	have_rcu_nocb_mask = true;
+	cpulist_parse(str, rcu_nocb_mask);
+	return 1;
+}
+__setup("rcu_nocbs=", rcu_nocb_setup);
+
+/* Is the specified CPU a no-CBs CPU? */
+static bool is_nocb_cpu(int cpu)
+{
+	if (have_rcu_nocb_mask)
+		return cpumask_test_cpu(cpu, rcu_nocb_mask);
+	return false;
+}
+
+/*
+ * Enqueue the specified string of rcu_head structures onto the specified
+ * CPU's no-CBs lists.  The CPU is specified by rdp, the head of the
+ * string by rhp, and the tail of the string by rhtp.  The non-lazy/lazy
+ * counts are supplied by rhcount and rhcount_lazy.
+ *
+ * If warranted, also wake up the kthread servicing this CPU's queues.
+ */
+static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
+				    struct rcu_head *rhp,
+				    struct rcu_head **rhtp,
+				    int rhcount, int rhcount_lazy)
+{
+	int len;
+	struct rcu_head **old_rhpp;
+	struct task_struct *t;
+
+	/* Enqueue the callback on the nocb list and update counts. */
+	old_rhpp = xchg(&rdp->nocb_tail, rhtp);
+	ACCESS_ONCE(*old_rhpp) = rhp;
+	atomic_long_add(rhcount, &rdp->nocb_q_count);
+	atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
+
+	/* If we are not being polled and there is a kthread, awaken it ... */
+	t = ACCESS_ONCE(rdp->nocb_kthread);
+	if (rcu_nocb_poll || !t)
+		return;
+	len = atomic_long_read(&rdp->nocb_q_count);
+	if (old_rhpp == &rdp->nocb_head) {
+		wake_up(&rdp->nocb_wq); /* ... only if queue was empty ... */
+		rdp->qlen_last_fqs_check = 0;
+	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
+		wake_up_process(t); /* ... or if many callbacks queued. */
+		rdp->qlen_last_fqs_check = LONG_MAX / 2;
+	}
+	return;
+}
+
+/*
+ * This is a helper for __call_rcu(), which invokes this when the normal
+ * callback queue is inoperable.  If this is not a no-CBs CPU, this
+ * function returns failure back to __call_rcu(), which can complain
+ * appropriately.
+ *
+ * Otherwise, this function queues the callback where the corresponding
+ * "rcuo" kthread can find it.
+ */
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+			    bool lazy)
+{
+
+	if (!is_nocb_cpu(rdp->cpu))
+		return 0;
+	__call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy);
+	return 1;
+}
+
+/*
+ * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is
+ * not a no-CBs CPU.
+ */
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+						     struct rcu_data *rdp)
+{
+	long ql = rsp->qlen;
+	long qll = rsp->qlen_lazy;
+
+	/* If this is not a no-CBs CPU, tell the caller to do it the old way. */
+	if (!is_nocb_cpu(smp_processor_id()))
+		return 0;
+	rsp->qlen = 0;
+	rsp->qlen_lazy = 0;
+
+	/* First, enqueue the donelist, if any.  This preserves CB ordering. */
+	if (rsp->orphan_donelist != NULL) {
+		__call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist,
+					rsp->orphan_donetail, ql, qll);
+		ql = qll = 0;
+		rsp->orphan_donelist = NULL;
+		rsp->orphan_donetail = &rsp->orphan_donelist;
+	}
+	if (rsp->orphan_nxtlist != NULL) {
+		__call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist,
+					rsp->orphan_nxttail, ql, qll);
+		ql = qll = 0;
+		rsp->orphan_nxtlist = NULL;
+		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+	}
+	return 1;
+}
+
+/*
+ * There must be at least one non-no-CBs CPU in operation at any given
+ * time, because no-CBs CPUs are not capable of initiating grace periods
+ * independently.  This function therefore complains if the specified
+ * CPU is the last non-no-CBs CPU, allowing the CPU-hotplug system to
+ * avoid offlining the last such CPU.  (Recursion is a wonderful thing,
+ * but you have to have a base case!)
+ */
+static bool nocb_cpu_expendable(int cpu)
+{
+	cpumask_var_t non_nocb_cpus;
+	int ret;
+
+	/*
+	 * If there are no no-CB CPUs or if this CPU is not a no-CB CPU,
+	 * then offlining this CPU is harmless.  Let it happen.
+	 */
+	if (!have_rcu_nocb_mask || is_nocb_cpu(cpu))
+		return 1;
+
+	/* If no memory, play it safe and keep the CPU around. */
+	if (!alloc_cpumask_var(&non_nocb_cpus, GFP_NOIO))
+		return 0;
+	cpumask_andnot(non_nocb_cpus, cpu_online_mask, rcu_nocb_mask);
+	cpumask_clear_cpu(cpu, non_nocb_cpus);
+	ret = !cpumask_empty(non_nocb_cpus);
+	free_cpumask_var(non_nocb_cpus);
+	return ret;
+}
+
+/*
+ * Helper structure for remote registry of RCU callbacks.
+ * This is needed for when a no-CBs CPU needs to start a grace period.
+ * If it just invokes call_rcu(), the resulting callback will be queued,
+ * which can result in deadlock.
+ */
+struct rcu_head_remote {
+	struct rcu_head *rhp;
+	call_rcu_func_t *crf;
+	void (*func)(struct rcu_head *rhp);
+};
+
+/*
+ * Register a callback as specified by the rcu_head_remote struct.
+ * This function is intended to be invoked via smp_call_function_single().
+ */
+static void call_rcu_local(void *arg)
+{
+	struct rcu_head_remote *rhrp = arg;	/* arg is the structure itself. */
+
+	rhrp->crf(rhrp->rhp, rhrp->func);
+}
+
+/*
+ * Set up an rcu_head_remote structure and then invoke call_rcu_local()
+ * on CPU 0 (which is guaranteed to be a non-no-CBs CPU) via
+ * smp_call_function_single().
+ */
+static void invoke_crf_remote(struct rcu_head *rhp,
+			      void (*func)(struct rcu_head *rhp),
+			      call_rcu_func_t crf)
+{
+	struct rcu_head_remote rhr;
+
+	rhr.rhp = rhp;
+	rhr.crf = crf;
+	rhr.func = func;
+	smp_call_function_single(0, call_rcu_local, &rhr, 1);
+}
+
+/*
+ * Helper functions to be passed to wait_rcu_gp(), each of which
+ * invokes invoke_crf_remote() to register a callback appropriately.
+ */
+static void __maybe_unused
+call_rcu_preempt_remote(struct rcu_head *rhp,
+			void (*func)(struct rcu_head *rhp))
+{
+	invoke_crf_remote(rhp, func, call_rcu);
+}
+static void call_rcu_bh_remote(struct rcu_head *rhp,
+			       void (*func)(struct rcu_head *rhp))
+{
+	invoke_crf_remote(rhp, func, call_rcu_bh);
+}
+static void call_rcu_sched_remote(struct rcu_head *rhp,
+				  void (*func)(struct rcu_head *rhp))
+{
+	invoke_crf_remote(rhp, func, call_rcu_sched);
+}
+
+/*
+ * Per-rcu_data kthread, but only for no-CBs CPUs.  Each kthread invokes
+ * callbacks queued by the corresponding no-CBs CPU.
+ */
+static int rcu_nocb_kthread(void *arg)
+{
+	int c, cl;
+	struct rcu_head *list;
+	struct rcu_head *next;
+	struct rcu_head **tail;
+	struct rcu_data *rdp = arg;
+
+	/* Each pass through this loop invokes one batch of callbacks */
+	for (;;) {
+		/* If not polling, wait for next batch of callbacks. */
+		if (!rcu_nocb_poll)
+			wait_event(rdp->nocb_wq, rdp->nocb_head);
+		list = ACCESS_ONCE(rdp->nocb_head);
+		if (!list) {
+			schedule_timeout_interruptible(1);
+			continue;
+		}
+
+		/*
+		 * Extract queued callbacks, update counts, and wait
+		 * for a grace period to elapse.
+		 */
+		ACCESS_ONCE(rdp->nocb_head) = NULL;
+		tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
+		c = atomic_long_xchg(&rdp->nocb_q_count, 0);
+		cl = atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
+		ACCESS_ONCE(rdp->nocb_p_count) += c;
+		ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
+		wait_rcu_gp(rdp->rsp->call_remote);
+
+		/* Each pass through the following loop invokes a callback. */
+		trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
+		c = cl = 0;
+		while (list) {
+			next = list->next;
+			/* Wait for enqueuing to complete, if needed. */
+			while (next == NULL && &list->next != tail) {
+				schedule_timeout_interruptible(1);
+				next = list->next;
+			}
+			debug_rcu_head_unqueue(list);
+			local_bh_disable();
+			if (__rcu_reclaim(rdp->rsp->name, list))
+				cl++;
+			c++;
+			local_bh_enable();
+			list = next;
+		}
+		trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
+		ACCESS_ONCE(rdp->nocb_p_count) -= c;
+		ACCESS_ONCE(rdp->nocb_p_count_lazy) -= cl;
+		rdp->n_nocbs_invoked += c;
+	}
+	return 0;
+}
+
+/* Initialize per-rcu_data variables for no-CBs CPUs. */
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+	rdp->nocb_tail = &rdp->nocb_head;
+	init_waitqueue_head(&rdp->nocb_wq);
+}
+
+/* Create a kthread for each RCU flavor for each no-CBs CPU. */
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+	int cpu;
+	struct rcu_data *rdp;
+	struct task_struct *t;
+
+	if (rcu_nocb_mask == NULL)
+		return;
+	for_each_cpu(cpu, rcu_nocb_mask) {
+		rdp = per_cpu_ptr(rsp->rda, cpu);
+		t = kthread_run(rcu_nocb_kthread, rdp, "rcuo%d", cpu);
+		BUG_ON(IS_ERR(t));
+		ACCESS_ONCE(rdp->nocb_kthread) = t;
+	}
+}
+
+/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+	if (rcu_nocb_mask == NULL ||
+	    !cpumask_test_cpu(rdp->cpu, rcu_nocb_mask))
+		return;
+	rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+}
+
+/* Initialize the ->call_remote fields in the rcu_state structures. */
+static void __init rcu_init_nocb(void)
+{
+#ifdef CONFIG_PREEMPT_RCU
+	rcu_preempt_state.call_remote = call_rcu_preempt_remote;
+#endif /* #ifdef CONFIG_PREEMPT_RCU */
+	rcu_bh_state.call_remote = call_rcu_bh_remote;
+	rcu_sched_state.call_remote = call_rcu_sched_remote;
+}
+
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+
+static bool is_nocb_cpu(int cpu)
+{
+	return false;
+}
+
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+			    bool lazy)
+{
+	return false;
+}
+
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+						     struct rcu_data *rdp)
+{
+	return false;
+}
+
+static bool nocb_cpu_expendable(int cpu)
+{
+	return true;
+}
+
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+}
+
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_init_nocb(void)
+{
+}
+
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
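
The no-CBs queue above is the classic two-step tail-pointer list: the
enqueue side (the __call_rcu_nocb() path earlier in this file) xchg()s
&rhp->next into ->nocb_tail and only then stores rhp through the old
tail pointer, while rcu_nocb_kthread() detaches the entire list with
its ACCESS_ONCE()/xchg() pair and waits out any enqueue still in
flight.  A minimal userspace sketch of the same idiom, with C11
atomics standing in for xchg() and the cbq_*() names purely
illustrative:

	#include <stdatomic.h>
	#include <stddef.h>

	struct cb {
		struct cb *next;
	};

	struct cbq {
		struct cb *head;		/* consumed by one thread only */
		_Atomic(struct cb **) tail;	/* points at the newest ->next */
	};

	static void cbq_init(struct cbq *q)
	{
		q->head = NULL;
		atomic_init(&q->tail, &q->head);
	}

	/* Producer, callable from any thread: the __call_rcu_nocb() role. */
	static void cbq_enqueue(struct cbq *q, struct cb *c)
	{
		struct cb **oldtail;

		c->next = NULL;
		oldtail = atomic_exchange(&q->tail, &c->next);
		*oldtail = c;	/* consumer may see a NULL gap until this lands */
	}

	/*
	 * Consumer: detach everything, as rcu_nocb_kthread() does.  The old
	 * tail returned through *tailp tells the walker where the detached
	 * segment truly ends, so a NULL ->next short of *tailp means that an
	 * enqueue is still in flight and the walker must wait, just as the
	 * "Wait for enqueuing to complete" loop does above.
	 */
	static struct cb *cbq_drain(struct cbq *q, struct cb ***tailp)
	{
		struct cb *list = q->head;

		q->head = NULL;
		*tailp = atomic_exchange(&q->tail, &q->head);
		return list;
	}

The kernel version leans on ACCESS_ONCE() for the plain head/next
accesses; a strictly conforming C11 port would have to make those
atomic as well.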
diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
index 693513bc50e6..0d095dcaa670 100644
--- a/kernel/rcutree_trace.c
+++ b/kernel/rcutree_trace.c
@@ -46,29 +46,58 @@
 #define RCU_TREE_NONCORE
 #include "rcutree.h"
 
-static int show_rcubarrier(struct seq_file *m, void *unused)
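+/*
+ * View an unsigned long through a signed lens.  The ->gpnum and
+ * ->completed counters are initialized close to the wrap point, so
+ * printing them as longs yields small negative numbers instead of
+ * enormous unsigned ones.
+ */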
+#define ulong2long(a) (*(long *)(&(a)))
+
+static int r_open(struct inode *inode, struct file *file,
+					const struct seq_operations *op)
 {
-	struct rcu_state *rsp;
+	int ret = seq_open(file, op);
+	if (!ret) {
+		struct seq_file *m = (struct seq_file *)file->private_data;
+		m->private = inode->i_private;
+	}
+	return ret;
+}
+
+static void *r_start(struct seq_file *m, loff_t *pos)
+{
+	struct rcu_state *rsp = (struct rcu_state *)m->private;
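+	/* When *pos == 0, cpumask_next(-1, ...) returns the first possible CPU. */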
+	*pos = cpumask_next(*pos - 1, cpu_possible_mask);
+	if ((*pos) < nr_cpu_ids)
+		return per_cpu_ptr(rsp->rda, *pos);
+	return NULL;
+}
 
-	for_each_rcu_flavor(rsp)
-		seq_printf(m, "%s: bcc: %d nbd: %lu\n",
-			   rsp->name,
-			   atomic_read(&rsp->barrier_cpu_count),
-			   rsp->n_barrier_done);
+static void *r_next(struct seq_file *m, void *v, loff_t *pos)
+{
+	(*pos)++;
+	return r_start(m, pos);
+}
+
+static void r_stop(struct seq_file *m, void *v)
+{
+}
+
+static int show_rcubarrier(struct seq_file *m, void *v)
+{
+	struct rcu_state *rsp = (struct rcu_state *)m->private;
+	seq_printf(m, "bcc: %d nbd: %lu\n",
+		   atomic_read(&rsp->barrier_cpu_count),
+		   rsp->n_barrier_done);
 	return 0;
 }
 
 static int rcubarrier_open(struct inode *inode, struct file *file)
 {
-	return single_open(file, show_rcubarrier, NULL);
+	return single_open(file, show_rcubarrier, inode->i_private);
 }
 
 static const struct file_operations rcubarrier_fops = {
 	.owner = THIS_MODULE,
 	.open = rcubarrier_open,
 	.read = seq_read,
-	.llseek = seq_lseek,
+	.llseek = no_llseek,
 	.release = single_release,
 };
 
 #ifdef CONFIG_RCU_BOOST
@@ -84,12 +113,14 @@ static char convert_kthread_status(unsigned int kthread_status)
 
 static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
 {
+	long ql, qll;
+
 	if (!rdp->beenonline)
 		return;
-	seq_printf(m, "%3d%cc=%lu g=%lu pq=%d qp=%d",
+	seq_printf(m, "%3d%cc=%ld g=%ld pq=%d qp=%d",
 		   rdp->cpu,
 		   cpu_is_offline(rdp->cpu) ? '!' : ' ',
-		   rdp->completed, rdp->gpnum,
+		   ulong2long(rdp->completed), ulong2long(rdp->gpnum),
 		   rdp->passed_quiesce, rdp->qs_pending);
 	seq_printf(m, " dt=%d/%llx/%d df=%lu",
 		   atomic_read(&rdp->dynticks->dynticks),
@@ -97,8 +128,11 @@ static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
 		   rdp->dynticks->dynticks_nmi_nesting,
 		   rdp->dynticks_fqs);
 	seq_printf(m, " of=%lu", rdp->offline_fqs);
+	rcu_nocb_q_lengths(rdp, &ql, &qll);
+	qll += rdp->qlen_lazy;
+	ql += rdp->qlen;
 	seq_printf(m, " ql=%ld/%ld qs=%c%c%c%c",
-		   rdp->qlen_lazy, rdp->qlen,
+		   qll, ql,
 		   ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] !=
 			rdp->nxttail[RCU_NEXT_TAIL]],
 		   ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=
@@ -114,101 +148,67 @@ static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
 		   per_cpu(rcu_cpu_kthread_loops, rdp->cpu) & 0xffff);
 #endif /* #ifdef CONFIG_RCU_BOOST */
 	seq_printf(m, " b=%ld", rdp->blimit);
-	seq_printf(m, " ci=%lu co=%lu ca=%lu\n",
-		   rdp->n_cbs_invoked, rdp->n_cbs_orphaned, rdp->n_cbs_adopted);
+	seq_printf(m, " ci=%lu nci=%lu co=%lu ca=%lu\n",
+		   rdp->n_cbs_invoked, rdp->n_nocbs_invoked,
+		   rdp->n_cbs_orphaned, rdp->n_cbs_adopted);
 }
 
-static int show_rcudata(struct seq_file *m, void *unused)
+static int show_rcudata(struct seq_file *m, void *v)
 {
-	int cpu;
-	struct rcu_state *rsp;
-
-	for_each_rcu_flavor(rsp) {
-		seq_printf(m, "%s:\n", rsp->name);
-		for_each_possible_cpu(cpu)
-			print_one_rcu_data(m, per_cpu_ptr(rsp->rda, cpu));
-	}
+	print_one_rcu_data(m, (struct rcu_data *)v);
 	return 0;
 }
 
+static const struct seq_operations rcudata_op = {
+	.start = r_start,
+	.next  = r_next,
+	.stop  = r_stop,
+	.show  = show_rcudata,
+};
+
 static int rcudata_open(struct inode *inode, struct file *file)
 {
-	return single_open(file, show_rcudata, NULL);
+	return r_open(inode, file, &rcudata_op);
 }
 
 static const struct file_operations rcudata_fops = {
 	.owner = THIS_MODULE,
 	.open = rcudata_open,
 	.read = seq_read,
-	.llseek = seq_lseek,
-	.release = single_release,
+	.llseek = no_llseek,
+	.release = seq_release,
 };
 
-static void print_one_rcu_data_csv(struct seq_file *m, struct rcu_data *rdp)
-{
-	if (!rdp->beenonline)
-		return;
-	seq_printf(m, "%d,%s,%lu,%lu,%d,%d",
-		   rdp->cpu,
-		   cpu_is_offline(rdp->cpu) ? "\"N\"" : "\"Y\"",
-		   rdp->completed, rdp->gpnum,
-		   rdp->passed_quiesce, rdp->qs_pending);
-	seq_printf(m, ",%d,%llx,%d,%lu",
-		   atomic_read(&rdp->dynticks->dynticks),
-		   rdp->dynticks->dynticks_nesting,
-		   rdp->dynticks->dynticks_nmi_nesting,
-		   rdp->dynticks_fqs);
-	seq_printf(m, ",%lu", rdp->offline_fqs);
-	seq_printf(m, ",%ld,%ld,\"%c%c%c%c\"", rdp->qlen_lazy, rdp->qlen,
-		   ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] !=
-			rdp->nxttail[RCU_NEXT_TAIL]],
-		   ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=
-			rdp->nxttail[RCU_NEXT_READY_TAIL]],
-		   ".W"[rdp->nxttail[RCU_DONE_TAIL] !=
-			rdp->nxttail[RCU_WAIT_TAIL]],
-		   ".D"[&rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL]]);
-#ifdef CONFIG_RCU_BOOST
-	seq_printf(m, ",%d,\"%c\"",
-		   per_cpu(rcu_cpu_has_work, rdp->cpu),
-		   convert_kthread_status(per_cpu(rcu_cpu_kthread_status,
-					  rdp->cpu)));
-#endif /* #ifdef CONFIG_RCU_BOOST */
-	seq_printf(m, ",%ld", rdp->blimit);
-	seq_printf(m, ",%lu,%lu,%lu\n",
-		   rdp->n_cbs_invoked, rdp->n_cbs_orphaned, rdp->n_cbs_adopted);
-}
-
-static int show_rcudata_csv(struct seq_file *m, void *unused)
+static int show_rcuexp(struct seq_file *m, void *v)
 {
-	int cpu;
-	struct rcu_state *rsp;
-
-	seq_puts(m, "\"CPU\",\"Online?\",\"c\",\"g\",\"pq\",\"pq\",");
-	seq_puts(m, "\"dt\",\"dt nesting\",\"dt NMI nesting\",\"df\",");
-	seq_puts(m, "\"of\",\"qll\",\"ql\",\"qs\"");
-#ifdef CONFIG_RCU_BOOST
-	seq_puts(m, "\"kt\",\"ktl\"");
-#endif /* #ifdef CONFIG_RCU_BOOST */
-	seq_puts(m, ",\"b\",\"ci\",\"co\",\"ca\"\n");
-	for_each_rcu_flavor(rsp) {
-		seq_printf(m, "\"%s:\"\n", rsp->name);
-		for_each_possible_cpu(cpu)
-			print_one_rcu_data_csv(m, per_cpu_ptr(rsp->rda, cpu));
-	}
+	struct rcu_state *rsp = (struct rcu_state *)m->private;
+
+	seq_printf(m, "s=%lu d=%lu w=%lu tf=%lu wd1=%lu wd2=%lu n=%lu sc=%lu dt=%lu dl=%lu dx=%lu\n",
+		   atomic_long_read(&rsp->expedited_start),
+		   atomic_long_read(&rsp->expedited_done),
+		   atomic_long_read(&rsp->expedited_wrap),
+		   atomic_long_read(&rsp->expedited_tryfail),
+		   atomic_long_read(&rsp->expedited_workdone1),
+		   atomic_long_read(&rsp->expedited_workdone2),
+		   atomic_long_read(&rsp->expedited_normal),
+		   atomic_long_read(&rsp->expedited_stoppedcpus),
+		   atomic_long_read(&rsp->expedited_done_tries),
+		   atomic_long_read(&rsp->expedited_done_lost),
+		   atomic_long_read(&rsp->expedited_done_exit));
 	return 0;
 }
 
-static int rcudata_csv_open(struct inode *inode, struct file *file)
+static int rcuexp_open(struct inode *inode, struct file *file)
 {
-	return single_open(file, show_rcudata_csv, NULL);
+	return single_open(file, show_rcuexp, inode->i_private);
 }
 
-static const struct file_operations rcudata_csv_fops = {
+static const struct file_operations rcuexp_fops = {
 	.owner = THIS_MODULE,
-	.open = rcudata_csv_open,
+	.open = rcuexp_open,
 	.read = seq_read,
-	.llseek = seq_lseek,
+	.llseek = no_llseek,
 	.release = single_release,
 };
 
 #ifdef CONFIG_RCU_BOOST
@@ -254,27 +254,11 @@ static const struct file_operations rcu_node_boost_fops = {
 	.owner = THIS_MODULE,
 	.open = rcu_node_boost_open,
 	.read = seq_read,
-	.llseek = seq_lseek,
+	.llseek = no_llseek,
 	.release = single_release,
 };
 
-/*
- * Create the rcuboost debugfs entry.  Standard error return.
- */
-static int rcu_boost_trace_create_file(struct dentry *rcudir)
-{
-	return !debugfs_create_file("rcuboost", 0444, rcudir, NULL,
-				    &rcu_node_boost_fops);
-}
-
-#else /* #ifdef CONFIG_RCU_BOOST */
-
-static int rcu_boost_trace_create_file(struct dentry *rcudir)
-{
-	return 0;  /* There cannot be an error if we didn't create it! */
-}
-
-#endif /* #else #ifdef CONFIG_RCU_BOOST */
+#endif /* #ifdef CONFIG_RCU_BOOST */
 
 static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 {
@@ -283,8 +267,9 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 	struct rcu_node *rnp;
 
 	gpnum = rsp->gpnum;
-	seq_printf(m, "%s: c=%lu g=%lu s=%d jfq=%ld j=%x ",
-		   rsp->name, rsp->completed, gpnum, rsp->fqs_state,
+	seq_printf(m, "c=%ld g=%ld s=%d jfq=%ld j=%x ",
+		   ulong2long(rsp->completed), ulong2long(gpnum),
+		   rsp->fqs_state,
 		   (long)(rsp->jiffies_force_qs - jiffies),
 		   (int)(jiffies & 0xffff));
 	seq_printf(m, "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
@@ -306,26 +291,24 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 	seq_puts(m, "\n");
 }
 
-static int show_rcuhier(struct seq_file *m, void *unused)
+static int show_rcuhier(struct seq_file *m, void *v)
 {
-	struct rcu_state *rsp;
-
-	for_each_rcu_flavor(rsp)
-		print_one_rcu_state(m, rsp);
+	struct rcu_state *rsp = (struct rcu_state *)m->private;
+	print_one_rcu_state(m, rsp);
 	return 0;
 }
 
 static int rcuhier_open(struct inode *inode, struct file *file)
 {
-	return single_open(file, show_rcuhier, NULL);
+	return single_open(file, show_rcuhier, inode->i_private);
 }
 
 static const struct file_operations rcuhier_fops = {
 	.owner = THIS_MODULE,
 	.open = rcuhier_open,
 	.read = seq_read,
-	.llseek = seq_lseek,
+	.llseek = no_llseek,
 	.release = single_release,
 };
 
 static void show_one_rcugp(struct seq_file *m, struct rcu_state *rsp)
@@ -338,42 +321,42 @@ static void show_one_rcugp(struct seq_file *m, struct rcu_state *rsp)
 	struct rcu_node *rnp = &rsp->node[0];
 
 	raw_spin_lock_irqsave(&rnp->lock, flags);
-	completed = rsp->completed;
-	gpnum = rsp->gpnum;
-	if (rsp->completed == rsp->gpnum)
+	completed = ACCESS_ONCE(rsp->completed);
+	gpnum = ACCESS_ONCE(rsp->gpnum);
+	if (completed == gpnum)
 		gpage = 0;
 	else
 		gpage = jiffies - rsp->gp_start;
 	gpmax = rsp->gp_max;
 	raw_spin_unlock_irqrestore(&rnp->lock, flags);
-	seq_printf(m, "%s: completed=%ld  gpnum=%lu  age=%ld  max=%ld\n",
-		   rsp->name, completed, gpnum, gpage, gpmax);
+	seq_printf(m, "completed=%ld  gpnum=%ld  age=%ld  max=%ld\n",
+		   ulong2long(completed), ulong2long(gpnum), gpage, gpmax);
 }
 
-static int show_rcugp(struct seq_file *m, void *unused)
+static int show_rcugp(struct seq_file *m, void *v)
 {
-	struct rcu_state *rsp;
-
-	for_each_rcu_flavor(rsp)
-		show_one_rcugp(m, rsp);
+	struct rcu_state *rsp = (struct rcu_state *)m->private;
+	show_one_rcugp(m, rsp);
 	return 0;
 }
 
 static int rcugp_open(struct inode *inode, struct file *file)
 {
-	return single_open(file, show_rcugp, NULL);
+	return single_open(file, show_rcugp, inode->i_private);
 }
 
 static const struct file_operations rcugp_fops = {
 	.owner = THIS_MODULE,
 	.open = rcugp_open,
 	.read = seq_read,
-	.llseek = seq_lseek,
+	.llseek = no_llseek,
 	.release = single_release,
 };
 
 static void print_one_rcu_pending(struct seq_file *m, struct rcu_data *rdp)
 {
+	if (!rdp->beenonline)
+		return;
 	seq_printf(m, "%3d%cnp=%ld ",
 		   rdp->cpu,
 		   cpu_is_offline(rdp->cpu) ? '!' : ' ',
@@ -389,34 +372,30 @@ static void print_one_rcu_pending(struct seq_file *m, struct rcu_data *rdp)
 		   rdp->n_rp_need_nothing);
 }
 
-static int show_rcu_pending(struct seq_file *m, void *unused)
+static int show_rcu_pending(struct seq_file *m, void *v)
 {
-	int cpu;
-	struct rcu_data *rdp;
-	struct rcu_state *rsp;
-
-	for_each_rcu_flavor(rsp) {
-		seq_printf(m, "%s:\n", rsp->name);
-		for_each_possible_cpu(cpu) {
-			rdp = per_cpu_ptr(rsp->rda, cpu);
-			if (rdp->beenonline)
-				print_one_rcu_pending(m, rdp);
-		}
-	}
+	print_one_rcu_pending(m, (struct rcu_data *)v);
 	return 0;
 }
 
+static const struct seq_operations rcu_pending_op = {
+	.start = r_start,
+	.next  = r_next,
+	.stop  = r_stop,
+	.show  = show_rcu_pending,
+};
+
 static int rcu_pending_open(struct inode *inode, struct file *file)
 {
-	return single_open(file, show_rcu_pending, NULL);
+	return r_open(inode, file, &rcu_pending_op);
 }
 
 static const struct file_operations rcu_pending_fops = {
 	.owner = THIS_MODULE,
 	.open = rcu_pending_open,
 	.read = seq_read,
-	.llseek = seq_lseek,
-	.release = single_release,
+	.llseek = no_llseek,
+	.release = seq_release,
 };
 
 static int show_rcutorture(struct seq_file *m, void *unused)
@@ -446,43 +425,58 @@ static struct dentry *rcudir;
 
 static int __init rcutree_trace_init(void)
 {
+	struct rcu_state *rsp;
 	struct dentry *retval;
+	struct dentry *rspdir;
 
 	rcudir = debugfs_create_dir("rcu", NULL);
 	if (!rcudir)
 		goto free_out;
 
-	retval = debugfs_create_file("rcubarrier", 0444, rcudir,
-						NULL, &rcubarrier_fops);
-	if (!retval)
-		goto free_out;
-
-	retval = debugfs_create_file("rcudata", 0444, rcudir,
-						NULL, &rcudata_fops);
-	if (!retval)
-		goto free_out;
-
-	retval = debugfs_create_file("rcudata.csv", 0444, rcudir,
-						NULL, &rcudata_csv_fops);
-	if (!retval)
-		goto free_out;
-
-	if (rcu_boost_trace_create_file(rcudir))
-		goto free_out;
+	for_each_rcu_flavor(rsp) {
+		rspdir = debugfs_create_dir(rsp->name, rcudir);
+		if (!rspdir)
+			goto free_out;
+
+		retval = debugfs_create_file("rcudata", 0444,
+				rspdir, rsp, &rcudata_fops);
+		if (!retval)
+			goto free_out;
+
+		retval = debugfs_create_file("rcuexp", 0444,
+				rspdir, rsp, &rcuexp_fops);
+		if (!retval)
+			goto free_out;
+
+		retval = debugfs_create_file("rcu_pending", 0444,
+				rspdir, rsp, &rcu_pending_fops);
+		if (!retval)
+			goto free_out;
+
+		retval = debugfs_create_file("rcubarrier", 0444,
+				rspdir, rsp, &rcubarrier_fops);
+		if (!retval)
+			goto free_out;
 
-	retval = debugfs_create_file("rcugp", 0444, rcudir, NULL, &rcugp_fops);
-	if (!retval)
-		goto free_out;
+#ifdef CONFIG_RCU_BOOST
+		if (rsp == &rcu_preempt_state) {
+			retval = debugfs_create_file("rcuboost", 0444,
+				rspdir, NULL, &rcu_node_boost_fops);
+			if (!retval)
+				goto free_out;
+		}
+#endif /* #ifdef CONFIG_RCU_BOOST */
 
-	retval = debugfs_create_file("rcuhier", 0444, rcudir,
-						NULL, &rcuhier_fops);
-	if (!retval)
-		goto free_out;
+		retval = debugfs_create_file("rcugp", 0444,
+				rspdir, rsp, &rcugp_fops);
+		if (!retval)
+			goto free_out;
 
-	retval = debugfs_create_file("rcu_pending", 0444, rcudir,
-						NULL, &rcu_pending_fops);
-	if (!retval)
-		goto free_out;
+		retval = debugfs_create_file("rcuhier", 0444,
+				rspdir, rsp, &rcuhier_fops);
+		if (!retval)
+			goto free_out;
+	}
 
 	retval = debugfs_create_file("rcutorture", 0444, rcudir,
 						NULL, &rcutorture_fops);
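
With rcutree_trace_init() now creating one directory per flavor
(named after rsp->name), the debugfs layout becomes, assuming debugfs
is mounted at the conventional /sys/kernel/debug:

	/sys/kernel/debug/rcu/
		rcutorture
		rcu_sched/
			rcubarrier  rcudata  rcuexp  rcugp  rcuhier  rcu_pending
		rcu_bh/
			rcubarrier  rcudata  rcuexp  rcugp  rcuhier  rcu_pending
		rcu_preempt/		(CONFIG_PREEMPT_RCU kernels only)
			rcubarrier  rcudata  rcuexp  rcugp  rcuhier  rcu_pending
			rcuboost	(CONFIG_RCU_BOOST only)

Each per-flavor file finds its rcu_state through inode->i_private,
which is why the show functions above no longer loop with
for_each_rcu_flavor().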
diff --git a/kernel/sched/core.c b/kernel/sched/core.c
index 2d8927fda712..80f80dfca70e 100644
--- a/kernel/sched/core.c
+++ b/kernel/sched/core.c
@@ -72,6 +72,7 @@
 #include <linux/slab.h>
 #include <linux/init_task.h>
 #include <linux/binfmts.h>
+#include <linux/context_tracking.h>
 
 #include <asm/switch_to.h>
 #include <asm/tlb.h>
@@ -1886,8 +1887,8 @@ context_switch(struct rq *rq, struct task_struct *prev,
 	spin_release(&rq->lock.dep_map, 1, _THIS_IP_);
 #endif
 
+	context_tracking_task_switch(prev, next);
 	/* Here we just switch the register state and the stack. */
-	rcu_switch(prev, next);
 	switch_to(prev, next, prev);
 
 	barrier();
@@ -2911,7 +2912,7 @@ asmlinkage void __sched schedule(void)
 }
 EXPORT_SYMBOL(schedule);
 
-#ifdef CONFIG_RCU_USER_QS
+#ifdef CONFIG_CONTEXT_TRACKING
 asmlinkage void __sched schedule_user(void)
 {
 	/*
@@ -2920,9 +2921,9 @@ asmlinkage void __sched schedule_user(void)
 	 * we haven't yet exited the RCU idle mode. Do it here manually until
 	 * we find a better solution.
 	 */
-	rcu_user_exit();
+	user_exit();
 	schedule();
-	rcu_user_enter();
+	user_enter();
 }
 #endif
 
@@ -3027,7 +3028,7 @@ asmlinkage void __sched preempt_schedule_irq(void)
 	/* Catch callers which need to be fixed */
 	BUG_ON(ti->preempt_count || !irqs_disabled());
 
-	rcu_user_exit();
+	user_exit();
 	do {
 		add_preempt_count(PREEMPT_ACTIVE);
 		local_irq_enable();
@@ -4474,6 +4475,7 @@ static const char stat_nam[] = TASK_STATE_TO_CHAR_STR;
 void sched_show_task(struct task_struct *p)
 {
 	unsigned long free = 0;
+	int ppid;
 	unsigned state;
 
 	state = p->state ? __ffs(p->state) + 1 : 0;
@@ -4493,8 +4495,11 @@ void sched_show_task(struct task_struct *p)
 #ifdef CONFIG_DEBUG_STACK_USAGE
 	free = stack_not_used(p);
 #endif
+	rcu_read_lock();
+	ppid = task_pid_nr(rcu_dereference(p->real_parent));
+	rcu_read_unlock();
 	printk(KERN_CONT "%5lu %5d %6d 0x%08lx\n", free,
-		task_pid_nr(p), task_pid_nr(rcu_dereference(p->real_parent)),
+		task_pid_nr(p), ppid,
 		(unsigned long)task_thread_info(p)->flags);
 
 	show_stack(p, NULL);
@@ -8076,3 +8081,9 @@ struct cgroup_subsys cpuacct_subsys = {
 	.base_cftypes = files,
 };
 #endif	/* CONFIG_CGROUP_CPUACCT */
+
+void dump_cpu_task(int cpu)
+{
+	pr_info("Task dump for CPU %d:\n", cpu);
+	sched_show_task(cpu_curr(cpu));
+}
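
The sched_show_task() hunk above is the standard cure for an
unprotected rcu_dereference(): perform the dereference inside an RCU
read-side critical section, copy out what is needed, and use only the
copy afterward.  A schematic of the pattern (struct foo and gp are
illustrative, not part of this patch):

	#include <linux/rcupdate.h>

	struct foo {
		int a;
	};
	static struct foo __rcu *gp;	/* published via rcu_assign_pointer() */

	static int read_foo_a(void)
	{
		struct foo *p;
		int a;

		rcu_read_lock();
		p = rcu_dereference(gp);	/* valid only until the unlock */
		a = p ? p->a : -1;		/* copy while still protected */
		rcu_read_unlock();
		return a;			/* p itself is now off limits */
	}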
diff --git a/kernel/srcu.c b/kernel/srcu.c
index 97c465ebd844..2b859828cdc3 100644
--- a/kernel/srcu.c
+++ b/kernel/srcu.c
@@ -16,8 +16,10 @@
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  *
  * Copyright (C) IBM Corporation, 2006
+ * Copyright (C) Fujitsu, 2012
  *
  * Author: Paul McKenney <paulmck@us.ibm.com>
+ *	   Lai Jiangshan <laijs@cn.fujitsu.com>
  *
  * For detailed explanation of Read-Copy Update mechanism see -
  * 		Documentation/RCU/ *.txt
@@ -34,6 +36,10 @@
 #include <linux/delay.h>
 #include <linux/srcu.h>
 
+#include <trace/events/rcu.h>
+
+#include "rcu.h"
+
 /*
  * Initialize an rcu_batch structure to empty.
  */
@@ -92,9 +98,6 @@ static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
 	}
 }
 
-/* single-thread state-machine */
-static void process_srcu(struct work_struct *work);
-
 static int init_srcu_struct_fields(struct srcu_struct *sp)
 {
 	sp->completed = 0;
@@ -464,7 +467,9 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
  */
 void synchronize_srcu(struct srcu_struct *sp)
 {
-	__synchronize_srcu(sp, SYNCHRONIZE_SRCU_TRYCOUNT);
+	__synchronize_srcu(sp, rcu_expedited
+			   ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT
+			   : SYNCHRONIZE_SRCU_TRYCOUNT);
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 
@@ -637,7 +642,7 @@ static void srcu_reschedule(struct srcu_struct *sp)
 /*
  * This is the work-queue function that handles SRCU grace periods.
  */
-static void process_srcu(struct work_struct *work)
+void process_srcu(struct work_struct *work)
 {
 	struct srcu_struct *sp;
 
@@ -648,3 +653,4 @@ static void process_srcu(struct work_struct *work)
 	srcu_invoke_callbacks(sp);
 	srcu_reschedule(sp);
 }
+EXPORT_SYMBOL_GPL(process_srcu);
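
For context, a sketch of the update-side pattern whose grace-period
latency the new trycount selection tunes; my_srcu, my_reader(),
my_update(), and struct foo are illustrative only.  When the
rcu_expedited flag is set, the synchronize_srcu() below runs through
the larger expedited trycount before it starts blocking:

	#include <linux/srcu.h>
	#include <linux/slab.h>

	struct foo {
		int a;
	};
	static struct srcu_struct my_srcu;	/* init_srcu_struct() at boot */
	static struct foo __rcu *gp;

	static int my_reader(void)
	{
		int idx, a;

		idx = srcu_read_lock(&my_srcu);
		a = srcu_dereference(gp, &my_srcu)->a;	/* assumes gp != NULL */
		srcu_read_unlock(&my_srcu, idx);
		return a;
	}

	static void my_update(struct foo *newp)
	{
		struct foo *oldp;

		oldp = rcu_dereference_protected(gp, 1); /* update-side lock held */
		rcu_assign_pointer(gp, newp);
		synchronize_srcu(&my_srcu);	/* all readers of oldp are done */
		kfree(oldp);
	}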