author     Linus Torvalds <torvalds@linux-foundation.org>  2014-01-20 10:25:12 -0800
committer  Linus Torvalds <torvalds@linux-foundation.org>  2014-01-20 10:25:12 -0800
commit     a693c46e14c9fdadbcd68ddfa94a4f72495531a9 (patch)
tree       ae8cd363c78959159b3b897b13c2d78c6923d355 /kernel/rcu
parent     6ffbe7d1fabddc768724656f159759cae7818cd9 (diff)
parent     73a7ac2808fa52bdab1781646568b6f90c3d7034 (diff)
download   linux-a693c46e14c9fdadbcd68ddfa94a4f72495531a9.tar.gz
Merge branch 'core-rcu-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tip/tip
Pull RCU updates from Ingo Molnar:
 - add RCU torture scripts/tooling
 - static analysis improvements
 - update RCU documentation
 - miscellaneous fixes

* 'core-rcu-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tip/tip: (52 commits)
  rcu: Remove "extern" from function declarations in kernel/rcu/rcu.h
  rcu: Remove "extern" from function declarations in include/linux/*rcu*.h
  rcu/torture: Dynamically allocate SRCU output buffer to avoid overflow
  rcu: Don't activate RCU core on NO_HZ_FULL CPUs
  rcu: Warn on allegedly impossible rcu_read_unlock_special() from irq
  rcu: Add an RCU_INITIALIZER for global RCU-protected pointers
  rcu: Make rcu_assign_pointer's assignment volatile and type-safe
  bonding: Use RCU_INIT_POINTER() for better overhead and for sparse
  rcu: Add comment on evaluate-once properties of rcu_assign_pointer().
  rcu: Provide better diagnostics for blocking in RCU callback functions
  rcu: Improve SRCU's grace-period comments
  rcu: Fix CONFIG_RCU_FANOUT_EXACT for odd fanout/leaf values
  rcu: Fix coccinelle warnings
  rcutorture: Stop tracking FSF's postal address
  rcutorture: Move checkarg to functions.sh
  rcutorture: Flag errors and warnings with color coding
  rcutorture: Record results from repeated runs of the same test scenario
  rcutorture: Test summary at end of run with less chattiness
  rcutorture: Update comment in kvm.sh listing typical RCU trace events
  rcutorture: Add tracing-enabled version of TREE08
  ...
Diffstat (limited to 'kernel/rcu')
-rw-r--r--  kernel/rcu/rcu.h          |  5
-rw-r--r--  kernel/rcu/srcu.c         | 57
-rw-r--r--  kernel/rcu/torture.c      | 75
-rw-r--r--  kernel/rcu/tree.c         | 79
-rw-r--r--  kernel/rcu/tree.h         | 12
-rw-r--r--  kernel/rcu/tree_plugin.h  | 89
-rw-r--r--  kernel/rcu/tree_trace.c   |  3
-rw-r--r--  kernel/rcu/update.c       |  5
8 files changed, 251 insertions, 74 deletions
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index 7859a0a3951e..79c3877e9c5b 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -96,19 +96,22 @@ static inline void debug_rcu_head_unqueue(struct rcu_head *head)
 }
 #endif	/* #else !CONFIG_DEBUG_OBJECTS_RCU_HEAD */
 
-extern void kfree(const void *);
+void kfree(const void *);
 
 static inline bool __rcu_reclaim(const char *rn, struct rcu_head *head)
 {
 	unsigned long offset = (unsigned long)head->func;
 
+	rcu_lock_acquire(&rcu_callback_map);
 	if (__is_kfree_rcu_offset(offset)) {
 		RCU_TRACE(trace_rcu_invoke_kfree_callback(rn, head, offset));
 		kfree((void *)head - offset);
+		rcu_lock_release(&rcu_callback_map);
 		return 1;
 	} else {
 		RCU_TRACE(trace_rcu_invoke_callback(rn, head));
 		head->func(head);
+		rcu_lock_release(&rcu_callback_map);
 		return 0;
 	}
 }
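
The hunk above wraps every callback invocation in rcu_lock_acquire()/rcu_lock_release() on rcu_callback_map (defined in kernel/rcu/update.c at the end of this diff) while keeping the existing offset trick: head->func is either a genuine callback function or, for kfree_rcu(), the byte offset of the rcu_head within the enclosing object. A minimal caller-side sketch of the two conventions, using a hypothetical struct foo that is not part of this patch:

#include <linux/kernel.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

/* Hypothetical RCU-protected object. */
struct foo {
	int data;
	struct rcu_head rh;
};

/* Ordinary call_rcu(): ->func is a real function pointer. */
static void foo_reclaim(struct rcu_head *head)
{
	kfree(container_of(head, struct foo, rh));
}

static void foo_free_cb(struct foo *fp)
{
	call_rcu(&fp->rh, foo_reclaim);	/* __rcu_reclaim() calls foo_reclaim() */
}

/*
 * kfree_rcu(): ->func encodes offsetof(struct foo, rh), so __rcu_reclaim()
 * sees __is_kfree_rcu_offset() and kfree()s the enclosing object directly,
 * without running any callback function.
 */
static void foo_free_kfree_rcu(struct foo *fp)
{
	kfree_rcu(fp, rh);
}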
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c
index 01d5ccb8bfe3..3318d8284384 100644
--- a/kernel/rcu/srcu.c
+++ b/kernel/rcu/srcu.c
@@ -363,6 +363,29 @@ static void srcu_flip(struct srcu_struct *sp)
 /*
  * Enqueue an SRCU callback on the specified srcu_struct structure,
  * initiating grace-period processing if it is not already running.
+ *
+ * Note that all CPUs must agree that the grace period extended beyond
+ * all pre-existing SRCU read-side critical sections.  On systems with
+ * more than one CPU, this means that when "func()" is invoked, each CPU
+ * is guaranteed to have executed a full memory barrier since the end of
+ * its last corresponding SRCU read-side critical section whose beginning
+ * preceded the call to call_srcu().  It also means that each CPU executing
+ * an SRCU read-side critical section that continues beyond the start of
+ * "func()" must have executed a memory barrier after the call_srcu()
+ * but before the beginning of that SRCU read-side critical section.
+ * Note that these guarantees include CPUs that are offline, idle, or
+ * executing in user mode, as well as CPUs that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked call_srcu() and CPU B invoked the
+ * resulting SRCU callback function "func()", then both CPU A and CPU
+ * B are guaranteed to execute a full memory barrier during the time
+ * interval between the call to call_srcu() and the invocation of "func()".
+ * This guarantee applies even if CPU A and CPU B are the same CPU (but
+ * again only if the system has more than one CPU).
+ *
+ * Of course, these guarantees apply only for invocations of call_srcu(),
+ * srcu_read_lock(), and srcu_read_unlock() that are all passed the same
+ * srcu_struct structure.
  */
 void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
 		void (*func)(struct rcu_head *head))
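
The comment block added above spells out the memory-ordering guarantees of call_srcu(). As a usage sketch (hypothetical struct bar, bar_srcu, and callback, none of which appear in this patch), the typical pattern is a sleepable SRCU reader plus a call_srcu()-deferred free:

#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/srcu.h>

struct bar {
	int data;
	struct rcu_head rh;
};

DEFINE_STATIC_SRCU(bar_srcu);		/* hypothetical srcu_struct */

static void bar_reclaim(struct rcu_head *head)
{
	kfree(container_of(head, struct bar, rh));
}

/* Readers may sleep: that is the point of SRCU. */
static int bar_read(struct bar __rcu **slot)
{
	int idx, val;

	idx = srcu_read_lock(&bar_srcu);
	val = srcu_dereference(*slot, &bar_srcu)->data;
	srcu_read_unlock(&bar_srcu, idx);
	return val;
}

/*
 * Deferred free: bar_reclaim() runs only after pre-existing bar_srcu
 * readers have finished, with the barrier guarantees documented above.
 */
static void bar_delete(struct bar *b)
{
	call_srcu(&bar_srcu, &b->rh, bar_reclaim);
}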
@@ -459,7 +482,30 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
  * Note that it is illegal to call synchronize_srcu() from the corresponding
  * SRCU read-side critical section; doing so will result in deadlock.
  * However, it is perfectly legal to call synchronize_srcu() on one
- * srcu_struct from some other srcu_struct's read-side critical section.
+ * srcu_struct from some other srcu_struct's read-side critical section,
+ * as long as the resulting graph of srcu_structs is acyclic.
+ *
+ * There are memory-ordering constraints implied by synchronize_srcu().
+ * On systems with more than one CPU, when synchronize_srcu() returns,
+ * each CPU is guaranteed to have executed a full memory barrier since
+ * the end of its last corresponding SRCU read-side critical section
+ * whose beginning preceded the call to synchronize_srcu().  In addition,
+ * each CPU having an SRCU read-side critical section that extends beyond
+ * the return from synchronize_srcu() is guaranteed to have executed a
+ * full memory barrier after the beginning of synchronize_srcu() and before
+ * the beginning of that SRCU read-side critical section.  Note that these
+ * guarantees include CPUs that are offline, idle, or executing in user mode,
+ * as well as CPUs that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked synchronize_srcu(), which returned
+ * to its caller on CPU B, then both CPU A and CPU B are guaranteed
+ * to have executed a full memory barrier during the execution of
+ * synchronize_srcu().  This guarantee applies even if CPU A and CPU B
+ * are the same CPU, but again only if the system has more than one CPU.
+ *
+ * Of course, these memory-ordering guarantees apply only when
+ * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are
+ * passed the same srcu_struct structure.
  */
 void synchronize_srcu(struct srcu_struct *sp)
 {
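
The reworded comment above permits synchronize_srcu() on one srcu_struct from inside another srcu_struct's read-side critical section, provided the resulting wait-for graph is acyclic. A hedged sketch of the legal case and the self-deadlock case, with hypothetical srcu_a/srcu_b:

#include <linux/srcu.h>

DEFINE_STATIC_SRCU(srcu_a);		/* hypothetical */
DEFINE_STATIC_SRCU(srcu_b);		/* hypothetical */

static void legal_nesting(void)
{
	int idx = srcu_read_lock(&srcu_a);

	synchronize_srcu(&srcu_b);	/* OK: waits only on srcu_b readers */
	srcu_read_unlock(&srcu_a, idx);
}

static void self_deadlock(void)
{
	int idx = srcu_read_lock(&srcu_a);

	synchronize_srcu(&srcu_a);	/* never returns: waits on this very reader */
	srcu_read_unlock(&srcu_a, idx);
}

If some other code path did the mirror-image wait, synchronize_srcu(&srcu_a) under an srcu_b reader, the srcu_struct graph would contain a cycle and both waiters could deadlock; that is exactly what the acyclicity requirement rules out.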
@@ -476,12 +522,8 @@ EXPORT_SYMBOL_GPL(synchronize_srcu);
  * Wait for an SRCU grace period to elapse, but be more aggressive about
  * spinning rather than blocking when waiting.
  *
- * Note that it is also illegal to call synchronize_srcu_expedited()
- * from the corresponding SRCU read-side critical section;
- * doing so will result in deadlock.  However, it is perfectly legal
- * to call synchronize_srcu_expedited() on one srcu_struct from some
- * other srcu_struct's read-side critical section, as long as
- * the resulting graph of srcu_structs is acyclic.
+ * Note that synchronize_srcu_expedited() has the same deadlock and
+ * memory-ordering properties as does synchronize_srcu().
  */
 void synchronize_srcu_expedited(struct srcu_struct *sp)
 {
@@ -491,6 +533,7 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
 /**
  * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
+ * @sp: srcu_struct on which to wait for in-flight callbacks.
  */
 void srcu_barrier(struct srcu_struct *sp)
 {
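
srcu_barrier() gains a kernel-doc line for its @sp argument above. Its usual caller is teardown code that must not free SRCU state while call_srcu() callbacks are still in flight; a minimal teardown sketch with a hypothetical, dynamically initialized baz_srcu:

#include <linux/module.h>
#include <linux/srcu.h>

static struct srcu_struct baz_srcu;	/* hypothetical */

static int __init baz_init(void)
{
	return init_srcu_struct(&baz_srcu);
}

static void __exit baz_exit(void)
{
	/* Callers must have stopped posting call_srcu() callbacks by now. */
	srcu_barrier(&baz_srcu);	/* wait for in-flight callbacks */
	cleanup_srcu_struct(&baz_srcu);	/* then tear down the SRCU state */
}

module_init(baz_init);
module_exit(baz_exit);
MODULE_LICENSE("GPL");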
diff --git a/kernel/rcu/torture.c b/kernel/rcu/torture.c
index 3929cd451511..732f8ae3086a 100644
--- a/kernel/rcu/torture.c
+++ b/kernel/rcu/torture.c
@@ -139,8 +139,6 @@ MODULE_PARM_DESC(verbose, "Enable verbose debugging printk()s");
 #define VERBOSE_PRINTK_ERRSTRING(s) \
 	do { if (verbose) pr_alert("%s" TORTURE_FLAG "!!! " s "\n", torture_type); } while (0)
 
-static char printk_buf[4096];
-
 static int nrealreaders;
 static struct task_struct *writer_task;
 static struct task_struct **fakewriter_tasks;
@@ -376,7 +374,7 @@ struct rcu_torture_ops {
 	void (*call)(struct rcu_head *head, void (*func)(struct rcu_head *rcu));
 	void (*cb_barrier)(void);
 	void (*fqs)(void);
-	int (*stats)(char *page);
+	void (*stats)(char *page);
 	int irq_capable;
 	int can_boost;
 	const char *name;
@@ -578,21 +576,19 @@ static void srcu_torture_barrier(void)
 	srcu_barrier(&srcu_ctl);
 }
 
-static int srcu_torture_stats(char *page)
+static void srcu_torture_stats(char *page)
 {
-	int cnt = 0;
 	int cpu;
 	int idx = srcu_ctl.completed & 0x1;
 
-	cnt += sprintf(&page[cnt], "%s%s per-CPU(idx=%d):",
+	page += sprintf(page, "%s%s per-CPU(idx=%d):",
 		       torture_type, TORTURE_FLAG, idx);
 	for_each_possible_cpu(cpu) {
-		cnt += sprintf(&page[cnt], " %d(%lu,%lu)", cpu,
+		page += sprintf(page, " %d(%lu,%lu)", cpu,
 			       per_cpu_ptr(srcu_ctl.per_cpu_ref, cpu)->c[!idx],
 			       per_cpu_ptr(srcu_ctl.per_cpu_ref, cpu)->c[idx]);
 	}
-	cnt += sprintf(&page[cnt], "\n");
-	return cnt;
+	sprintf(page, "\n");
 }
 
 static void srcu_torture_synchronize_expedited(void)
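
The srcu_torture_stats() rewrite above drops the running cnt index and instead advances the output pointer by sprintf()'s return value, which is what lets the caller hand in any sufficiently large buffer (see the dynamically sized buffer later in this diff). A stand-alone user-space sketch of the same idiom, with made-up per-CPU numbers:

#include <stdio.h>

/* Pointer-advancing formatting, as in srcu_torture_stats(); values are made up. */
static void fill_stats(char *page)
{
	int cpu;

	page += sprintf(page, "per-CPU(idx=0):");
	for (cpu = 0; cpu < 4; cpu++)
		page += sprintf(page, " %d(%d,%d)", cpu, 2 * cpu, 2 * cpu + 1);
	sprintf(page, "\n");		/* last write, return value unused */
}

int main(void)
{
	char buf[256];			/* caller guarantees enough space */

	fill_stats(buf);
	fputs(buf, stdout);
	return 0;
}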
@@ -1052,10 +1048,9 @@ rcu_torture_reader(void *arg)
 /*
  * Create an RCU-torture statistics message in the specified buffer.
  */
-static int
+static void
 rcu_torture_printk(char *page)
 {
-	int cnt = 0;
 	int cpu;
 	int i;
 	long pipesummary[RCU_TORTURE_PIPE_LEN + 1] = { 0 };
@@ -1071,8 +1066,8 @@ rcu_torture_printk(char *page)
 		if (pipesummary[i] != 0)
 			break;
 	}
-	cnt += sprintf(&page[cnt], "%s%s ", torture_type, TORTURE_FLAG);
-	cnt += sprintf(&page[cnt],
+	page += sprintf(page, "%s%s ", torture_type, TORTURE_FLAG);
+	page += sprintf(page,
 		       "rtc: %p ver: %lu tfle: %d rta: %d rtaf: %d rtf: %d ",
 		       rcu_torture_current,
 		       rcu_torture_current_version,
@@ -1080,53 +1075,52 @@ rcu_torture_printk(char *page)
 		       atomic_read(&n_rcu_torture_alloc),
 		       atomic_read(&n_rcu_torture_alloc_fail),
 		       atomic_read(&n_rcu_torture_free));
-	cnt += sprintf(&page[cnt], "rtmbe: %d rtbke: %ld rtbre: %ld ",
+	page += sprintf(page, "rtmbe: %d rtbke: %ld rtbre: %ld ",
 		       atomic_read(&n_rcu_torture_mberror),
 		       n_rcu_torture_boost_ktrerror,
 		       n_rcu_torture_boost_rterror);
-	cnt += sprintf(&page[cnt], "rtbf: %ld rtb: %ld nt: %ld ",
+	page += sprintf(page, "rtbf: %ld rtb: %ld nt: %ld ",
 		       n_rcu_torture_boost_failure,
 		       n_rcu_torture_boosts,
 		       n_rcu_torture_timers);
-	cnt += sprintf(&page[cnt],
+	page += sprintf(page,
 		       "onoff: %ld/%ld:%ld/%ld %d,%d:%d,%d %lu:%lu (HZ=%d) ",
 		       n_online_successes, n_online_attempts,
 		       n_offline_successes, n_offline_attempts,
 		       min_online, max_online,
 		       min_offline, max_offline,
 		       sum_online, sum_offline, HZ);
-	cnt += sprintf(&page[cnt], "barrier: %ld/%ld:%ld",
+	page += sprintf(page, "barrier: %ld/%ld:%ld",
 		       n_barrier_successes,
 		       n_barrier_attempts,
 		       n_rcu_torture_barrier_error);
-	cnt += sprintf(&page[cnt], "\n%s%s ", torture_type, TORTURE_FLAG);
+	page += sprintf(page, "\n%s%s ", torture_type, TORTURE_FLAG);
 	if (atomic_read(&n_rcu_torture_mberror) != 0 ||
 	    n_rcu_torture_barrier_error != 0 ||
 	    n_rcu_torture_boost_ktrerror != 0 ||
 	    n_rcu_torture_boost_rterror != 0 ||
 	    n_rcu_torture_boost_failure != 0 ||
 	    i > 1) {
-		cnt += sprintf(&page[cnt], "!!! ");
+		page += sprintf(page, "!!! ");
 		atomic_inc(&n_rcu_torture_error);
 		WARN_ON_ONCE(1);
 	}
-	cnt += sprintf(&page[cnt], "Reader Pipe: ");
+	page += sprintf(page, "Reader Pipe: ");
 	for (i = 0; i < RCU_TORTURE_PIPE_LEN + 1; i++)
-		cnt += sprintf(&page[cnt], " %ld", pipesummary[i]);
-	cnt += sprintf(&page[cnt], "\n%s%s ", torture_type, TORTURE_FLAG);
-	cnt += sprintf(&page[cnt], "Reader Batch: ");
+		page += sprintf(page, " %ld", pipesummary[i]);
+	page += sprintf(page, "\n%s%s ", torture_type, TORTURE_FLAG);
+	page += sprintf(page, "Reader Batch: ");
 	for (i = 0; i < RCU_TORTURE_PIPE_LEN + 1; i++)
-		cnt += sprintf(&page[cnt], " %ld", batchsummary[i]);
-	cnt += sprintf(&page[cnt], "\n%s%s ", torture_type, TORTURE_FLAG);
-	cnt += sprintf(&page[cnt], "Free-Block Circulation: ");
+		page += sprintf(page, " %ld", batchsummary[i]);
+	page += sprintf(page, "\n%s%s ", torture_type, TORTURE_FLAG);
+	page += sprintf(page, "Free-Block Circulation: ");
 	for (i = 0; i < RCU_TORTURE_PIPE_LEN + 1; i++) {
-		cnt += sprintf(&page[cnt], " %d",
+		page += sprintf(page, " %d",
 			       atomic_read(&rcu_torture_wcount[i]));
 	}
-	cnt += sprintf(&page[cnt], "\n");
+	page += sprintf(page, "\n");
 	if (cur_ops->stats)
-		cnt += cur_ops->stats(&page[cnt]);
-	return cnt;
+		cur_ops->stats(page);
 }
 
 /*
@@ -1140,10 +1134,17 @@ rcu_torture_printk(char *page)
 static void
 rcu_torture_stats_print(void)
 {
-	int cnt;
+	int size = nr_cpu_ids * 200 + 8192;
+	char *buf;
 
-	cnt = rcu_torture_printk(printk_buf);
-	pr_alert("%s", printk_buf);
+	buf = kmalloc(size, GFP_KERNEL);
+	if (!buf) {
+		pr_err("rcu-torture: Out of memory, need: %d", size);
+		return;
+	}
+	rcu_torture_printk(buf);
+	pr_alert("%s", buf);
+	kfree(buf);
 }
 
 /*
@@ -1578,6 +1579,7 @@ static int rcu_torture_barrier_cbs(void *arg)
 {
 	long myid = (long)arg;
 	bool lastphase = 0;
+	bool newphase;
 	struct rcu_head rcu;
 
 	init_rcu_head_on_stack(&rcu);
@@ -1585,10 +1587,11 @@ static int rcu_torture_barrier_cbs(void *arg)
 	set_user_nice(current, 19);
 	do {
 		wait_event(barrier_cbs_wq[myid],
-			   barrier_phase != lastphase ||
+			   (newphase =
+			    ACCESS_ONCE(barrier_phase)) != lastphase ||
 			   kthread_should_stop() ||
 			   fullstop != FULLSTOP_DONTSTOP);
-		lastphase = barrier_phase;
+		lastphase = newphase;
 		smp_mb(); /* ensure barrier_phase load before ->call(). */
 		if (kthread_should_stop() || fullstop != FULLSTOP_DONTSTOP)
 			break;
@@ -1625,7 +1628,7 @@ static int rcu_torture_barrier(void *arg)
 		if (kthread_should_stop() || fullstop != FULLSTOP_DONTSTOP)
 			break;
 		n_barrier_attempts++;
-		cur_ops->cb_barrier();
+		cur_ops->cb_barrier(); /* Implies smp_mb() for wait_event(). */
 		if (atomic_read(&barrier_cbs_invoked) != n_barrier_cbs) {
 			n_rcu_torture_barrier_error++;
 			WARN_ON_ONCE(1);
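
The two comments added above ("ensure barrier_phase load before ->call()" and "Implies smp_mb() for wait_event()") rely on the usual pairing of full barriers around a flag handoff. A generic sketch of that pairing with hypothetical shared_data/shared_flag (not the torture-test variables themselves):

#include <linux/compiler.h>
#include <asm/barrier.h>

static int shared_data;
static int shared_flag;

static void producer(void)
{
	shared_data = 42;
	smp_mb();			/* order data store before flag store */
	ACCESS_ONCE(shared_flag) = 1;
}

static int consumer(void)
{
	if (!ACCESS_ONCE(shared_flag))
		return -1;
	smp_mb();			/* pairs with producer(): flag load before data load */
	return shared_data;		/* sees 42 whenever the flag was seen */
}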
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index a6205a05b5e4..b3d116cd072d 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -369,6 +369,9 @@ static struct rcu_node *rcu_get_root(struct rcu_state *rsp)
 static void rcu_eqs_enter_common(struct rcu_dynticks *rdtp, long long oldval,
 				bool user)
 {
+	struct rcu_state *rsp;
+	struct rcu_data *rdp;
+
 	trace_rcu_dyntick(TPS("Start"), oldval, rdtp->dynticks_nesting);
 	if (!user && !is_idle_task(current)) {
 		struct task_struct *idle __maybe_unused =
@@ -380,6 +383,10 @@ static void rcu_eqs_enter_common(struct rcu_dynticks *rdtp, long long oldval,
 			  current->pid, current->comm,
 			  idle->pid, idle->comm); /* must be idle task! */
 	}
+	for_each_rcu_flavor(rsp) {
+		rdp = this_cpu_ptr(rsp->rda);
+		do_nocb_deferred_wakeup(rdp);
+	}
 	rcu_prepare_for_idle(smp_processor_id());
 	/* CPUs seeing atomic_inc() must see prior RCU read-side crit sects */
 	smp_mb__before_atomic_inc();  /* See above. */
@@ -411,11 +418,12 @@ static void rcu_eqs_enter(bool user)
 	rdtp = this_cpu_ptr(&rcu_dynticks);
 	oldval = rdtp->dynticks_nesting;
 	WARN_ON_ONCE((oldval & DYNTICK_TASK_NEST_MASK) == 0);
-	if ((oldval & DYNTICK_TASK_NEST_MASK) == DYNTICK_TASK_NEST_VALUE)
+	if ((oldval & DYNTICK_TASK_NEST_MASK) == DYNTICK_TASK_NEST_VALUE) {
 		rdtp->dynticks_nesting = 0;
-	else
+		rcu_eqs_enter_common(rdtp, oldval, user);
+	} else {
 		rdtp->dynticks_nesting -= DYNTICK_TASK_NEST_VALUE;
-	rcu_eqs_enter_common(rdtp, oldval, user);
+	}
 }
 
 /**
@@ -533,11 +541,12 @@ static void rcu_eqs_exit(bool user)
 	rdtp = this_cpu_ptr(&rcu_dynticks);
 	oldval = rdtp->dynticks_nesting;
 	WARN_ON_ONCE(oldval < 0);
-	if (oldval & DYNTICK_TASK_NEST_MASK)
+	if (oldval & DYNTICK_TASK_NEST_MASK) {
 		rdtp->dynticks_nesting += DYNTICK_TASK_NEST_VALUE;
-	else
+	} else {
 		rdtp->dynticks_nesting = DYNTICK_TASK_EXIT_IDLE;
-	rcu_eqs_exit_common(rdtp, oldval, user);
+		rcu_eqs_exit_common(rdtp, oldval, user);
+	}
 }
 
 /**
@@ -716,7 +725,7 @@ bool rcu_lockdep_current_cpu_online(void)
 	bool ret;
 
 	if (in_nmi())
-		return 1;
+		return true;
 	preempt_disable();
 	rdp = this_cpu_ptr(&rcu_sched_data);
 	rnp = rdp->mynode;
@@ -755,6 +764,12 @@ static int dyntick_save_progress_counter(struct rcu_data *rdp,
 }
 
 /*
+ * This function really isn't for public consumption, but RCU is special in
+ * that context switches can allow the state machine to make progress.
+ */
+extern void resched_cpu(int cpu);
+
+/*
  * Return true if the specified CPU has passed through a quiescent
 * state by virtue of being in or having passed through a dynticks
  * idle state since the last call to dyntick_save_progress_counter()
@@ -812,16 +827,34 @@ static int rcu_implicit_dynticks_qs(struct rcu_data *rdp,
 	 */
 	rcu_kick_nohz_cpu(rdp->cpu);
 
+	/*
+	 * Alternatively, the CPU might be running in the kernel
+	 * for an extended period of time without a quiescent state.
+	 * Attempt to force the CPU through the scheduler to gain the
+	 * needed quiescent state, but only if the grace period has gone
+	 * on for an uncommonly long time.  If there are many stuck CPUs,
+	 * we will beat on the first one until it gets unstuck, then move
+	 * to the next.  Only do this for the primary flavor of RCU.
+	 */
+	if (rdp->rsp == rcu_state &&
+	    ULONG_CMP_GE(ACCESS_ONCE(jiffies), rdp->rsp->jiffies_resched)) {
+		rdp->rsp->jiffies_resched += 5;
+		resched_cpu(rdp->cpu);
+	}
+
 	return 0;
 }
 
 static void record_gp_stall_check_time(struct rcu_state *rsp)
 {
 	unsigned long j = ACCESS_ONCE(jiffies);
+	unsigned long j1;
 
 	rsp->gp_start = j;
 	smp_wmb(); /* Record start time before stall time. */
-	rsp->jiffies_stall = j + rcu_jiffies_till_stall_check();
+	j1 = rcu_jiffies_till_stall_check();
+	rsp->jiffies_stall = j + j1;
+	rsp->jiffies_resched = j + j1 / 2;
 }
 
 /*
@@ -1517,6 +1550,7 @@ static void rcu_gp_cleanup(struct rcu_state *rsp)
 		rdp = this_cpu_ptr(rsp->rda);
 		if (rnp == rdp->mynode)
 			__note_gp_changes(rsp, rnp, rdp);
+		/* smp_mb() provided by prior unlock-lock pair. */
 		nocb += rcu_future_gp_cleanup(rsp, rnp);
 		raw_spin_unlock_irq(&rnp->lock);
 		cond_resched();
@@ -1562,6 +1596,7 @@ static int __noreturn rcu_gp_kthread(void *arg)
 			wait_event_interruptible(rsp->gp_wq,
 						 ACCESS_ONCE(rsp->gp_flags) &
 						 RCU_GP_FLAG_INIT);
+			/* Locking provides needed memory barrier. */
 			if (rcu_gp_init(rsp))
 				break;
 			cond_resched();
@@ -1591,6 +1626,7 @@ static int __noreturn rcu_gp_kthread(void *arg)
 					(!ACCESS_ONCE(rnp->qsmask) &&
 					 !rcu_preempt_blocked_readers_cgp(rnp)),
 					j);
+			/* Locking provides needed memory barriers. */
 			/* If grace period done, leave loop. */
 			if (!ACCESS_ONCE(rnp->qsmask) &&
 			    !rcu_preempt_blocked_readers_cgp(rnp))
@@ -1912,13 +1948,13 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
  * Adopt the RCU callbacks from the specified rcu_state structure's
  * orphanage.  The caller must hold the ->orphan_lock.
  */
-static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 {
 	int i;
 	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
 	/* No-CBs CPUs are handled specially. */
-	if (rcu_nocb_adopt_orphan_cbs(rsp, rdp))
+	if (rcu_nocb_adopt_orphan_cbs(rsp, rdp, flags))
 		return;
 
 	/* Do the accounting first. */
@@ -1997,7 +2033,7 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 
 	/* Orphan the dead CPU's callbacks, and adopt them if appropriate. */
 	rcu_send_cbs_to_orphanage(cpu, rsp, rnp, rdp);
-	rcu_adopt_orphan_cbs(rsp);
+	rcu_adopt_orphan_cbs(rsp, flags);
 
 	/* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
 	mask = rdp->grpmask;	/* rnp->grplo is constant. */
@@ -2318,6 +2354,9 @@ __rcu_process_callbacks(struct rcu_state *rsp)
 	/* If there are callbacks ready, invoke them. */
 	if (cpu_has_callbacks_ready_to_invoke(rdp))
 		invoke_rcu_callbacks(rsp, rdp);
+
+	/* Do any needed deferred wakeups of rcuo kthreads. */
+	do_nocb_deferred_wakeup(rdp);
 }
 
 /*
@@ -2453,7 +2492,7 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
 
 		if (cpu != -1)
 			rdp = per_cpu_ptr(rsp->rda, cpu);
-		offline = !__call_rcu_nocb(rdp, head, lazy);
+		offline = !__call_rcu_nocb(rdp, head, lazy, flags);
 		WARN_ON_ONCE(offline);
 		/* _call_rcu() is illegal on offline CPU; leak the callback. */
 		local_irq_restore(flags);
@@ -2773,6 +2812,10 @@ static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
 	/* Check for CPU stalls, if enabled. */
 	check_cpu_stall(rsp, rdp);
 
+	/* Is this CPU a NO_HZ_FULL CPU that should ignore RCU? */
+	if (rcu_nohz_full_cpu(rsp))
+		return 0;
+
 	/* Is the RCU core waiting for a quiescent state from this CPU? */
 	if (rcu_scheduler_fully_active &&
 	    rdp->qs_pending && !rdp->passed_quiesce) {
@@ -2806,6 +2849,12 @@ static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
 		return 1;
 	}
 
+	/* Does this CPU need a deferred NOCB wakeup? */
+	if (rcu_nocb_need_deferred_wakeup(rdp)) {
+		rdp->n_rp_nocb_defer_wakeup++;
+		return 1;
+	}
+
 	/* nothing to do */
 	rdp->n_rp_need_nothing++;
 	return 0;
@@ -3230,9 +3279,9 @@ static void __init rcu_init_levelspread(struct rcu_state *rsp)
 {
 	int i;
 
-	for (i = rcu_num_lvls - 1; i > 0; i--)
+	rsp->levelspread[rcu_num_lvls - 1] = rcu_fanout_leaf;
+	for (i = rcu_num_lvls - 2; i >= 0; i--)
 		rsp->levelspread[i] = CONFIG_RCU_FANOUT;
-	rsp->levelspread[0] = rcu_fanout_leaf;
 }
 #else /* #ifdef CONFIG_RCU_FANOUT_EXACT */
 static void __init rcu_init_levelspread(struct rcu_state *rsp)
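
The CONFIG_RCU_FANOUT_EXACT fix above moves rcu_fanout_leaf from levelspread[0] (the root) to the last, leaf level, so the interior levels get CONFIG_RCU_FANOUT. A user-space arithmetic check of the fixed loop, assuming example values rcu_num_lvls = 3, CONFIG_RCU_FANOUT = 64, and rcu_fanout_leaf = 16:

#include <stdio.h>

#define EX_NUM_LVLS	3	/* assumed example: rcu_num_lvls */
#define EX_FANOUT	64	/* assumed example: CONFIG_RCU_FANOUT */
#define EX_FANOUT_LEAF	16	/* assumed example: rcu_fanout_leaf */

int main(void)
{
	int levelspread[EX_NUM_LVLS];
	int i;

	/* Fixed assignment: the leaf spread goes to the last level. */
	levelspread[EX_NUM_LVLS - 1] = EX_FANOUT_LEAF;
	for (i = EX_NUM_LVLS - 2; i >= 0; i--)
		levelspread[i] = EX_FANOUT;

	for (i = 0; i < EX_NUM_LVLS; i++)
		printf("level %d: spread %d\n", i, levelspread[i]);
	/* Prints 64, 64, 16: interior nodes fan out by 64, leaves by 16.
	 * The old loop produced 16, 64, 64 instead, putting the leaf value
	 * at the root whenever the two fanouts differed. */
	return 0;
}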
@@ -3362,6 +3411,8 @@ static void __init rcu_init_geometry(void)
 	if (rcu_fanout_leaf == CONFIG_RCU_FANOUT_LEAF &&
 	    nr_cpu_ids == NR_CPUS)
 		return;
+	pr_info("RCU: Adjusting geometry for rcu_fanout_leaf=%d, nr_cpu_ids=%d\n",
+		rcu_fanout_leaf, nr_cpu_ids);
 
 	/*
 	 * Compute number of nodes that can be handled an rcu_node tree
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 52be957c9fe2..8c19873f1ac9 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -317,6 +317,7 @@ struct rcu_data {
 	unsigned long n_rp_cpu_needs_gp;
 	unsigned long n_rp_gp_completed;
 	unsigned long n_rp_gp_started;
+	unsigned long n_rp_nocb_defer_wakeup;
 	unsigned long n_rp_need_nothing;
 
 	/* 6) _rcu_barrier() and OOM callbacks. */
@@ -335,6 +336,7 @@ struct rcu_data {
 	int nocb_p_count_lazy;		/*  (approximate). */
 	wait_queue_head_t nocb_wq;	/* For nocb kthreads to sleep on. */
 	struct task_struct *nocb_kthread;
+	bool nocb_defer_wakeup;		/* Defer wakeup of nocb_kthread. */
 #endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 
 	/* 8) RCU CPU stall data. */
@@ -453,6 +455,8 @@ struct rcu_state {
 						/*  but in jiffies. */
 	unsigned long jiffies_stall;		/* Time at which to check */
 						/*  for CPU stalls. */
+	unsigned long jiffies_resched;		/* Time at which to resched */
+						/*  a reluctant CPU. */
 	unsigned long gp_max;			/* Maximum GP duration in */
 						/*  jiffies. */
 	const char *name;			/* Name of structure. */
@@ -548,9 +552,12 @@ static void rcu_nocb_gp_set(struct rcu_node *rnp, int nrq);
 static void rcu_nocb_gp_cleanup(struct rcu_state *rsp, struct rcu_node *rnp);
 static void rcu_init_one_nocb(struct rcu_node *rnp);
 static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
-			    bool lazy);
+			    bool lazy, unsigned long flags);
 static bool rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
-				      struct rcu_data *rdp);
+				      struct rcu_data *rdp,
+				      unsigned long flags);
+static bool rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp);
+static void do_nocb_deferred_wakeup(struct rcu_data *rdp);
 static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
 static void rcu_spawn_nocb_kthreads(struct rcu_state *rsp);
 static void rcu_kick_nohz_cpu(int cpu);
@@ -564,6 +571,7 @@ static void rcu_sysidle_report_gp(struct rcu_state *rsp, int isidle,
 				  unsigned long maxj);
 static void rcu_bind_gp_kthread(void);
 static void rcu_sysidle_init_percpu_data(struct rcu_dynticks *rdtp);
+static bool rcu_nohz_full_cpu(struct rcu_state *rsp);
 
 #endif /* #ifndef RCU_TREE_NONCORE */
 
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 506a7a97a2e2..6e2ef4b2b920 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -363,10 +363,14 @@ void rcu_read_unlock_special(struct task_struct *t)
 	special = t->rcu_read_unlock_special;
 	if (special & RCU_READ_UNLOCK_NEED_QS) {
 		rcu_preempt_qs(smp_processor_id());
+		if (!t->rcu_read_unlock_special) {
+			local_irq_restore(flags);
+			return;
+		}
 	}
 
-	/* Hardware IRQ handlers cannot block. */
-	if (in_irq() || in_serving_softirq()) {
+	/* Hardware IRQ handlers cannot block, complain if they get here. */
+	if (WARN_ON_ONCE(in_irq() || in_serving_softirq())) {
 		local_irq_restore(flags);
 		return;
 	}
@@ -785,8 +789,10 @@ static void rcu_report_exp_rnp(struct rcu_state *rsp, struct rcu_node *rnp,
 		}
 		if (rnp->parent == NULL) {
 			raw_spin_unlock_irqrestore(&rnp->lock, flags);
-			if (wake)
+			if (wake) {
+				smp_mb(); /* EGP done before wake_up(). */
 				wake_up(&sync_rcu_preempt_exp_wq);
+			}
 			break;
 		}
 		mask = rnp->grpmask;
@@ -1864,6 +1870,7 @@ static int rcu_oom_notify(struct notifier_block *self,
 
 	/* Wait for callbacks from earlier instance to complete. */
 	wait_event(oom_callback_wq, atomic_read(&oom_callback_count) == 0);
+	smp_mb(); /* Ensure callback reuse happens after callback invocation. */
 
 	/*
 	 * Prevent premature wakeup: ensure that all increments happen
@@ -2113,7 +2120,8 @@ bool rcu_is_nocb_cpu(int cpu)
 static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
 				    struct rcu_head *rhp,
 				    struct rcu_head **rhtp,
-				    int rhcount, int rhcount_lazy)
+				    int rhcount, int rhcount_lazy,
+				    unsigned long flags)
 {
 	int len;
 	struct rcu_head **old_rhpp;
@@ -2134,9 +2142,16 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
 	}
 	len = atomic_long_read(&rdp->nocb_q_count);
 	if (old_rhpp == &rdp->nocb_head) {
-		wake_up(&rdp->nocb_wq); /* ... only if queue was empty ... */
+		if (!irqs_disabled_flags(flags)) {
+			wake_up(&rdp->nocb_wq); /* ... if queue was empty ... */
+			trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
+					    TPS("WakeEmpty"));
+		} else {
+			rdp->nocb_defer_wakeup = true;
+			trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
+					    TPS("WakeEmptyIsDeferred"));
+		}
 		rdp->qlen_last_fqs_check = 0;
-		trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("WakeEmpty"));
 	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
 		wake_up_process(t); /* ... or if many callbacks queued. */
 		rdp->qlen_last_fqs_check = LONG_MAX / 2;
@@ -2157,12 +2172,12 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
  * "rcuo" kthread can find it.
  */
 static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
-			    bool lazy)
+			    bool lazy, unsigned long flags)
 {
 
 	if (!rcu_is_nocb_cpu(rdp->cpu))
 		return 0;
-	__call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy);
+	__call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags);
 	if (__is_kfree_rcu_offset((unsigned long)rhp->func))
 		trace_rcu_kfree_callback(rdp->rsp->name, rhp,
 					 (unsigned long)rhp->func,
@@ -2180,7 +2195,8 @@ static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
  * not a no-CBs CPU.
  */
 static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
-						     struct rcu_data *rdp)
+						     struct rcu_data *rdp,
+						     unsigned long flags)
 {
 	long ql = rsp->qlen;
 	long qll = rsp->qlen_lazy;
@@ -2194,14 +2210,14 @@ static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
 	/* First, enqueue the donelist, if any.  This preserves CB ordering. */
 	if (rsp->orphan_donelist != NULL) {
 		__call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist,
-					rsp->orphan_donetail, ql, qll);
+					rsp->orphan_donetail, ql, qll, flags);
 		ql = qll = 0;
 		rsp->orphan_donelist = NULL;
 		rsp->orphan_donetail = &rsp->orphan_donelist;
 	}
 	if (rsp->orphan_nxtlist != NULL) {
 		__call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist,
-					rsp->orphan_nxttail, ql, qll);
+					rsp->orphan_nxttail, ql, qll, flags);
 		ql = qll = 0;
 		rsp->orphan_nxtlist = NULL;
 		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
@@ -2263,6 +2279,7 @@ static int rcu_nocb_kthread(void *arg)
 			trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
 					    TPS("Sleep"));
 			wait_event_interruptible(rdp->nocb_wq, rdp->nocb_head);
+			/* Memory barrier provided by xchg() below. */
 		} else if (firsttime) {
 			firsttime = 0;
 			trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
@@ -2323,6 +2340,22 @@ static int rcu_nocb_kthread(void *arg)
 	return 0;
 }
 
+/* Is a deferred wakeup of rcu_nocb_kthread() required? */
+static bool rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
+{
+	return ACCESS_ONCE(rdp->nocb_defer_wakeup);
+}
+
+/* Do a deferred wakeup of rcu_nocb_kthread(). */
+static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+	if (!rcu_nocb_need_deferred_wakeup(rdp))
+		return;
+	ACCESS_ONCE(rdp->nocb_defer_wakeup) = false;
+	wake_up(&rdp->nocb_wq);
+	trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("DeferredWakeEmpty"));
+}
+
 /* Initialize per-rcu_data variables for no-CBs CPUs. */
 static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
 {
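
The functions above implement the deferred-wakeup half of the new nocb scheme: when the enqueue path runs with interrupts disabled it only sets rdp->nocb_defer_wakeup, and do_nocb_deferred_wakeup() later issues the wake_up() from a context where that is safe (idle entry and __rcu_process_callbacks() in this series). A generic sketch of the set-flag-now, wake-later pattern, with hypothetical names:

#include <linux/compiler.h>
#include <linux/irqflags.h>
#include <linux/wait.h>

/*
 * Hypothetical worker state mirroring rdp->nocb_wq/->nocb_defer_wakeup;
 * assume ->wq has been set up with init_waitqueue_head().
 */
struct worker_state {
	wait_queue_head_t wq;
	bool defer_wakeup;
};

static void enqueue_item(struct worker_state *ws, unsigned long flags)
{
	/* ... add the item to the worker's queue ... */
	if (!irqs_disabled_flags(flags))
		wake_up(&ws->wq);			/* safe to wake right away */
	else
		ACCESS_ONCE(ws->defer_wakeup) = true;	/* remember to wake later */
}

/* Called from a context where waking the kthread is known to be safe. */
static void drain_deferred_wakeup(struct worker_state *ws)
{
	if (!ACCESS_ONCE(ws->defer_wakeup))
		return;
	ACCESS_ONCE(ws->defer_wakeup) = false;
	wake_up(&ws->wq);
}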
@@ -2378,13 +2411,14 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
 }
 
 static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
-			    bool lazy)
+			    bool lazy, unsigned long flags)
 {
 	return 0;
 }
 
 static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
-						     struct rcu_data *rdp)
+						     struct rcu_data *rdp,
+						     unsigned long flags)
 {
 	return 0;
 }
@@ -2393,6 +2427,15 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
 {
 }
 
+static bool rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
+{
+	return false;
+}
+
+static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+}
+
 static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
 {
 }
@@ -2842,3 +2885,23 @@ static void rcu_sysidle_init_percpu_data(struct rcu_dynticks *rdtp)
 }
 
 #endif /* #else #ifdef CONFIG_NO_HZ_FULL_SYSIDLE */
+
+/*
+ * Is this CPU a NO_HZ_FULL CPU that should ignore RCU so that the
+ * grace-period kthread will do force_quiescent_state() processing?
+ * The idea is to avoid waking up RCU core processing on such a
+ * CPU unless the grace period has extended for too long.
+ *
+ * This code relies on the fact that all NO_HZ_FULL CPUs are also
+ * CONFIG_RCU_NOCB_CPUs.
+ */
+static bool rcu_nohz_full_cpu(struct rcu_state *rsp)
+{
+#ifdef CONFIG_NO_HZ_FULL
+	if (tick_nohz_full_cpu(smp_processor_id()) &&
+	    (!rcu_gp_in_progress(rsp) ||
+	     ULONG_CMP_LT(jiffies, ACCESS_ONCE(rsp->gp_start) + HZ)))
+		return 1;
+#endif /* #ifdef CONFIG_NO_HZ_FULL */
+	return 0;
+}
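
Both the resched_cpu() pacing earlier in this diff and rcu_nohz_full_cpu() above compare jiffies-based deadlines with ULONG_CMP_GE()/ULONG_CMP_LT() rather than plain relational operators, so the tests stay correct across counter wraparound. A user-space sketch of the idiom; the macro bodies below follow their kernel definitions (include/linux/rcupdate.h):

#include <limits.h>
#include <stdio.h>

/* Wraparound-safe comparisons on an unsigned free-running counter. */
#define ULONG_CMP_GE(a, b)	(ULONG_MAX / 2 >= (a) - (b))
#define ULONG_CMP_LT(a, b)	(ULONG_MAX / 2 < (a) - (b))

int main(void)
{
	unsigned long gp_start = ULONG_MAX - 10;	/* counter about to wrap */
	unsigned long now = 5;				/* 16 ticks later, wrapped */

	/* A plain "now >= gp_start + 10" would evaluate 5 >= ULONG_MAX as false
	 * and miss the expired deadline; the unsigned-difference form does not. */
	printf("%d\n", ULONG_CMP_GE(now, gp_start + 10));	/* 1: deadline passed */
	printf("%d\n", ULONG_CMP_LT(now, gp_start + 250));	/* 1: still inside the window */
	return 0;
}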
diff --git a/kernel/rcu/tree_trace.c b/kernel/rcu/tree_trace.c
index 3596797b7e46..4def475336d4 100644
--- a/kernel/rcu/tree_trace.c
+++ b/kernel/rcu/tree_trace.c
@@ -364,9 +364,10 @@ static void print_one_rcu_pending(struct seq_file *m, struct rcu_data *rdp)
 		   rdp->n_rp_report_qs,
 		   rdp->n_rp_cb_ready,
 		   rdp->n_rp_cpu_needs_gp);
-	seq_printf(m, "gpc=%ld gps=%ld nn=%ld\n",
+	seq_printf(m, "gpc=%ld gps=%ld nn=%ld ndw=%ld\n",
 		   rdp->n_rp_gp_completed,
 		   rdp->n_rp_gp_started,
+		   rdp->n_rp_nocb_defer_wakeup,
 		   rdp->n_rp_need_nothing);
 }
 
diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
index 6cb3dff89e2b..802365ccd591 100644
--- a/kernel/rcu/update.c
+++ b/kernel/rcu/update.c
@@ -128,6 +128,11 @@ struct lockdep_map rcu_sched_lock_map =
 	STATIC_LOCKDEP_MAP_INIT("rcu_read_lock_sched", &rcu_sched_lock_key);
 EXPORT_SYMBOL_GPL(rcu_sched_lock_map);
 
+static struct lock_class_key rcu_callback_key;
+struct lockdep_map rcu_callback_map =
+	STATIC_LOCKDEP_MAP_INIT("rcu_callback", &rcu_callback_key);
+EXPORT_SYMBOL_GPL(rcu_callback_map);
+
 int notrace debug_lockdep_rcu_enabled(void)
 {
 	return rcu_scheduler_active && debug_locks &&
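
rcu_callback_map defined above is the lockdep map that __rcu_reclaim() (first hunk of this diff) holds across every callback invocation, which is what backs the "better diagnostics for blocking in RCU callback functions" item in the shortlog. A hedged sketch of the kind of bug it is meant to flag, with a hypothetical mutex and callback:

#include <linux/kernel.h>
#include <linux/mutex.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

static DEFINE_MUTEX(hypothetical_lock);

struct qux {				/* hypothetical RCU-freed object */
	struct rcu_head rh;
};

/*
 * Buggy: callbacks run in softirq or rcuo-kthread context and must not block.
 * With rcu_callback_map held around the invocation, lockdep-based checks can
 * point at the offending callback rather than an unrelated splat.
 */
static void qux_reclaim(struct rcu_head *head)
{
	mutex_lock(&hypothetical_lock);	/* may sleep: diagnostics should fire */
	kfree(container_of(head, struct qux, rh));
	mutex_unlock(&hypothetical_lock);
}

static void qux_free(struct qux *q)
{
	call_rcu(&q->rh, qux_reclaim);	/* qux_reclaim() runs under rcu_callback_map */
}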