author     Linus Torvalds <torvalds@linux-foundation.org>  2021-07-04 12:58:33 -0700
committer  Linus Torvalds <torvalds@linux-foundation.org>  2021-07-04 12:58:33 -0700
commit     28e92f990337b8b4c5fdec47667f8b96089c503e (patch)
tree       0dc55280883e7262d831ad24deb3a4fd2a56031a /kernel
parent     da803f82faa5ceeff34aa56c08ceba5384e44e47 (diff)
parent     641faf1b9064c270a476a424e60063bb05df3ee9 (diff)
Merge branch 'core-rcu-2021.07.04' of git://git.kernel.org/pub/scm/linux/kernel/git/paulmck/linux-rcu
Pull RCU updates from Paul McKenney:

 - Bitmap parsing support for "all" as an alias for all bits

 - Documentation updates

 - Miscellaneous fixes, including some that overlap into mm and lockdep

 - kvfree_rcu() updates

 - mem_dump_obj() updates, with acks from one of the slab-allocator
   maintainers

 - RCU NOCB CPU updates, including limited deoffloading

 - SRCU updates

 - Tasks-RCU updates

 - Torture-test updates

* 'core-rcu-2021.07.04' of git://git.kernel.org/pub/scm/linux/kernel/git/paulmck/linux-rcu: (78 commits)
  tasks-rcu: Make show_rcu_tasks_gp_kthreads() be static inline
  rcu-tasks: Make ksoftirqd provide RCU Tasks quiescent states
  rcu: Add missing __releases() annotation
  rcu: Remove obsolete rcu_read_unlock() deadlock commentary
  rcu: Improve comments describing RCU read-side critical sections
  rcu: Create an unrcu_pointer() to remove __rcu from a pointer
  srcu: Early test SRCU polling start
  rcu: Fix various typos in comments
  rcu/nocb: Unify timers
  rcu/nocb: Prepare for fine-grained deferred wakeup
  rcu/nocb: Only cancel nocb timer if not polling
  rcu/nocb: Delete bypass_timer upon nocb_gp wakeup
  rcu/nocb: Cancel nocb_timer upon nocb_gp wakeup
  rcu/nocb: Allow de-offloading rdp leader
  rcu/nocb: Directly call __wake_nocb_gp() from bypass timer
  rcu: Don't penalize priority boosting when there is nothing to boost
  rcu: Point to documentation of ordering guarantees
  rcu: Make rcu_gp_cleanup() be noinline for tracing
  rcu: Restrict RCU_STRICT_GRACE_PERIOD to at most four CPUs
  rcu: Make show_rcu_gp_kthreads() dump rcu_node structures blocking GP
  ...
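
For context on the kvfree_rcu() updates above, here is a minimal sketch of the double-argument kvfree_rcu() interface whose batching and page-cache behavior this series tunes (the structure and function names are illustrative only, not code from this merge):

	#include <linux/rcupdate.h>

	struct foo {				/* illustrative structure */
		int a;
		struct rcu_head rh;		/* needed by double-argument kvfree_rcu() */
	};

	static void foo_release(struct foo *fp)
	{
		/* Queue fp to be freed after a grace period without blocking;
		 * this is the path serviced by the batching and page-cache
		 * logic changed in kernel/rcu/tree.c below. */
		kvfree_rcu(fp, rh);
	}
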
Diffstat (limited to 'kernel')
-rw-r--r--  kernel/locking/lockdep.c     6
-rw-r--r--  kernel/rcu/Kconfig.debug     2
-rw-r--r--  kernel/rcu/rcu.h            14
-rw-r--r--  kernel/rcu/rcutorture.c    315
-rw-r--r--  kernel/rcu/refscale.c      109
-rw-r--r--  kernel/rcu/srcutree.c       28
-rw-r--r--  kernel/rcu/sync.c            4
-rw-r--r--  kernel/rcu/tasks.h          58
-rw-r--r--  kernel/rcu/tiny.c            1
-rw-r--r--  kernel/rcu/tree.c          313
-rw-r--r--  kernel/rcu/tree.h           14
-rw-r--r--  kernel/rcu/tree_plugin.h   239
-rw-r--r--  kernel/rcu/tree_stall.h     84
-rw-r--r--  kernel/rcu/update.c          8
-rw-r--r--  kernel/time/timer.c         14
15 files changed, 734 insertions(+), 475 deletions(-)
diff --git a/kernel/locking/lockdep.c b/kernel/locking/lockdep.c
index e97d08001437..bf1c00c881e4 100644
--- a/kernel/locking/lockdep.c
+++ b/kernel/locking/lockdep.c
@@ -6506,6 +6506,7 @@ asmlinkage __visible void lockdep_sys_exit(void)
 void lockdep_rcu_suspicious(const char *file, const int line, const char *s)
 {
 	struct task_struct *curr = current;
+	int dl = READ_ONCE(debug_locks);
 
 	/* Note: the following can be executed concurrently, so be careful. */
 	pr_warn("\n");
@@ -6515,11 +6516,12 @@ void lockdep_rcu_suspicious(const char *file, const int line, const char *s)
 	pr_warn("-----------------------------\n");
 	pr_warn("%s:%d %s!\n", file, line, s);
 	pr_warn("\nother info that might help us debug this:\n\n");
-	pr_warn("\n%srcu_scheduler_active = %d, debug_locks = %d\n",
+	pr_warn("\n%srcu_scheduler_active = %d, debug_locks = %d\n%s",
 	       !rcu_lockdep_current_cpu_online()
 			? "RCU used illegally from offline CPU!\n"
 			: "",
-	       rcu_scheduler_active, debug_locks);
+	       rcu_scheduler_active, dl,
+	       dl ? "" : "Possible false positive due to lockdep disabling via debug_locks = 0\n");
 
 	/*
 	 * If a CPU is in the RCU-free window in idle (ie: in the section
diff --git a/kernel/rcu/Kconfig.debug b/kernel/rcu/Kconfig.debug
index 1942c1f1bb65..4fd64999300f 100644
--- a/kernel/rcu/Kconfig.debug
+++ b/kernel/rcu/Kconfig.debug
@@ -116,7 +116,7 @@ config RCU_EQS_DEBUG
 
 config RCU_STRICT_GRACE_PERIOD
 	bool "Provide debug RCU implementation with short grace periods"
-	depends on DEBUG_KERNEL && RCU_EXPERT
+	depends on DEBUG_KERNEL && RCU_EXPERT && NR_CPUS <= 4
 	default n
 	select PREEMPT_COUNT if PREEMPT=n
 	help
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index bf0827d4b659..24b5f2c2de87 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -308,6 +308,8 @@ static inline void rcu_init_levelspread(int *levelspread, const int *levelcnt)
 	}
 }
 
+extern void rcu_init_geometry(void);
+
 /* Returns a pointer to the first leaf rcu_node structure. */
 #define rcu_first_leaf_node() (rcu_state.level[rcu_num_lvls - 1])
 
@@ -422,12 +424,6 @@ do {									\
 
 #endif /* #if defined(CONFIG_SRCU) || !defined(CONFIG_TINY_RCU) */
 
-#ifdef CONFIG_SRCU
-void srcu_init(void);
-#else /* #ifdef CONFIG_SRCU */
-static inline void srcu_init(void) { }
-#endif /* #else #ifdef CONFIG_SRCU */
-
 #ifdef CONFIG_TINY_RCU
 /* Tiny RCU doesn't expedite, as its purpose in life is instead to be tiny. */
 static inline bool rcu_gp_is_normal(void) { return true; }
@@ -441,7 +437,11 @@ bool rcu_gp_is_expedited(void);  /* Internal RCU use. */
 void rcu_expedite_gp(void);
 void rcu_unexpedite_gp(void);
 void rcupdate_announce_bootup_oddness(void);
+#ifdef CONFIG_TASKS_RCU_GENERIC
 void show_rcu_tasks_gp_kthreads(void);
+#else /* #ifdef CONFIG_TASKS_RCU_GENERIC */
+static inline void show_rcu_tasks_gp_kthreads(void) {}
+#endif /* #else #ifdef CONFIG_TASKS_RCU_GENERIC */
 void rcu_request_urgent_qs_task(struct task_struct *t);
 #endif /* #else #ifdef CONFIG_TINY_RCU */
 
@@ -519,6 +519,7 @@ static inline unsigned long rcu_exp_batches_completed(void) { return 0; }
 static inline unsigned long
 srcu_batches_completed(struct srcu_struct *sp) { return 0; }
 static inline void rcu_force_quiescent_state(void) { }
+static inline bool rcu_check_boost_fail(unsigned long gp_state, int *cpup) { return true; }
 static inline void show_rcu_gp_kthreads(void) { }
 static inline int rcu_get_gp_kthreads_prio(void) { return 0; }
 static inline void rcu_fwd_progress_check(unsigned long j) { }
@@ -527,6 +528,7 @@ bool rcu_dynticks_zero_in_eqs(int cpu, int *vp);
 unsigned long rcu_get_gp_seq(void);
 unsigned long rcu_exp_batches_completed(void);
 unsigned long srcu_batches_completed(struct srcu_struct *sp);
+bool rcu_check_boost_fail(unsigned long gp_state, int *cpup);
 void show_rcu_gp_kthreads(void);
 int rcu_get_gp_kthreads_prio(void);
 void rcu_fwd_progress_check(unsigned long j);
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index 194b9c145c40..40ef5417d954 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -245,12 +245,6 @@ static const char *rcu_torture_writer_state_getname(void)
 	return rcu_torture_writer_state_names[i];
 }
 
-#if defined(CONFIG_RCU_BOOST) && defined(CONFIG_PREEMPT_RT)
-# define rcu_can_boost() 1
-#else
-# define rcu_can_boost() 0
-#endif
-
 #ifdef CONFIG_RCU_TRACE
 static u64 notrace rcu_trace_clock_local(void)
 {
@@ -331,6 +325,7 @@ struct rcu_torture_ops {
 	void (*read_delay)(struct torture_random_state *rrsp,
 			   struct rt_read_seg *rtrsp);
 	void (*readunlock)(int idx);
+	int (*readlock_held)(void);
 	unsigned long (*get_gp_seq)(void);
 	unsigned long (*gp_diff)(unsigned long new, unsigned long old);
 	void (*deferred_free)(struct rcu_torture *p);
@@ -345,6 +340,7 @@ struct rcu_torture_ops {
 	void (*fqs)(void);
 	void (*stats)(void);
 	void (*gp_kthread_dbg)(void);
+	bool (*check_boost_failed)(unsigned long gp_state, int *cpup);
 	int (*stall_dur)(void);
 	int irq_capable;
 	int can_boost;
@@ -359,6 +355,11 @@ static struct rcu_torture_ops *cur_ops;
  * Definitions for rcu torture testing.
  */
 
+static int torture_readlock_not_held(void)
+{
+	return rcu_read_lock_bh_held() || rcu_read_lock_sched_held();
+}
+
 static int rcu_torture_read_lock(void) __acquires(RCU)
 {
 	rcu_read_lock();
@@ -483,30 +484,32 @@ static void rcu_sync_torture_init(void)
 }
 
 static struct rcu_torture_ops rcu_ops = {
-	.ttype		= RCU_FLAVOR,
-	.init		= rcu_sync_torture_init,
-	.readlock	= rcu_torture_read_lock,
-	.read_delay	= rcu_read_delay,
-	.readunlock	= rcu_torture_read_unlock,
-	.get_gp_seq	= rcu_get_gp_seq,
-	.gp_diff	= rcu_seq_diff,
-	.deferred_free	= rcu_torture_deferred_free,
-	.sync		= synchronize_rcu,
-	.exp_sync	= synchronize_rcu_expedited,
-	.get_gp_state	= get_state_synchronize_rcu,
-	.start_gp_poll	= start_poll_synchronize_rcu,
-	.poll_gp_state	= poll_state_synchronize_rcu,
-	.cond_sync	= cond_synchronize_rcu,
-	.call		= call_rcu,
-	.cb_barrier	= rcu_barrier,
-	.fqs		= rcu_force_quiescent_state,
-	.stats		= NULL,
-	.gp_kthread_dbg	= show_rcu_gp_kthreads,
-	.stall_dur	= rcu_jiffies_till_stall_check,
-	.irq_capable	= 1,
-	.can_boost	= rcu_can_boost(),
-	.extendables	= RCUTORTURE_MAX_EXTEND,
-	.name		= "rcu"
+	.ttype			= RCU_FLAVOR,
+	.init			= rcu_sync_torture_init,
+	.readlock		= rcu_torture_read_lock,
+	.read_delay		= rcu_read_delay,
+	.readunlock		= rcu_torture_read_unlock,
+	.readlock_held		= torture_readlock_not_held,
+	.get_gp_seq		= rcu_get_gp_seq,
+	.gp_diff		= rcu_seq_diff,
+	.deferred_free		= rcu_torture_deferred_free,
+	.sync			= synchronize_rcu,
+	.exp_sync		= synchronize_rcu_expedited,
+	.get_gp_state		= get_state_synchronize_rcu,
+	.start_gp_poll		= start_poll_synchronize_rcu,
+	.poll_gp_state		= poll_state_synchronize_rcu,
+	.cond_sync		= cond_synchronize_rcu,
+	.call			= call_rcu,
+	.cb_barrier		= rcu_barrier,
+	.fqs			= rcu_force_quiescent_state,
+	.stats			= NULL,
+	.gp_kthread_dbg		= show_rcu_gp_kthreads,
+	.check_boost_failed	= rcu_check_boost_fail,
+	.stall_dur		= rcu_jiffies_till_stall_check,
+	.irq_capable		= 1,
+	.can_boost		= IS_ENABLED(CONFIG_RCU_BOOST),
+	.extendables		= RCUTORTURE_MAX_EXTEND,
+	.name			= "rcu"
 };
 
 /*
@@ -540,6 +543,7 @@ static struct rcu_torture_ops rcu_busted_ops = {
 	.readlock	= rcu_torture_read_lock,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= rcu_torture_read_unlock,
+	.readlock_held	= torture_readlock_not_held,
 	.get_gp_seq	= rcu_no_completed,
 	.deferred_free	= rcu_busted_torture_deferred_free,
 	.sync		= synchronize_rcu_busted,
@@ -589,6 +593,11 @@ static void srcu_torture_read_unlock(int idx) __releases(srcu_ctlp)
 	srcu_read_unlock(srcu_ctlp, idx);
 }
 
+static int torture_srcu_read_lock_held(void)
+{
+	return srcu_read_lock_held(srcu_ctlp);
+}
+
 static unsigned long srcu_torture_completed(void)
 {
 	return srcu_batches_completed(srcu_ctlp);
@@ -646,6 +655,7 @@ static struct rcu_torture_ops srcu_ops = {
 	.readlock	= srcu_torture_read_lock,
 	.read_delay	= srcu_read_delay,
 	.readunlock	= srcu_torture_read_unlock,
+	.readlock_held	= torture_srcu_read_lock_held,
 	.get_gp_seq	= srcu_torture_completed,
 	.deferred_free	= srcu_torture_deferred_free,
 	.sync		= srcu_torture_synchronize,
@@ -681,6 +691,7 @@ static struct rcu_torture_ops srcud_ops = {
 	.readlock	= srcu_torture_read_lock,
 	.read_delay	= srcu_read_delay,
 	.readunlock	= srcu_torture_read_unlock,
+	.readlock_held	= torture_srcu_read_lock_held,
 	.get_gp_seq	= srcu_torture_completed,
 	.deferred_free	= srcu_torture_deferred_free,
 	.sync		= srcu_torture_synchronize,
@@ -700,6 +711,7 @@ static struct rcu_torture_ops busted_srcud_ops = {
 	.readlock	= srcu_torture_read_lock,
 	.read_delay	= rcu_read_delay,
 	.readunlock	= srcu_torture_read_unlock,
+	.readlock_held	= torture_srcu_read_lock_held,
 	.get_gp_seq	= srcu_torture_completed,
 	.deferred_free	= srcu_torture_deferred_free,
 	.sync		= srcu_torture_synchronize,
@@ -787,6 +799,7 @@ static struct rcu_torture_ops trivial_ops = {
 	.readlock	= rcu_torture_read_lock_trivial,
 	.read_delay	= rcu_read_delay,  /* just reuse rcu's version. */
 	.readunlock	= rcu_torture_read_unlock_trivial,
+	.readlock_held	= torture_readlock_not_held,
 	.get_gp_seq	= rcu_no_completed,
 	.sync		= synchronize_rcu_trivial,
 	.exp_sync	= synchronize_rcu_trivial,
@@ -850,6 +863,7 @@ static struct rcu_torture_ops tasks_tracing_ops = {
 	.readlock	= tasks_tracing_torture_read_lock,
 	.read_delay	= srcu_read_delay,  /* just reuse srcu's version. */
 	.readunlock	= tasks_tracing_torture_read_unlock,
+	.readlock_held	= rcu_read_lock_trace_held,
 	.get_gp_seq	= rcu_no_completed,
 	.deferred_free	= rcu_tasks_tracing_torture_deferred_free,
 	.sync		= synchronize_rcu_tasks_trace,
@@ -871,32 +885,13 @@ static unsigned long rcutorture_seq_diff(unsigned long new, unsigned long old)
 	return cur_ops->gp_diff(new, old);
 }
 
-static bool __maybe_unused torturing_tasks(void)
-{
-	return cur_ops == &tasks_ops || cur_ops == &tasks_rude_ops;
-}
-
 /*
  * RCU torture priority-boost testing.  Runs one real-time thread per
- * CPU for moderate bursts, repeatedly registering RCU callbacks and
- * spinning waiting for them to be invoked.  If a given callback takes
- * too long to be invoked, we assume that priority inversion has occurred.
+ * CPU for moderate bursts, repeatedly starting grace periods and waiting
+ * for them to complete.  If a given grace period takes too long, we assume
+ * that priority inversion has occurred.
  */
 
-struct rcu_boost_inflight {
-	struct rcu_head rcu;
-	int inflight;
-};
-
-static void rcu_torture_boost_cb(struct rcu_head *head)
-{
-	struct rcu_boost_inflight *rbip =
-		container_of(head, struct rcu_boost_inflight, rcu);
-
-	/* Ensure RCU-core accesses precede clearing ->inflight */
-	smp_store_release(&rbip->inflight, 0);
-}
-
 static int old_rt_runtime = -1;
 
 static void rcu_torture_disable_rt_throttle(void)
@@ -923,49 +918,68 @@ static void rcu_torture_enable_rt_throttle(void)
 	old_rt_runtime = -1;
 }
 
-static bool rcu_torture_boost_failed(unsigned long start, unsigned long end)
+static bool rcu_torture_boost_failed(unsigned long gp_state, unsigned long *start)
 {
+	int cpu;
 	static int dbg_done;
-
-	if (end - start > test_boost_duration * HZ - HZ / 2) {
+	unsigned long end = jiffies;
+	bool gp_done;
+	unsigned long j;
+	static unsigned long last_persist;
+	unsigned long lp;
+	unsigned long mininterval = test_boost_duration * HZ - HZ / 2;
+
+	if (end - *start > mininterval) {
+		// Recheck after checking time to avoid false positives.
+		smp_mb(); // Time check before grace-period check.
+		if (cur_ops->poll_gp_state(gp_state))
+			return false; // passed, though perhaps just barely
+		if (cur_ops->check_boost_failed && !cur_ops->check_boost_failed(gp_state, &cpu)) {
+			// At most one persisted message per boost test.
+			j = jiffies;
+			lp = READ_ONCE(last_persist);
+			if (time_after(j, lp + mininterval) && cmpxchg(&last_persist, lp, j) == lp)
+				pr_info("Boost inversion persisted: No QS from CPU %d\n", cpu);
+			return false; // passed on a technicality
+		}
 		VERBOSE_TOROUT_STRING("rcu_torture_boost boosting failed");
 		n_rcu_torture_boost_failure++;
-		if (!xchg(&dbg_done, 1) && cur_ops->gp_kthread_dbg)
+		if (!xchg(&dbg_done, 1) && cur_ops->gp_kthread_dbg) {
+			pr_info("Boost inversion thread ->rt_priority %u gp_state %lu jiffies %lu\n",
+				current->rt_priority, gp_state, end - *start);
 			cur_ops->gp_kthread_dbg();
+			// Recheck after print to flag grace period ending during splat.
+			gp_done = cur_ops->poll_gp_state(gp_state);
+			pr_info("Boost inversion: GP %lu %s.\n", gp_state,
+				gp_done ? "ended already" : "still pending");
 
-		return true; /* failed */
+		}
+
+		return true; // failed
+	} else if (cur_ops->check_boost_failed && !cur_ops->check_boost_failed(gp_state, NULL)) {
+		*start = jiffies;
 	}
 
-	return false; /* passed */
+	return false; // passed
 }
 
 static int rcu_torture_boost(void *arg)
 {
-	unsigned long call_rcu_time;
 	unsigned long endtime;
+	unsigned long gp_state;
+	unsigned long gp_state_time;
 	unsigned long oldstarttime;
-	struct rcu_boost_inflight rbi = { .inflight = 0 };
 
 	VERBOSE_TOROUT_STRING("rcu_torture_boost started");
 
 	/* Set real-time priority. */
 	sched_set_fifo_low(current);
 
-	init_rcu_head_on_stack(&rbi.rcu);
 	/* Each pass through the following loop does one boost-test cycle. */
 	do {
 		bool failed = false; // Test failed already in this test interval
-		bool firsttime = true;
+		bool gp_initiated = false;
 
-		/* Increment n_rcu_torture_boosts once per boost-test */
-		while (!kthread_should_stop()) {
-			if (mutex_trylock(&boost_mutex)) {
-				n_rcu_torture_boosts++;
-				mutex_unlock(&boost_mutex);
-				break;
-			}
-			schedule_timeout_uninterruptible(1);
-		}
 		if (kthread_should_stop())
 			goto checkwait;
 
@@ -979,33 +993,33 @@ static int rcu_torture_boost(void *arg)
 				goto checkwait;
 		}
 
-		/* Do one boost-test interval. */
+		// Do one boost-test interval.
 		endtime = oldstarttime + test_boost_duration * HZ;
 		while (time_before(jiffies, endtime)) {
-			/* If we don't have a callback in flight, post one. */
-			if (!smp_load_acquire(&rbi.inflight)) {
-				/* RCU core before ->inflight = 1. */
-				smp_store_release(&rbi.inflight, 1);
-				cur_ops->call(&rbi.rcu, rcu_torture_boost_cb);
-				/* Check if the boost test failed */
-				if (!firsttime && !failed)
-					failed = rcu_torture_boost_failed(call_rcu_time, jiffies);
-				call_rcu_time = jiffies;
-				firsttime = false;
+			// Has current GP gone too long?
+			if (gp_initiated && !failed && !cur_ops->poll_gp_state(gp_state))
+				failed = rcu_torture_boost_failed(gp_state, &gp_state_time);
+			// If we don't have a grace period in flight, start one.
+			if (!gp_initiated || cur_ops->poll_gp_state(gp_state)) {
+				gp_state = cur_ops->start_gp_poll();
+				gp_initiated = true;
+				gp_state_time = jiffies;
 			}
-			if (stutter_wait("rcu_torture_boost"))
+			if (stutter_wait("rcu_torture_boost")) {
 				sched_set_fifo_low(current);
+				// If the grace period already ended,
+				// we don't know when that happened, so
+				// start over.
+				if (cur_ops->poll_gp_state(gp_state))
+					gp_initiated = false;
+			}
 			if (torture_must_stop())
 				goto checkwait;
 		}
 
-		/*
-		 * If boost never happened, then inflight will always be 1, in
-		 * this case the boost check would never happen in the above
-		 * loop so do another one here.
-		 */
-		if (!firsttime && !failed && smp_load_acquire(&rbi.inflight))
-			rcu_torture_boost_failed(call_rcu_time, jiffies);
+		// In case the grace period extended beyond the end of the loop.
+		if (gp_initiated && !failed && !cur_ops->poll_gp_state(gp_state))
+			rcu_torture_boost_failed(gp_state, &gp_state_time);
 
 		/*
 		 * Set the start time of the next test interval.
@@ -1014,11 +1028,12 @@ static int rcu_torture_boost(void *arg)
 		 * interval.  Besides, we are running at RT priority,
 		 * so delays should be relatively rare.
 		 */
-		while (oldstarttime == boost_starttime &&
-		       !kthread_should_stop()) {
+		while (oldstarttime == boost_starttime && !kthread_should_stop()) {
 			if (mutex_trylock(&boost_mutex)) {
-				boost_starttime = jiffies +
-						  test_boost_interval * HZ;
+				if (oldstarttime == boost_starttime) {
+					boost_starttime = jiffies + test_boost_interval * HZ;
+					n_rcu_torture_boosts++;
+				}
 				mutex_unlock(&boost_mutex);
 				break;
 			}
@@ -1030,15 +1045,11 @@ checkwait:	if (stutter_wait("rcu_torture_boost"))
 			sched_set_fifo_low(current);
 	} while (!torture_must_stop());
 
-	while (smp_load_acquire(&rbi.inflight))
-		schedule_timeout_uninterruptible(1); // rcu_barrier() deadlocks.
-
 	/* Clean up and exit. */
-	while (!kthread_should_stop() || smp_load_acquire(&rbi.inflight)) {
+	while (!kthread_should_stop()) {
 		torture_shutdown_absorb("rcu_torture_boost");
 		schedule_timeout_uninterruptible(1);
 	}
-	destroy_rcu_head_on_stack(&rbi.rcu);
 	torture_kthread_stopping("rcu_torture_boost");
 	return 0;
 }
@@ -1553,11 +1564,7 @@ static bool rcu_torture_one_read(struct torture_random_state *trsp, long myid)
 	started = cur_ops->get_gp_seq();
 	ts = rcu_trace_clock_local();
 	p = rcu_dereference_check(rcu_torture_current,
-				  rcu_read_lock_bh_held() ||
-				  rcu_read_lock_sched_held() ||
-				  srcu_read_lock_held(srcu_ctlp) ||
-				  rcu_read_lock_trace_held() ||
-				  torturing_tasks());
+				  !cur_ops->readlock_held || cur_ops->readlock_held());
 	if (p == NULL) {
 		/* Wait for rcu_torture_writer to get underway */
 		rcutorture_one_extend(&readstate, 0, trsp, rtrsp);
@@ -1861,48 +1868,49 @@ rcu_torture_stats(void *arg)
 		torture_shutdown_absorb("rcu_torture_stats");
 	} while (!torture_must_stop());
 	torture_kthread_stopping("rcu_torture_stats");
-
-	{
-		struct rcu_head *rhp;
-		struct kmem_cache *kcp;
-		static int z;
-
-		kcp = kmem_cache_create("rcuscale", 136, 8, SLAB_STORE_USER, NULL);
-		rhp = kmem_cache_alloc(kcp, GFP_KERNEL);
-		pr_alert("mem_dump_obj() slab test: rcu_torture_stats = %px, &rhp = %px, rhp = %px, &z = %px\n", stats_task, &rhp, rhp, &z);
-		pr_alert("mem_dump_obj(ZERO_SIZE_PTR):");
-		mem_dump_obj(ZERO_SIZE_PTR);
-		pr_alert("mem_dump_obj(NULL):");
-		mem_dump_obj(NULL);
-		pr_alert("mem_dump_obj(%px):", &rhp);
-		mem_dump_obj(&rhp);
-		pr_alert("mem_dump_obj(%px):", rhp);
-		mem_dump_obj(rhp);
-		pr_alert("mem_dump_obj(%px):", &rhp->func);
-		mem_dump_obj(&rhp->func);
-		pr_alert("mem_dump_obj(%px):", &z);
-		mem_dump_obj(&z);
-		kmem_cache_free(kcp, rhp);
-		kmem_cache_destroy(kcp);
-		rhp = kmalloc(sizeof(*rhp), GFP_KERNEL);
-		pr_alert("mem_dump_obj() kmalloc test: rcu_torture_stats = %px, &rhp = %px, rhp = %px\n", stats_task, &rhp, rhp);
-		pr_alert("mem_dump_obj(kmalloc %px):", rhp);
-		mem_dump_obj(rhp);
-		pr_alert("mem_dump_obj(kmalloc %px):", &rhp->func);
-		mem_dump_obj(&rhp->func);
-		kfree(rhp);
-		rhp = vmalloc(4096);
-		pr_alert("mem_dump_obj() vmalloc test: rcu_torture_stats = %px, &rhp = %px, rhp = %px\n", stats_task, &rhp, rhp);
-		pr_alert("mem_dump_obj(vmalloc %px):", rhp);
-		mem_dump_obj(rhp);
-		pr_alert("mem_dump_obj(vmalloc %px):", &rhp->func);
-		mem_dump_obj(&rhp->func);
-		vfree(rhp);
-	}
-
 	return 0;
 }
 
+/* Test mem_dump_obj() and friends.  */
+static void rcu_torture_mem_dump_obj(void)
+{
+	struct rcu_head *rhp;
+	struct kmem_cache *kcp;
+	static int z;
+
+	kcp = kmem_cache_create("rcuscale", 136, 8, SLAB_STORE_USER, NULL);
+	rhp = kmem_cache_alloc(kcp, GFP_KERNEL);
+	pr_alert("mem_dump_obj() slab test: rcu_torture_stats = %px, &rhp = %px, rhp = %px, &z = %px\n", stats_task, &rhp, rhp, &z);
+	pr_alert("mem_dump_obj(ZERO_SIZE_PTR):");
+	mem_dump_obj(ZERO_SIZE_PTR);
+	pr_alert("mem_dump_obj(NULL):");
+	mem_dump_obj(NULL);
+	pr_alert("mem_dump_obj(%px):", &rhp);
+	mem_dump_obj(&rhp);
+	pr_alert("mem_dump_obj(%px):", rhp);
+	mem_dump_obj(rhp);
+	pr_alert("mem_dump_obj(%px):", &rhp->func);
+	mem_dump_obj(&rhp->func);
+	pr_alert("mem_dump_obj(%px):", &z);
+	mem_dump_obj(&z);
+	kmem_cache_free(kcp, rhp);
+	kmem_cache_destroy(kcp);
+	rhp = kmalloc(sizeof(*rhp), GFP_KERNEL);
+	pr_alert("mem_dump_obj() kmalloc test: rcu_torture_stats = %px, &rhp = %px, rhp = %px\n", stats_task, &rhp, rhp);
+	pr_alert("mem_dump_obj(kmalloc %px):", rhp);
+	mem_dump_obj(rhp);
+	pr_alert("mem_dump_obj(kmalloc %px):", &rhp->func);
+	mem_dump_obj(&rhp->func);
+	kfree(rhp);
+	rhp = vmalloc(4096);
+	pr_alert("mem_dump_obj() vmalloc test: rcu_torture_stats = %px, &rhp = %px, rhp = %px\n", stats_task, &rhp, rhp);
+	pr_alert("mem_dump_obj(vmalloc %px):", rhp);
+	mem_dump_obj(rhp);
+	pr_alert("mem_dump_obj(vmalloc %px):", &rhp->func);
+	mem_dump_obj(&rhp->func);
+	vfree(rhp);
+}
+
 static void
 rcu_torture_print_module_parms(struct rcu_torture_ops *cur_ops, const char *tag)
 {
@@ -2634,7 +2642,7 @@ static bool rcu_torture_can_boost(void)
 
 	if (!(test_boost == 1 && cur_ops->can_boost) && test_boost != 2)
 		return false;
-	if (!cur_ops->call)
+	if (!cur_ops->start_gp_poll || !cur_ops->poll_gp_state)
 		return false;
 
 	prio = rcu_get_gp_kthreads_prio();
@@ -2642,7 +2650,7 @@ static bool rcu_torture_can_boost(void)
 		return false;
 
 	if (prio < 2) {
-		if (boost_warn_once  == 1)
+		if (boost_warn_once == 1)
 			return false;
 
 		pr_alert("%s: WARN: RCU kthread priority too low to test boosting.  Skipping RCU boost test. Try passing rcutree.kthread_prio > 1 on the kernel command line.\n", KBUILD_MODNAME);
@@ -2818,6 +2826,8 @@ rcu_torture_cleanup(void)
 	if (cur_ops->cleanup != NULL)
 		cur_ops->cleanup();
 
+	rcu_torture_mem_dump_obj();
+
 	rcu_torture_stats_print();  /* -After- the stats thread is stopped! */
 
 	if (err_segs_recorded) {
@@ -3120,6 +3130,21 @@ rcu_torture_init(void)
 		if (firsterr < 0)
 			goto unwind;
 		rcutor_hp = firsterr;
+
+		// Testing RCU priority boosting requires rcutorture do
+		// some serious abuse.  Counter this by running ksoftirqd
+		// at higher priority.
+		if (IS_BUILTIN(CONFIG_RCU_TORTURE_TEST)) {
+			for_each_online_cpu(cpu) {
+				struct sched_param sp;
+				struct task_struct *t;
+
+				t = per_cpu(ksoftirqd, cpu);
+				WARN_ON_ONCE(!t);
+				sp.sched_priority = 2;
+				sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
+			}
+		}
 	}
 	shutdown_jiffies = jiffies + shutdown_secs * HZ;
 	firsterr = torture_shutdown_init(shutdown_secs, rcu_torture_cleanup);
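
The reworked boost test above drives grace periods through RCU's polled interface rather than through call_rcu() callbacks. A simplified sketch of that interface, with hypothetical helper names (not the rcutorture code itself):

	#include <linux/rcupdate.h>

	static unsigned long sketch_start_gp(void)
	{
		/* Start a grace period (if one is needed) and return a cookie for it. */
		return start_poll_synchronize_rcu();
	}

	static void sketch_finish_gp(unsigned long cookie)
	{
		if (poll_state_synchronize_rcu(cookie))
			return;			/* grace period already ended */
		cond_synchronize_rcu(cookie);	/* otherwise wait for it */
	}
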
diff --git a/kernel/rcu/refscale.c b/kernel/rcu/refscale.c
index 02dd9767b559..313d4547cbc7 100644
--- a/kernel/rcu/refscale.c
+++ b/kernel/rcu/refscale.c
@@ -362,6 +362,111 @@ static struct ref_scale_ops rwsem_ops = {
 	.name		= "rwsem"
 };
 
+// Definitions for global spinlock
+static DEFINE_SPINLOCK(test_lock);
+
+static void ref_lock_section(const int nloops)
+{
+	int i;
+
+	preempt_disable();
+	for (i = nloops; i >= 0; i--) {
+		spin_lock(&test_lock);
+		spin_unlock(&test_lock);
+	}
+	preempt_enable();
+}
+
+static void ref_lock_delay_section(const int nloops, const int udl, const int ndl)
+{
+	int i;
+
+	preempt_disable();
+	for (i = nloops; i >= 0; i--) {
+		spin_lock(&test_lock);
+		un_delay(udl, ndl);
+		spin_unlock(&test_lock);
+	}
+	preempt_enable();
+}
+
+static struct ref_scale_ops lock_ops = {
+	.readsection	= ref_lock_section,
+	.delaysection	= ref_lock_delay_section,
+	.name		= "lock"
+};
+
+// Definitions for global irq-save spinlock
+
+static void ref_lock_irq_section(const int nloops)
+{
+	unsigned long flags;
+	int i;
+
+	preempt_disable();
+	for (i = nloops; i >= 0; i--) {
+		spin_lock_irqsave(&test_lock, flags);
+		spin_unlock_irqrestore(&test_lock, flags);
+	}
+	preempt_enable();
+}
+
+static void ref_lock_irq_delay_section(const int nloops, const int udl, const int ndl)
+{
+	unsigned long flags;
+	int i;
+
+	preempt_disable();
+	for (i = nloops; i >= 0; i--) {
+		spin_lock_irqsave(&test_lock, flags);
+		un_delay(udl, ndl);
+		spin_unlock_irqrestore(&test_lock, flags);
+	}
+	preempt_enable();
+}
+
+static struct ref_scale_ops lock_irq_ops = {
+	.readsection	= ref_lock_irq_section,
+	.delaysection	= ref_lock_irq_delay_section,
+	.name		= "lock-irq"
+};
+
+// Definitions acquire-release.
+static DEFINE_PER_CPU(unsigned long, test_acqrel);
+
+static void ref_acqrel_section(const int nloops)
+{
+	unsigned long x;
+	int i;
+
+	preempt_disable();
+	for (i = nloops; i >= 0; i--) {
+		x = smp_load_acquire(this_cpu_ptr(&test_acqrel));
+		smp_store_release(this_cpu_ptr(&test_acqrel), x + 1);
+	}
+	preempt_enable();
+}
+
+static void ref_acqrel_delay_section(const int nloops, const int udl, const int ndl)
+{
+	unsigned long x;
+	int i;
+
+	preempt_disable();
+	for (i = nloops; i >= 0; i--) {
+		x = smp_load_acquire(this_cpu_ptr(&test_acqrel));
+		un_delay(udl, ndl);
+		smp_store_release(this_cpu_ptr(&test_acqrel), x + 1);
+	}
+	preempt_enable();
+}
+
+static struct ref_scale_ops acqrel_ops = {
+	.readsection	= ref_acqrel_section,
+	.delaysection	= ref_acqrel_delay_section,
+	.name		= "acqrel"
+};
+
 static void rcu_scale_one_reader(void)
 {
 	if (readdelay <= 0)
@@ -653,8 +758,8 @@ ref_scale_init(void)
 	long i;
 	int firsterr = 0;
 	static struct ref_scale_ops *scale_ops[] = {
-		&rcu_ops, &srcu_ops, &rcu_trace_ops, &rcu_tasks_ops,
-		&refcnt_ops, &rwlock_ops, &rwsem_ops,
+		&rcu_ops, &srcu_ops, &rcu_trace_ops, &rcu_tasks_ops, &refcnt_ops, &rwlock_ops,
+		&rwsem_ops, &lock_ops, &lock_irq_ops, &acqrel_ops,
 	};
 
 	if (!torture_init_begin(scale_type, verbose))
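
Assuming the usual refscale conventions, the new readers should be selectable like the existing ones via the scale_type module parameter, for example "modprobe refscale scale_type=lock-irq" (or scale_type=lock, scale_type=acqrel), or refscale.scale_type=acqrel on the kernel command line when the test is built in.
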
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index e26547b34ad3..6833d8887181 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -80,7 +80,7 @@ do {									\
  * srcu_read_unlock() running against them.  So if the is_static parameter
  * is set, don't initialize ->srcu_lock_count[] and ->srcu_unlock_count[].
  */
-static void init_srcu_struct_nodes(struct srcu_struct *ssp, bool is_static)
+static void init_srcu_struct_nodes(struct srcu_struct *ssp)
 {
 	int cpu;
 	int i;
@@ -90,6 +90,9 @@ static void init_srcu_struct_nodes(struct srcu_struct *ssp, bool is_static)
 	struct srcu_node *snp;
 	struct srcu_node *snp_first;
 
+	/* Initialize geometry if it has not already been initialized. */
+	rcu_init_geometry();
+
 	/* Work out the overall tree geometry. */
 	ssp->level[0] = &ssp->node[0];
 	for (i = 1; i < rcu_num_lvls; i++)
@@ -148,14 +151,6 @@ static void init_srcu_struct_nodes(struct srcu_struct *ssp, bool is_static)
 		timer_setup(&sdp->delay_work, srcu_delay_timer, 0);
 		sdp->ssp = ssp;
 		sdp->grpmask = 1 << (cpu - sdp->mynode->grplo);
-		if (is_static)
-			continue;
-
-		/* Dynamically allocated, better be no srcu_read_locks()! */
-		for (i = 0; i < ARRAY_SIZE(sdp->srcu_lock_count); i++) {
-			sdp->srcu_lock_count[i] = 0;
-			sdp->srcu_unlock_count[i] = 0;
-		}
 	}
 }
 
@@ -179,7 +174,7 @@ static int init_srcu_struct_fields(struct srcu_struct *ssp, bool is_static)
 		ssp->sda = alloc_percpu(struct srcu_data);
 	if (!ssp->sda)
 		return -ENOMEM;
-	init_srcu_struct_nodes(ssp, is_static);
+	init_srcu_struct_nodes(ssp);
 	ssp->srcu_gp_seq_needed_exp = 0;
 	ssp->srcu_last_gp_end = ktime_get_mono_fast_ns();
 	smp_store_release(&ssp->srcu_gp_seq_needed, 0); /* Init done. */
@@ -777,9 +772,9 @@ static bool srcu_might_be_idle(struct srcu_struct *ssp)
 	spin_unlock_irqrestore_rcu_node(sdp, flags);
 
 	/*
-	 * No local callbacks, so probabalistically probe global state.
+	 * No local callbacks, so probabilistically probe global state.
 	 * Exact information would require acquiring locks, which would
-	 * kill scalability, hence the probabalistic nature of the probe.
+	 * kill scalability, hence the probabilistic nature of the probe.
 	 */
 
 	/* First, see if enough time has passed since the last GP. */
@@ -1000,6 +995,9 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
  * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are
  * passed the same srcu_struct structure.
  *
+ * Implementation of these memory-ordering guarantees is similar to
+ * that of synchronize_rcu().
+ *
  * If SRCU is likely idle, expedite the first request.  This semantic
  * was provided by Classic SRCU, and is relied upon by its users, so TREE
  * SRCU must also provide it.  Note that detecting idleness is heuristic
@@ -1392,11 +1390,15 @@ void __init srcu_init(void)
 {
 	struct srcu_struct *ssp;
 
+	/*
+	 * Once that is set, call_srcu() can follow the normal path and
+	 * queue delayed work. This must follow RCU workqueues creation
+	 * and timers initialization.
+	 */
 	srcu_init_done = true;
 	while (!list_empty(&srcu_boot_list)) {
 		ssp = list_first_entry(&srcu_boot_list, struct srcu_struct,
 				      work.work.entry);
-		check_init_srcu_struct(ssp);
 		list_del_init(&ssp->work.work.entry);
 		queue_work(rcu_gp_wq, &ssp->work.work);
 	}
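
For reference, the SRCU polling interface exercised at early boot by the "srcu: Early test SRCU polling start" change looks roughly like this from a caller's perspective (a sketch; example_srcu and the helpers are made-up names):

	#include <linux/srcu.h>

	DEFINE_STATIC_SRCU(example_srcu);	/* illustrative srcu_struct */

	static unsigned long example_start_gp(void)
	{
		/* Start an SRCU grace period and return a cookie for it. */
		return start_poll_synchronize_srcu(&example_srcu);
	}

	static bool example_gp_done(unsigned long cookie)
	{
		/* True once a full SRCU grace period has elapsed since the
		 * cookie was obtained; never blocks. */
		return poll_state_synchronize_srcu(&example_srcu, cookie);
	}
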
diff --git a/kernel/rcu/sync.c b/kernel/rcu/sync.c
index d4558ab7a07d..33d896d85902 100644
--- a/kernel/rcu/sync.c
+++ b/kernel/rcu/sync.c
@@ -94,9 +94,9 @@ static void rcu_sync_func(struct rcu_head *rhp)
 		rcu_sync_call(rsp);
 	} else {
 		/*
-		 * We're at least a GP after the last rcu_sync_exit(); eveybody
+		 * We're at least a GP after the last rcu_sync_exit(); everybody
 		 * will now have observed the write side critical section.
-		 * Let 'em rip!.
+		 * Let 'em rip!
 		 */
 		WRITE_ONCE(rsp->gp_state, GP_IDLE);
 	}
diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
index 350ebf5051f9..03a118d1c003 100644
--- a/kernel/rcu/tasks.h
+++ b/kernel/rcu/tasks.h
@@ -23,7 +23,7 @@ typedef void (*postgp_func_t)(struct rcu_tasks *rtp);
  * struct rcu_tasks - Definition for a Tasks-RCU-like mechanism.
  * @cbs_head: Head of callback list.
  * @cbs_tail: Tail pointer for callback list.
- * @cbs_wq: Wait queue allowning new callback to get kthread's attention.
+ * @cbs_wq: Wait queue allowing new callback to get kthread's attention.
  * @cbs_lock: Lock protecting callback list.
  * @kthread_ptr: This flavor's grace-period/callback-invocation kthread.
  * @gp_func: This flavor's grace-period-wait function.
@@ -377,6 +377,46 @@ static void rcu_tasks_wait_gp(struct rcu_tasks *rtp)
 // Finally, this implementation does not support high call_rcu_tasks()
 // rates from multiple CPUs.  If this is required, per-CPU callback lists
 // will be needed.
+//
+// The implementation uses rcu_tasks_wait_gp(), which relies on function
+// pointers in the rcu_tasks structure.  The rcu_spawn_tasks_kthread()
+// function sets these function pointers up so that rcu_tasks_wait_gp()
+// invokes these functions in this order:
+//
+// rcu_tasks_pregp_step():
+//	Invokes synchronize_rcu() in order to wait for all in-flight
+//	t->on_rq and t->nvcsw transitions to complete.	This works because
+//	all such transitions are carried out with interrupts disabled.
+// rcu_tasks_pertask(), invoked on every non-idle task:
+//	For every runnable non-idle task other than the current one, use
+//	get_task_struct() to pin down that task, snapshot that task's
+//	number of voluntary context switches, and add that task to the
+//	holdout list.
+// rcu_tasks_postscan():
+//	Invoke synchronize_srcu() to ensure that all tasks that were
+//	in the process of exiting (and which thus might not know to
+//	synchronize with this RCU Tasks grace period) have completed
+//	exiting.
+// check_all_holdout_tasks(), repeatedly until holdout list is empty:
+//	Scans the holdout list, attempting to identify a quiescent state
+//	for each task on the list.  If there is a quiescent state, the
+//	corresponding task is removed from the holdout list.
+// rcu_tasks_postgp():
+//	Invokes synchronize_rcu() in order to ensure that all prior
+//	t->on_rq and t->nvcsw transitions are seen by all CPUs and tasks
+//	to have happened before the end of this RCU Tasks grace period.
+//	Again, this works because all such transitions are carried out
+//	with interrupts disabled.
+//
+// For each exiting task, the exit_tasks_rcu_start() and
+// exit_tasks_rcu_finish() functions begin and end, respectively, the SRCU
+// read-side critical sections waited for by rcu_tasks_postscan().
+//
+// Pre-grace-period update-side code is ordered before the grace via the
+// ->cbs_lock and the smp_mb__after_spinlock().  Pre-grace-period read-side
+// code is ordered before the grace period via synchronize_rcu() call
+// in rcu_tasks_pregp_step() and by the scheduler's locks and interrupt
+// disabling.
 
 /* Pre-grace-period preparation. */
 static void rcu_tasks_pregp_step(void)
@@ -504,7 +544,7 @@ DEFINE_RCU_TASKS(rcu_tasks, rcu_tasks_wait_gp, call_rcu_tasks, "RCU Tasks");
  * or transition to usermode execution.  As such, there are no read-side
  * primitives analogous to rcu_read_lock() and rcu_read_unlock() because
  * this primitive is intended to determine that all tasks have passed
- * through a safe state, not so much for data-strcuture synchronization.
+ * through a safe state, not so much for data-structure synchronization.
  *
  * See the description of call_rcu() for more detailed information on
  * memory ordering guarantees.
@@ -605,8 +645,13 @@ void exit_tasks_rcu_finish(void) { exit_tasks_rcu_finish_trace(current); }
 // passing an empty function to schedule_on_each_cpu().  This approach
 // provides an asynchronous call_rcu_tasks_rude() API and batching
 // of concurrent calls to the synchronous synchronize_rcu_rude() API.
-// This sends IPIs far and wide and induces otherwise unnecessary context
-// switches on all online CPUs, whether idle or not.
+// This invokes schedule_on_each_cpu() in order to send IPIs far and wide
+// and induces otherwise unnecessary context switches on all online CPUs,
+// whether idle or not.
+//
+// Callback handling is provided by the rcu_tasks_kthread() function.
+//
+// Ordering is provided by the scheduler's context-switch code.
 
 // Empty function to allow workqueues to force a context switch.
 static void rcu_tasks_be_rude(struct work_struct *work)
@@ -637,7 +682,7 @@ DEFINE_RCU_TASKS(rcu_tasks_rude, rcu_tasks_rude_wait_gp, call_rcu_tasks_rude,
  * there are no read-side primitives analogous to rcu_read_lock() and
  * rcu_read_unlock() because this primitive is intended to determine
  * that all tasks have passed through a safe state, not so much for
- * data-strcuture synchronization.
+ * data-structure synchronization.
  *
  * See the description of call_rcu() for more detailed information on
  * memory ordering guarantees.
@@ -1163,7 +1208,7 @@ static void exit_tasks_rcu_finish_trace(struct task_struct *t)
  * there are no read-side primitives analogous to rcu_read_lock() and
  * rcu_read_unlock() because this primitive is intended to determine
  * that all tasks have passed through a safe state, not so much for
- * data-strcuture synchronization.
+ * data-structure synchronization.
  *
  * See the description of call_rcu() for more detailed information on
  * memory ordering guarantees.
@@ -1356,5 +1401,4 @@ void __init rcu_init_tasks_generic(void)
 
 #else /* #ifdef CONFIG_TASKS_RCU_GENERIC */
 static inline void rcu_tasks_bootup_oddness(void) {}
-void show_rcu_tasks_gp_kthreads(void) {}
 #endif /* #else #ifdef CONFIG_TASKS_RCU_GENERIC */
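
The block comment added above describes the machinery behind a Tasks RCU grace period; from a caller's point of view the API boils down to call_rcu_tasks() and synchronize_rcu_tasks(). A sketch with made-up names (a trampoline-style use, not code from this series):

	#include <linux/kernel.h>
	#include <linux/rcupdate.h>
	#include <linux/slab.h>

	struct tramp {				/* illustrative structure */
		void *text;
		struct rcu_head rh;
	};

	static void tramp_free_cb(struct rcu_head *rhp)
	{
		kfree(container_of(rhp, struct tramp, rh));
	}

	static void tramp_retire(struct tramp *tp)
	{
		/* Once no new task can enter tp->text, wait (asynchronously)
		 * until every task has passed through a voluntary context
		 * switch or usermode execution, then free the structure. */
		call_rcu_tasks(&tp->rh, tramp_free_cb);
	}
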
diff --git a/kernel/rcu/tiny.c b/kernel/rcu/tiny.c
index c8a029fbb114..340b3f8b090d 100644
--- a/kernel/rcu/tiny.c
+++ b/kernel/rcu/tiny.c
@@ -221,5 +221,4 @@ void __init rcu_init(void)
 {
 	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
 	rcu_early_boot_tests();
-	srcu_init();
 }
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index f12056beb916..51f24ecd94b2 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -188,6 +188,17 @@ module_param(rcu_unlock_delay, int, 0444);
 static int rcu_min_cached_objs = 5;
 module_param(rcu_min_cached_objs, int, 0444);
 
+// A page shrinker can ask for pages to be freed to make them
+// available for other parts of the system. This usually happens
+// under low memory conditions, and in that case we should also
+// defer page-cache filling for a short time period.
+//
+// The default value is 5 seconds, which is long enough to reduce
+// interference with the shrinker while it asks other systems to
+// drain their caches.
+static int rcu_delay_page_cache_fill_msec = 5000;
+module_param(rcu_delay_page_cache_fill_msec, int, 0444);
+
 /* Retrieve RCU kthreads priority for rcutorture */
 int rcu_get_gp_kthreads_prio(void)
 {
@@ -204,7 +215,7 @@ EXPORT_SYMBOL_GPL(rcu_get_gp_kthreads_prio);
  * the need for long delays to increase some race probabilities with the
  * need for fast grace periods to increase other race probabilities.
  */
-#define PER_RCU_NODE_PERIOD 3	/* Number of grace periods between delays. */
+#define PER_RCU_NODE_PERIOD 3	/* Number of grace periods between delays for debugging. */
 
 /*
  * Compute the mask of online CPUs for the specified rcu_node structure.
@@ -244,6 +255,7 @@ void rcu_softirq_qs(void)
 {
 	rcu_qs();
 	rcu_preempt_deferred_qs(current);
+	rcu_tasks_qs(current, false);
 }
 
 /*
@@ -835,28 +847,6 @@ void noinstr rcu_irq_exit(void)
 	rcu_nmi_exit();
 }
 
-/**
- * rcu_irq_exit_preempt - Inform RCU that current CPU is exiting irq
- *			  towards in kernel preemption
- *
- * Same as rcu_irq_exit() but has a sanity check that scheduling is safe
- * from RCU point of view. Invoked from return from interrupt before kernel
- * preemption.
- */
-void rcu_irq_exit_preempt(void)
-{
-	lockdep_assert_irqs_disabled();
-	rcu_nmi_exit();
-
-	RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nesting) <= 0,
-			 "RCU dynticks_nesting counter underflow/zero!");
-	RCU_LOCKDEP_WARN(__this_cpu_read(rcu_data.dynticks_nmi_nesting) !=
-			 DYNTICK_IRQ_NONIDLE,
-			 "Bad RCU  dynticks_nmi_nesting counter\n");
-	RCU_LOCKDEP_WARN(rcu_dynticks_curr_cpu_in_eqs(),
-			 "RCU in extended quiescent state!");
-}
-
 #ifdef CONFIG_PROVE_RCU
 /**
  * rcu_irq_exit_check_preempt - Validate that scheduling is possible
@@ -961,7 +951,7 @@ EXPORT_SYMBOL_GPL(rcu_idle_exit);
  */
 void noinstr rcu_user_exit(void)
 {
-	rcu_eqs_exit(1);
+	rcu_eqs_exit(true);
 }
 
 /**
@@ -1227,7 +1217,7 @@ EXPORT_SYMBOL_GPL(rcu_lockdep_current_cpu_online);
 #endif /* #if defined(CONFIG_PROVE_RCU) && defined(CONFIG_HOTPLUG_CPU) */
 
 /*
- * We are reporting a quiescent state on behalf of some other CPU, so
+ * When trying to report a quiescent state on behalf of some other CPU,
  * it is our responsibility to check for and handle potential overflow
  * of the rcu_node ->gp_seq counter with respect to the rcu_data counters.
  * After all, the CPU might be in deep idle state, and thus executing no
@@ -2050,7 +2040,7 @@ static void rcu_gp_fqs_loop(void)
 /*
  * Clean up after the old grace period.
  */
-static void rcu_gp_cleanup(void)
+static noinline void rcu_gp_cleanup(void)
 {
 	int cpu;
 	bool needgp = false;
@@ -2491,7 +2481,7 @@ int rcutree_dead_cpu(unsigned int cpu)
 
 /*
  * Invoke any RCU callbacks that have made it to the end of their grace
- * period.  Thottle as specified by rdp->blimit.
+ * period.  Throttle as specified by rdp->blimit.
  */
 static void rcu_do_batch(struct rcu_data *rdp)
 {
@@ -2631,7 +2621,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
  * state, for example, user mode or idle loop.  It also schedules RCU
  * core processing.  If the current grace period has gone on too long,
  * it will ask the scheduler to manufacture a context switch for the sole
- * purpose of providing a providing the needed quiescent state.
+ * purpose of providing the needed quiescent state.
  */
 void rcu_sched_clock_irq(int user)
 {
@@ -2913,7 +2903,6 @@ static int __init rcu_spawn_core_kthreads(void)
 		  "%s: Could not start rcuc kthread, OOM is now expected behavior\n", __func__);
 	return 0;
 }
-early_initcall(rcu_spawn_core_kthreads);
 
 /*
  * Handle any core-RCU processing required by a call_rcu() invocation.
@@ -3084,12 +3073,14 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func)
  * period elapses, in other words after all pre-existing RCU read-side
  * critical sections have completed.  However, the callback function
  * might well execute concurrently with RCU read-side critical sections
- * that started after call_rcu() was invoked.  RCU read-side critical
- * sections are delimited by rcu_read_lock() and rcu_read_unlock(), and
- * may be nested.  In addition, regions of code across which interrupts,
- * preemption, or softirqs have been disabled also serve as RCU read-side
- * critical sections.  This includes hardware interrupt handlers, softirq
- * handlers, and NMI handlers.
+ * that started after call_rcu() was invoked.
+ *
+ * RCU read-side critical sections are delimited by rcu_read_lock()
+ * and rcu_read_unlock(), and may be nested.  In addition, but only in
+ * v5.0 and later, regions of code across which interrupts, preemption,
+ * or softirqs have been disabled also serve as RCU read-side critical
+ * sections.  This includes hardware interrupt handlers, softirq handlers,
+ * and NMI handlers.
  *
  * Note that all CPUs must agree that the grace period extended beyond
  * all pre-existing RCU read-side critical section.  On systems with more
@@ -3109,6 +3100,9 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func)
  * between the call to call_rcu() and the invocation of "func()" -- even
  * if CPU A and CPU B are the same CPU (but again only if the system has
  * more than one CPU).
+ *
+ * Implementation of these memory-ordering guarantees is described here:
+ * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
  */
 void call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
@@ -3173,6 +3167,7 @@ struct kfree_rcu_cpu_work {
  *	Even though it is lockless an access has to be protected by the
  *	per-cpu lock.
  * @page_cache_work: A work to refill the cache when it is empty
+ * @backoff_page_cache_fill: Delay cache refills
  * @work_in_progress: Indicates that page_cache_work is running
  * @hrtimer: A hrtimer for scheduling a page_cache_work
  * @nr_bkv_objs: number of allocated objects at @bkvcache.
@@ -3192,7 +3187,8 @@ struct kfree_rcu_cpu {
 	bool initialized;
 	int count;
 
-	struct work_struct page_cache_work;
+	struct delayed_work page_cache_work;
+	atomic_t backoff_page_cache_fill;
 	atomic_t work_in_progress;
 	struct hrtimer hrtimer;
 
@@ -3239,7 +3235,7 @@ get_cached_bnode(struct kfree_rcu_cpu *krcp)
 	if (!krcp->nr_bkv_objs)
 		return NULL;
 
-	krcp->nr_bkv_objs--;
+	WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs - 1);
 	return (struct kvfree_rcu_bulk_data *)
 		llist_del_first(&krcp->bkvcache);
 }
@@ -3253,14 +3249,33 @@ put_cached_bnode(struct kfree_rcu_cpu *krcp,
 		return false;
 
 	llist_add((struct llist_node *) bnode, &krcp->bkvcache);
-	krcp->nr_bkv_objs++;
+	WRITE_ONCE(krcp->nr_bkv_objs, krcp->nr_bkv_objs + 1);
 	return true;
+}
+
+static int
+drain_page_cache(struct kfree_rcu_cpu *krcp)
+{
+	unsigned long flags;
+	struct llist_node *page_list, *pos, *n;
+	int freed = 0;
 
+	raw_spin_lock_irqsave(&krcp->lock, flags);
+	page_list = llist_del_all(&krcp->bkvcache);
+	WRITE_ONCE(krcp->nr_bkv_objs, 0);
+	raw_spin_unlock_irqrestore(&krcp->lock, flags);
+
+	llist_for_each_safe(pos, n, page_list) {
+		free_page((unsigned long)pos);
+		freed++;
+	}
+
+	return freed;
 }
 
 /*
  * This function is invoked in workqueue context after a grace period.
- * It frees all the objects queued on ->bhead_free or ->head_free.
+ * It frees all the objects queued on ->bkvhead_free or ->head_free.
  */
 static void kfree_rcu_work(struct work_struct *work)
 {
@@ -3287,7 +3302,7 @@ static void kfree_rcu_work(struct work_struct *work)
 	krwp->head_free = NULL;
 	raw_spin_unlock_irqrestore(&krcp->lock, flags);
 
-	// Handle two first channels.
+	// Handle the first two channels.
 	for (i = 0; i < FREE_N_CHANNELS; i++) {
 		for (; bkvhead[i]; bkvhead[i] = bnext) {
 			bnext = bkvhead[i]->next;
@@ -3325,9 +3340,11 @@ static void kfree_rcu_work(struct work_struct *work)
 	}
 
 	/*
-	 * Emergency case only. It can happen under low memory
-	 * condition when an allocation gets failed, so the "bulk"
-	 * path can not be temporary maintained.
+	 * This is used when the "bulk" path can not be used for the
+	 * double-argument of kvfree_rcu().  This happens when the
+	 * page-cache is empty, which means that objects are instead
+	 * queued on a linked list through their rcu_head structures.
+	 * This list is named "Channel 3".
 	 */
 	for (; head; head = next) {
 		unsigned long offset = (unsigned long)head->func;
@@ -3347,34 +3364,31 @@ static void kfree_rcu_work(struct work_struct *work)
 }
 
 /*
- * Schedule the kfree batch RCU work to run in workqueue context after a GP.
- *
- * This function is invoked by kfree_rcu_monitor() when the KFREE_DRAIN_JIFFIES
- * timeout has been reached.
+ * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
  */
-static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
+static void kfree_rcu_monitor(struct work_struct *work)
 {
-	struct kfree_rcu_cpu_work *krwp;
-	bool repeat = false;
+	struct kfree_rcu_cpu *krcp = container_of(work,
+		struct kfree_rcu_cpu, monitor_work.work);
+	unsigned long flags;
 	int i, j;
 
-	lockdep_assert_held(&krcp->lock);
+	raw_spin_lock_irqsave(&krcp->lock, flags);
 
+	// Attempt to start a new batch.
 	for (i = 0; i < KFREE_N_BATCHES; i++) {
-		krwp = &(krcp->krw_arr[i]);
+		struct kfree_rcu_cpu_work *krwp = &(krcp->krw_arr[i]);
 
-		/*
-		 * Try to detach bkvhead or head and attach it over any
-		 * available corresponding free channel. It can be that
-		 * a previous RCU batch is in progress, it means that
-		 * immediately to queue another one is not possible so
-		 * return false to tell caller to retry.
-		 */
+		// Try to detach bkvhead or head and attach it over any
+		// available corresponding free channel. It can be that
+		// a previous RCU batch is in progress, it means that
+		// immediately to queue another one is not possible so
+		// in that case the monitor work is rearmed.
 		if ((krcp->bkvhead[0] && !krwp->bkvhead_free[0]) ||
 			(krcp->bkvhead[1] && !krwp->bkvhead_free[1]) ||
 				(krcp->head && !krwp->head_free)) {
-			// Channel 1 corresponds to SLAB ptrs.
-			// Channel 2 corresponds to vmalloc ptrs.
+			// Channel 1 corresponds to the SLAB-pointer bulk path.
+			// Channel 2 corresponds to vmalloc-pointer bulk path.
 			for (j = 0; j < FREE_N_CHANNELS; j++) {
 				if (!krwp->bkvhead_free[j]) {
 					krwp->bkvhead_free[j] = krcp->bkvhead[j];
@@ -3382,7 +3396,8 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
 				}
 			}
 
-			// Channel 3 corresponds to emergency path.
+			// Channel 3 corresponds to both SLAB and vmalloc
+			// objects queued on the linked list.
 			if (!krwp->head_free) {
 				krwp->head_free = krcp->head;
 				krcp->head = NULL;
@@ -3390,65 +3405,35 @@ static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
 
 			WRITE_ONCE(krcp->count, 0);
 
-			/*
-			 * One work is per one batch, so there are three
-			 * "free channels", the batch can handle. It can
-			 * be that the work is in the pending state when
-			 * channels have been detached following by each
-			 * other.
-			 */
+			// One work is per one batch, so there are three
+			// "free channels", the batch can handle. It can
+			// be that the work is in the pending state when
+			// channels have been detached following by each
+			// other.
 			queue_rcu_work(system_wq, &krwp->rcu_work);
 		}
-
-		// Repeat if any "free" corresponding channel is still busy.
-		if (krcp->bkvhead[0] || krcp->bkvhead[1] || krcp->head)
-			repeat = true;
 	}
 
-	return !repeat;
-}
-
-static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
-					  unsigned long flags)
-{
-	// Attempt to start a new batch.
-	krcp->monitor_todo = false;
-	if (queue_kfree_rcu_work(krcp)) {
-		// Success! Our job is done here.
-		raw_spin_unlock_irqrestore(&krcp->lock, flags);
-		return;
-	}
+	// If there is nothing to detach, it means that our job is
+	// successfully done here. In case of having at least one
+	// of the channels that is still busy we should rearm the
+	// work to repeat an attempt. Because previous batches are
+	// still in progress.
+	if (!krcp->bkvhead[0] && !krcp->bkvhead[1] && !krcp->head)
+		krcp->monitor_todo = false;
+	else
+		schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
 
-	// Previous RCU batch still in progress, try again later.
-	krcp->monitor_todo = true;
-	schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
 	raw_spin_unlock_irqrestore(&krcp->lock, flags);
 }
 
-/*
- * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
- * It invokes kfree_rcu_drain_unlock() to attempt to start another batch.
- */
-static void kfree_rcu_monitor(struct work_struct *work)
-{
-	unsigned long flags;
-	struct kfree_rcu_cpu *krcp = container_of(work, struct kfree_rcu_cpu,
-						 monitor_work.work);
-
-	raw_spin_lock_irqsave(&krcp->lock, flags);
-	if (krcp->monitor_todo)
-		kfree_rcu_drain_unlock(krcp, flags);
-	else
-		raw_spin_unlock_irqrestore(&krcp->lock, flags);
-}
-
 static enum hrtimer_restart
 schedule_page_work_fn(struct hrtimer *t)
 {
 	struct kfree_rcu_cpu *krcp =
 		container_of(t, struct kfree_rcu_cpu, hrtimer);
 
-	queue_work(system_highpri_wq, &krcp->page_cache_work);
+	queue_delayed_work(system_highpri_wq, &krcp->page_cache_work, 0);
 	return HRTIMER_NORESTART;
 }
 
@@ -3457,12 +3442,16 @@ static void fill_page_cache_func(struct work_struct *work)
 	struct kvfree_rcu_bulk_data *bnode;
 	struct kfree_rcu_cpu *krcp =
 		container_of(work, struct kfree_rcu_cpu,
-			page_cache_work);
+			page_cache_work.work);
 	unsigned long flags;
+	int nr_pages;
 	bool pushed;
 	int i;
 
-	for (i = 0; i < rcu_min_cached_objs; i++) {
+	nr_pages = atomic_read(&krcp->backoff_page_cache_fill) ?
+		1 : rcu_min_cached_objs;
+
+	for (i = 0; i < nr_pages; i++) {
 		bnode = (struct kvfree_rcu_bulk_data *)
 			__get_free_page(GFP_KERNEL | __GFP_NORETRY | __GFP_NOMEMALLOC | __GFP_NOWARN);
 
@@ -3479,6 +3468,7 @@ static void fill_page_cache_func(struct work_struct *work)
 	}
 
 	atomic_set(&krcp->work_in_progress, 0);
+	atomic_set(&krcp->backoff_page_cache_fill, 0);
 }
 
 static void
@@ -3486,10 +3476,15 @@ run_page_cache_worker(struct kfree_rcu_cpu *krcp)
 {
 	if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
 			!atomic_xchg(&krcp->work_in_progress, 1)) {
-		hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC,
-			HRTIMER_MODE_REL);
-		krcp->hrtimer.function = schedule_page_work_fn;
-		hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL);
+		if (atomic_read(&krcp->backoff_page_cache_fill)) {
+			queue_delayed_work(system_wq,
+				&krcp->page_cache_work,
+					msecs_to_jiffies(rcu_delay_page_cache_fill_msec));
+		} else {
+			hrtimer_init(&krcp->hrtimer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+			krcp->hrtimer.function = schedule_page_work_fn;
+			hrtimer_start(&krcp->hrtimer, 0, HRTIMER_MODE_REL);
+		}
 	}
 }
 
@@ -3554,11 +3549,11 @@ add_ptr_to_bulk_krc_lock(struct kfree_rcu_cpu **krcp,
 }
 
 /*
- * Queue a request for lazy invocation of appropriate free routine after a
- * grace period. Please note there are three paths are maintained, two are the
- * main ones that use array of pointers interface and third one is emergency
- * one, that is used only when the main path can not be maintained temporary,
- * due to memory pressure.
+ * Queue a request for lazy invocation of the appropriate free routine
+ * after a grace period.  Please note that three paths are maintained,
+ * two for the common case using arrays of pointers and a third one that
+ * is used only when the main paths cannot be used, for example, due to
+ * memory pressure.
  *
  * Each kvfree_call_rcu() request is added to a batch. The batch will be drained
  * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will
@@ -3647,6 +3642,8 @@ kfree_rcu_shrink_count(struct shrinker *shrink, struct shrink_control *sc)
 		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
 		count += READ_ONCE(krcp->count);
+		count += READ_ONCE(krcp->nr_bkv_objs);
+		atomic_set(&krcp->backoff_page_cache_fill, 1);
 	}
 
 	return count;
@@ -3656,18 +3653,14 @@ static unsigned long
 kfree_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
 {
 	int cpu, freed = 0;
-	unsigned long flags;
 
 	for_each_possible_cpu(cpu) {
 		int count;
 		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
 		count = krcp->count;
-		raw_spin_lock_irqsave(&krcp->lock, flags);
-		if (krcp->monitor_todo)
-			kfree_rcu_drain_unlock(krcp, flags);
-		else
-			raw_spin_unlock_irqrestore(&krcp->lock, flags);
+		count += drain_page_cache(krcp);
+		kfree_rcu_monitor(&krcp->monitor_work.work);
 
 		sc->nr_to_scan -= count;
 		freed += count;
@@ -3695,7 +3688,8 @@ void __init kfree_rcu_scheduler_running(void)
 		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
 		raw_spin_lock_irqsave(&krcp->lock, flags);
-		if (!krcp->head || krcp->monitor_todo) {
+		if ((!krcp->bkvhead[0] && !krcp->bkvhead[1] && !krcp->head) ||
+				krcp->monitor_todo) {
 			raw_spin_unlock_irqrestore(&krcp->lock, flags);
 			continue;
 		}
@@ -3752,10 +3746,12 @@ static int rcu_blocking_is_gp(void)
  * read-side critical sections have completed.  Note, however, that
  * upon return from synchronize_rcu(), the caller might well be executing
  * concurrently with new RCU read-side critical sections that began while
- * synchronize_rcu() was waiting.  RCU read-side critical sections are
- * delimited by rcu_read_lock() and rcu_read_unlock(), and may be nested.
- * In addition, regions of code across which interrupts, preemption, or
- * softirqs have been disabled also serve as RCU read-side critical
+ * synchronize_rcu() was waiting.
+ *
+ * RCU read-side critical sections are delimited by rcu_read_lock()
+ * and rcu_read_unlock(), and may be nested.  In addition, but only in
+ * v5.0 and later, regions of code across which interrupts, preemption,
+ * or softirqs have been disabled also serve as RCU read-side critical
  * sections.  This includes hardware interrupt handlers, softirq handlers,
  * and NMI handlers.
  *
@@ -3776,6 +3772,9 @@ static int rcu_blocking_is_gp(void)
  * to have executed a full memory barrier during the execution of
  * synchronize_rcu() -- even if CPU A and CPU B are the same CPU (but
  * again only if the system has more than one CPU).
+ *
+ * Implementation of these memory-ordering guarantees is described here:
+ * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
  */
 void synchronize_rcu(void)
 {
@@ -3846,11 +3845,11 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu);
 /**
  * poll_state_synchronize_rcu - Conditionally wait for an RCU grace period
  *
- * @oldstate: return from call to get_state_synchronize_rcu() or start_poll_synchronize_rcu()
+ * @oldstate: value from get_state_synchronize_rcu() or start_poll_synchronize_rcu()
  *
  * If a full RCU grace period has elapsed since the earlier call from
  * which oldstate was obtained, return @true, otherwise return @false.
- * If @false is returned, it is the caller's responsibilty to invoke this
+ * If @false is returned, it is the caller's responsibility to invoke this
  * function later on until it does return @true.  Alternatively, the caller
  * can explicitly wait for a grace period, for example, by passing @oldstate
  * to cond_synchronize_rcu() or by directly invoking synchronize_rcu().
@@ -3862,6 +3861,11 @@ EXPORT_SYMBOL_GPL(start_poll_synchronize_rcu);
  * (many hours even on 32-bit systems) should check them occasionally
  * and either refresh them or set a flag indicating that the grace period
  * has completed.
+ *
+ * This function provides the same memory-ordering guarantees that
+ * would be provided by a synchronize_rcu() that was invoked at the call
+ * to the function that provided @oldstate, and that returned at the end
+ * of this function.
  */
 bool poll_state_synchronize_rcu(unsigned long oldstate)
 {
@@ -3876,7 +3880,7 @@ EXPORT_SYMBOL_GPL(poll_state_synchronize_rcu);
 /**
  * cond_synchronize_rcu - Conditionally wait for an RCU grace period
  *
- * @oldstate: return value from earlier call to get_state_synchronize_rcu()
+ * @oldstate: value from get_state_synchronize_rcu() or start_poll_synchronize_rcu()
  *
  * If a full RCU grace period has elapsed since the earlier call to
  * get_state_synchronize_rcu() or start_poll_synchronize_rcu(), just return.
@@ -3886,6 +3890,11 @@ EXPORT_SYMBOL_GPL(poll_state_synchronize_rcu);
  * counter wrap is harmless.  If the counter wraps, we have waited for
  * more than 2 billion grace periods (and way more on a 64-bit system!),
  * so waiting for one additional grace period should be just fine.
+ *
+ * This function provides the same memory-ordering guarantees that
+ * would be provided by a synchronize_rcu() that was invoked at the call
+ * to the function that provided @oldstate, and that returned at the end
+ * of this function.
  */
 void cond_synchronize_rcu(unsigned long oldstate)
 {
@@ -3913,7 +3922,7 @@ static int rcu_pending(int user)
 	check_cpu_stall(rdp);
 
 	/* Does this CPU need a deferred NOCB wakeup? */
-	if (rcu_nocb_need_deferred_wakeup(rdp))
+	if (rcu_nocb_need_deferred_wakeup(rdp, RCU_NOCB_WAKE))
 		return 1;
 
 	/* Is this a nohz_full CPU in userspace or idle?  (Ignore RCU if so.) */
@@ -4096,7 +4105,7 @@ EXPORT_SYMBOL_GPL(rcu_barrier);
 /*
  * Propagate ->qsinitmask bits up the rcu_node tree to account for the
  * first CPU in a given leaf rcu_node structure coming online.  The caller
- * must hold the corresponding leaf rcu_node ->lock with interrrupts
+ * must hold the corresponding leaf rcu_node ->lock with interrupts
  * disabled.
  */
 static void rcu_init_new_rnp(struct rcu_node *rnp_leaf)
@@ -4191,7 +4200,7 @@ int rcutree_prepare_cpu(unsigned int cpu)
 	rdp->rcu_iw_gp_seq = rdp->gp_seq - 1;
 	trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuonl"));
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
-	rcu_prepare_kthreads(cpu);
+	rcu_spawn_one_boost_kthread(rnp);
 	rcu_spawn_cpu_nocb_kthread(cpu);
 	WRITE_ONCE(rcu_state.n_online_cpus, rcu_state.n_online_cpus + 1);
 
@@ -4474,6 +4483,7 @@ static int __init rcu_spawn_gp_kthread(void)
 	wake_up_process(t);
 	rcu_spawn_nocb_kthreads();
 	rcu_spawn_boost_kthreads();
+	rcu_spawn_core_kthreads();
 	return 0;
 }
 early_initcall(rcu_spawn_gp_kthread);
@@ -4584,11 +4594,25 @@ static void __init rcu_init_one(void)
  * replace the definitions in tree.h because those are needed to size
  * the ->node array in the rcu_state structure.
  */
-static void __init rcu_init_geometry(void)
+void rcu_init_geometry(void)
 {
 	ulong d;
 	int i;
+	static unsigned long old_nr_cpu_ids;
 	int rcu_capacity[RCU_NUM_LVLS];
+	static bool initialized;
+
+	if (initialized) {
+		/*
+		 * Warn if setup_nr_cpu_ids() had not yet been invoked,
+		 * unless nr_cpu_ids == NR_CPUS, in which case who cares?
+		 */
+		WARN_ON_ONCE(old_nr_cpu_ids != nr_cpu_ids);
+		return;
+	}
+
+	old_nr_cpu_ids = nr_cpu_ids;
+	initialized = true;
 
 	/*
 	 * Initialize any unspecified boot parameters.
@@ -4689,6 +4713,18 @@ static void __init kfree_rcu_batch_init(void)
 	int cpu;
 	int i;
 
+	/* Clamp it to the [0:100] seconds interval. */
+	if (rcu_delay_page_cache_fill_msec < 0 ||
+		rcu_delay_page_cache_fill_msec > 100 * MSEC_PER_SEC) {
+
+		rcu_delay_page_cache_fill_msec =
+			clamp(rcu_delay_page_cache_fill_msec, 0,
+				(int) (100 * MSEC_PER_SEC));
+
+		pr_info("Adjusting rcutree.rcu_delay_page_cache_fill_msec to %d ms.\n",
+			rcu_delay_page_cache_fill_msec);
+	}
+
 	for_each_possible_cpu(cpu) {
 		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
@@ -4698,7 +4734,7 @@ static void __init kfree_rcu_batch_init(void)
 		}
 
 		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
-		INIT_WORK(&krcp->page_cache_work, fill_page_cache_func);
+		INIT_DELAYED_WORK(&krcp->page_cache_work, fill_page_cache_func);
 		krcp->initialized = true;
 	}
 	if (register_shrinker(&kfree_rcu_shrinker))
@@ -4732,12 +4768,11 @@ void __init rcu_init(void)
 		rcutree_online_cpu(cpu);
 	}
 
-	/* Create workqueue for expedited GPs and for Tree SRCU. */
+	/* Create workqueue for Tree SRCU and for expedited GPs. */
 	rcu_gp_wq = alloc_workqueue("rcu_gp", WQ_MEM_RECLAIM, 0);
 	WARN_ON(!rcu_gp_wq);
 	rcu_par_gp_wq = alloc_workqueue("rcu_par_gp", WQ_MEM_RECLAIM, 0);
 	WARN_ON(!rcu_par_gp_wq);
-	srcu_init();
 
 	/* Fill in default value for rcutree.qovld boot parameter. */
 	/* -After- the rcu_node ->lock fields are initialized! */
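
The kerneldoc updates above also spell out the memory-ordering guarantees of the polled grace-period interface. For reference, here is a minimal sketch of how a caller is expected to combine those calls; the structure and helper names are illustrative only, not taken from this series:

	struct foo {
		unsigned long gp_snap;	/* cookie from start_poll_synchronize_rcu() */
		/* ... */
	};

	static void foo_retire(struct foo *fp)
	{
		/* Snapshot the grace-period state and request a new grace period. */
		fp->gp_snap = start_poll_synchronize_rcu();
	}

	static void foo_wait_reuse(struct foo *fp)
	{
		/* Non-blocking check: true only if a full GP elapsed since the snapshot. */
		if (poll_state_synchronize_rcu(fp->gp_snap))
			return;
		/* Otherwise fall back to blocking until that grace period completes. */
		cond_synchronize_rcu(fp->gp_snap);
	}
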
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 71821d59d95c..305cf6aeb408 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -115,6 +115,7 @@ struct rcu_node {
 				/*  boosting for this rcu_node structure. */
 	unsigned int boost_kthread_status;
 				/* State of boost_kthread_task for tracing. */
+	unsigned long n_boosts;	/* Number of boosts for this rcu_node structure. */
 #ifdef CONFIG_RCU_NOCB_CPU
 	struct swait_queue_head nocb_gp_wq[2];
 				/* Place for rcu_nocb_kthread() to wait GP. */
@@ -153,7 +154,7 @@ struct rcu_data {
 	unsigned long	gp_seq;		/* Track rsp->gp_seq counter. */
 	unsigned long	gp_seq_needed;	/* Track furthest future GP request. */
 	union rcu_noqs	cpu_no_qs;	/* No QSes yet for this CPU. */
-	bool		core_needs_qs;	/* Core waits for quiesc state. */
+	bool		core_needs_qs;	/* Core waits for quiescent state. */
 	bool		beenonline;	/* CPU online at least once. */
 	bool		gpwrap;		/* Possible ->gp_seq wrap. */
 	bool		exp_deferred_qs; /* This CPU awaiting a deferred QS? */
@@ -218,7 +219,6 @@ struct rcu_data {
 
 	/* The following fields are used by GP kthread, hence own cacheline. */
 	raw_spinlock_t nocb_gp_lock ____cacheline_internodealigned_in_smp;
-	struct timer_list nocb_bypass_timer; /* Force nocb_bypass flush. */
 	u8 nocb_gp_sleep;		/* Is the nocb GP thread asleep? */
 	u8 nocb_gp_bypass;		/* Found a bypass on last scan? */
 	u8 nocb_gp_gp;			/* GP to wait for on last scan? */
@@ -257,10 +257,10 @@ struct rcu_data {
 };
 
 /* Values for nocb_defer_wakeup field in struct rcu_data. */
-#define RCU_NOCB_WAKE_OFF	-1
 #define RCU_NOCB_WAKE_NOT	0
-#define RCU_NOCB_WAKE		1
-#define RCU_NOCB_WAKE_FORCE	2
+#define RCU_NOCB_WAKE_BYPASS	1
+#define RCU_NOCB_WAKE		2
+#define RCU_NOCB_WAKE_FORCE	3
 
 #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
 					/* For jiffies_till_first_fqs and */
@@ -417,8 +417,8 @@ static void rcu_initiate_boost(struct rcu_node *rnp, unsigned long flags);
 static void rcu_preempt_boost_start_gp(struct rcu_node *rnp);
 static bool rcu_is_callbacks_kthread(void);
 static void rcu_cpu_kthread_setup(unsigned int cpu);
+static void rcu_spawn_one_boost_kthread(struct rcu_node *rnp);
 static void __init rcu_spawn_boost_kthreads(void);
-static void rcu_prepare_kthreads(int cpu);
 static void rcu_cleanup_after_idle(void);
 static void rcu_prepare_for_idle(void);
 static bool rcu_preempt_has_tasks(struct rcu_node *rnp);
@@ -434,7 +434,7 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 				bool *was_alldone, unsigned long flags);
 static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
 				 unsigned long flags);
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp);
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
 static bool do_nocb_deferred_wakeup(struct rcu_data *rdp);
 static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
 static void rcu_spawn_cpu_nocb_kthread(int cpu);
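
The tree.h hunks above replace RCU_NOCB_WAKE_OFF with a graded set of deferred-wakeup levels and add a level parameter to rcu_nocb_need_deferred_wakeup(). A short annotated summary of how those levels are consumed in the tree_plugin.h changes that follow (the call sites are quoted from this series; the comments are editorial):

	/* rcu_nocb_need_deferred_wakeup() becomes a simple threshold check: */
	return READ_ONCE(rdp->nocb_defer_wakeup) >= level;

	/* The unified nocb timer honors any deferral, including bypass-only: */
	do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);

	/* ...whereas rcu_pending() ignores bypass-only deferrals: */
	if (rcu_nocb_need_deferred_wakeup(rdp, RCU_NOCB_WAKE))
		return 1;
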
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 4d6962048c30..de1dc3bb7f70 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -33,10 +33,6 @@ static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
 	return false;
 }
 
-static inline bool rcu_running_nocb_timer(struct rcu_data *rdp)
-{
-	return (timer_curr_running(&rdp->nocb_timer) && !in_irq());
-}
 #else
 static inline int rcu_lockdep_is_held_nocb(struct rcu_data *rdp)
 {
@@ -48,11 +44,6 @@ static inline bool rcu_current_is_nocb_kthread(struct rcu_data *rdp)
 	return false;
 }
 
-static inline bool rcu_running_nocb_timer(struct rcu_data *rdp)
-{
-	return false;
-}
-
 #endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 
 static bool rcu_rdp_is_offloaded(struct rcu_data *rdp)
@@ -72,8 +63,7 @@ static bool rcu_rdp_is_offloaded(struct rcu_data *rdp)
 		  rcu_lockdep_is_held_nocb(rdp) ||
 		  (rdp == this_cpu_ptr(&rcu_data) &&
 		   !(IS_ENABLED(CONFIG_PREEMPT_COUNT) && preemptible())) ||
-		  rcu_current_is_nocb_kthread(rdp) ||
-		  rcu_running_nocb_timer(rdp)),
+		  rcu_current_is_nocb_kthread(rdp)),
 		"Unsafe read of RCU_NOCB offloaded state"
 	);
 
@@ -1098,6 +1088,7 @@ static int rcu_boost(struct rcu_node *rnp)
 	/* Lock only for side effect: boosts task t's priority. */
 	rt_mutex_lock(&rnp->boost_mtx);
 	rt_mutex_unlock(&rnp->boost_mtx);  /* Then keep lockdep happy. */
+	rnp->n_boosts++;
 
 	return READ_ONCE(rnp->exp_tasks) != NULL ||
 	       READ_ONCE(rnp->boost_tasks) != NULL;
@@ -1197,22 +1188,16 @@ static void rcu_preempt_boost_start_gp(struct rcu_node *rnp)
  */
 static void rcu_spawn_one_boost_kthread(struct rcu_node *rnp)
 {
-	int rnp_index = rnp - rcu_get_root();
 	unsigned long flags;
+	int rnp_index = rnp - rcu_get_root();
 	struct sched_param sp;
 	struct task_struct *t;
 
-	if (!IS_ENABLED(CONFIG_PREEMPT_RCU))
-		return;
-
-	if (!rcu_scheduler_fully_active || rcu_rnp_online_cpus(rnp) == 0)
+	if (rnp->boost_kthread_task || !rcu_scheduler_fully_active)
 		return;
 
 	rcu_state.boost = 1;
 
-	if (rnp->boost_kthread_task != NULL)
-		return;
-
 	t = kthread_create(rcu_boost_kthread, (void *)rnp,
 			   "rcub/%d", rnp_index);
 	if (WARN_ON_ONCE(IS_ERR(t)))
@@ -1264,17 +1249,8 @@ static void __init rcu_spawn_boost_kthreads(void)
 	struct rcu_node *rnp;
 
 	rcu_for_each_leaf_node(rnp)
-		rcu_spawn_one_boost_kthread(rnp);
-}
-
-static void rcu_prepare_kthreads(int cpu)
-{
-	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
-	struct rcu_node *rnp = rdp->mynode;
-
-	/* Fire up the incoming CPU's kthread and leaf rcu_node kthread. */
-	if (rcu_scheduler_fully_active)
-		rcu_spawn_one_boost_kthread(rnp);
+		if (rcu_rnp_online_cpus(rnp))
+			rcu_spawn_one_boost_kthread(rnp);
 }
 
 #else /* #ifdef CONFIG_RCU_BOOST */
@@ -1294,15 +1270,15 @@ static void rcu_preempt_boost_start_gp(struct rcu_node *rnp)
 {
 }
 
-static void rcu_boost_kthread_setaffinity(struct rcu_node *rnp, int outgoingcpu)
+static void rcu_spawn_one_boost_kthread(struct rcu_node *rnp)
 {
 }
 
-static void __init rcu_spawn_boost_kthreads(void)
+static void rcu_boost_kthread_setaffinity(struct rcu_node *rnp, int outgoingcpu)
 {
 }
 
-static void rcu_prepare_kthreads(int cpu)
+static void __init rcu_spawn_boost_kthreads(void)
 {
 }
 
@@ -1535,13 +1511,10 @@ static void rcu_cleanup_after_idle(void)
 static int __init rcu_nocb_setup(char *str)
 {
 	alloc_bootmem_cpumask_var(&rcu_nocb_mask);
-	if (!strcasecmp(str, "all"))		/* legacy: use "0-N" instead */
+	if (cpulist_parse(str, rcu_nocb_mask)) {
+		pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
 		cpumask_setall(rcu_nocb_mask);
-	else
-		if (cpulist_parse(str, rcu_nocb_mask)) {
-			pr_warn("rcu_nocbs= bad CPU range, all CPUs set\n");
-			cpumask_setall(rcu_nocb_mask);
-		}
+	}
 	return 1;
 }
 __setup("rcu_nocbs=", rcu_nocb_setup);
@@ -1692,56 +1665,78 @@ bool rcu_is_nocb_cpu(int cpu)
 	return false;
 }
 
-/*
- * Kick the GP kthread for this NOCB group.  Caller holds ->nocb_lock
- * and this function releases it.
- */
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force,
-			 unsigned long flags)
-	__releases(rdp->nocb_lock)
+static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
+			   struct rcu_data *rdp,
+			   bool force, unsigned long flags)
+	__releases(rdp_gp->nocb_gp_lock)
 {
 	bool needwake = false;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
 
-	lockdep_assert_held(&rdp->nocb_lock);
 	if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
-		rcu_nocb_unlock_irqrestore(rdp, flags);
+		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
 		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
 				    TPS("AlreadyAwake"));
 		return false;
 	}
 
-	if (READ_ONCE(rdp->nocb_defer_wakeup) > RCU_NOCB_WAKE_NOT) {
-		WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
-		del_timer(&rdp->nocb_timer);
+	if (rdp_gp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
+		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+		del_timer(&rdp_gp->nocb_timer);
 	}
-	rcu_nocb_unlock_irqrestore(rdp, flags);
-	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+
 	if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
 		WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
 		needwake = true;
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
 	}
 	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
-	if (needwake)
+	if (needwake) {
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
 		wake_up_process(rdp_gp->nocb_gp_kthread);
+	}
 
 	return needwake;
 }
 
 /*
+ * Kick the GP kthread for this NOCB group.
+ */
+static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
+{
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
+}
+
+/*
  * Arrange to wake the GP kthread for this NOCB group at some future
  * time when it is safe to do so.
  */
 static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
 			       const char *reason)
 {
-	if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_OFF)
-		return;
-	if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
-		mod_timer(&rdp->nocb_timer, jiffies + 1);
-	if (rdp->nocb_defer_wakeup < waketype)
-		WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+
+	/*
+	 * Bypass wakeup overrides previous deferments. In case
+	 * of callback storm, no need to wake up too early.
+	 */
+	if (waketype == RCU_NOCB_WAKE_BYPASS) {
+		mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
+		WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+	} else {
+		if (rdp_gp->nocb_defer_wakeup < RCU_NOCB_WAKE)
+			mod_timer(&rdp_gp->nocb_timer, jiffies + 1);
+		if (rdp_gp->nocb_defer_wakeup < waketype)
+			WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
+	}
+
+	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+
 	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
 }
 
@@ -1940,7 +1935,7 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 }
 
 /*
- * Awaken the no-CBs grace-period kthead if needed, either due to it
+ * Awaken the no-CBs grace-period kthread if needed, either due to it
  * legitimately being asleep or due to overload conditions.
  *
  * If warranted, also wake up the kthread servicing this CPU's queues.
@@ -1968,13 +1963,14 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 		rdp->qlen_last_fqs_check = len;
 		if (!irqs_disabled_flags(flags)) {
 			/* ... if queue was empty ... */
-			wake_nocb_gp(rdp, false, flags);
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			wake_nocb_gp(rdp, false);
 			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
 					    TPS("WakeEmpty"));
 		} else {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
 			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
 					   TPS("WakeEmptyIsDeferred"));
-			rcu_nocb_unlock_irqrestore(rdp, flags);
 		}
 	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
 		/* ... or if many callbacks queued. */
@@ -1989,10 +1985,14 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 		smp_mb(); /* Enqueue before timer_pending(). */
 		if ((rdp->nocb_cb_sleep ||
 		     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
-		    !timer_pending(&rdp->nocb_bypass_timer))
+		    !timer_pending(&rdp->nocb_timer)) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
 			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
 					   TPS("WakeOvfIsDeferred"));
-		rcu_nocb_unlock_irqrestore(rdp, flags);
+		} else {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
+		}
 	} else {
 		rcu_nocb_unlock_irqrestore(rdp, flags);
 		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
@@ -2000,18 +2000,6 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 	return;
 }
 
-/* Wake up the no-CBs GP kthread to flush ->nocb_bypass. */
-static void do_nocb_bypass_wakeup_timer(struct timer_list *t)
-{
-	unsigned long flags;
-	struct rcu_data *rdp = from_timer(rdp, t, nocb_bypass_timer);
-
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
-	rcu_nocb_lock_irqsave(rdp, flags);
-	smp_mb__after_spinlock(); /* Timer expire before wakeup. */
-	__call_rcu_nocb_wake(rdp, true, flags);
-}
-
 /*
  * Check if we ignore this rdp.
  *
@@ -2118,11 +2106,7 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 			bypass = true;
 		}
 		rnp = rdp->mynode;
-		if (bypass) {  // Avoid race with first bypass CB.
-			WRITE_ONCE(my_rdp->nocb_defer_wakeup,
-				   RCU_NOCB_WAKE_NOT);
-			del_timer(&my_rdp->nocb_timer);
-		}
+
 		// Advance callbacks if helpful and low contention.
 		needwake_gp = false;
 		if (!rcu_segcblist_restempty(&rdp->cblist,
@@ -2168,12 +2152,12 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 	my_rdp->nocb_gp_bypass = bypass;
 	my_rdp->nocb_gp_gp = needwait_gp;
 	my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
+
 	if (bypass && !rcu_nocb_poll) {
 		// At least one child with non-empty ->nocb_bypass, so set
 		// timer in order to avoid stranding its callbacks.
-		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
-		mod_timer(&my_rdp->nocb_bypass_timer, j + 2);
-		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
+		wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
+				   TPS("WakeBypassIsDeferred"));
 	}
 	if (rcu_nocb_poll) {
 		/* Polling, so trace if first poll in the series. */
@@ -2197,8 +2181,10 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 	}
 	if (!rcu_nocb_poll) {
 		raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
-		if (bypass)
-			del_timer(&my_rdp->nocb_bypass_timer);
+		if (my_rdp->nocb_defer_wakeup > RCU_NOCB_WAKE_NOT) {
+			WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+			del_timer(&my_rdp->nocb_timer);
+		}
 		WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
 		raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
 	}
@@ -2334,25 +2320,27 @@ static int rcu_nocb_cb_kthread(void *arg)
 }
 
 /* Is a deferred wakeup of rcu_nocb_kthread() required? */
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
 {
-	return READ_ONCE(rdp->nocb_defer_wakeup) > RCU_NOCB_WAKE_NOT;
+	return READ_ONCE(rdp->nocb_defer_wakeup) >= level;
 }
 
 /* Do a deferred wakeup of rcu_nocb_kthread(). */
-static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
+static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
+					   struct rcu_data *rdp, int level,
+					   unsigned long flags)
+	__releases(rdp_gp->nocb_gp_lock)
 {
-	unsigned long flags;
 	int ndw;
 	int ret;
 
-	rcu_nocb_lock_irqsave(rdp, flags);
-	if (!rcu_nocb_need_deferred_wakeup(rdp)) {
-		rcu_nocb_unlock_irqrestore(rdp, flags);
+	if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
+		raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
 		return false;
 	}
-	ndw = READ_ONCE(rdp->nocb_defer_wakeup);
-	ret = wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
+
+	ndw = rdp_gp->nocb_defer_wakeup;
+	ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
 	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
 
 	return ret;
@@ -2361,9 +2349,15 @@ static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
 /* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
 static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
 {
+	unsigned long flags;
 	struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);
 
-	do_nocb_deferred_wakeup_common(rdp);
+	WARN_ON_ONCE(rdp->nocb_gp_rdp != rdp);
+	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
+
+	raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
+	smp_mb__after_spinlock(); /* Timer expire before wakeup. */
+	do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
 }
 
 /*
@@ -2373,9 +2367,14 @@ static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
  */
 static bool do_nocb_deferred_wakeup(struct rcu_data *rdp)
 {
-	if (rcu_nocb_need_deferred_wakeup(rdp))
-		return do_nocb_deferred_wakeup_common(rdp);
-	return false;
+	unsigned long flags;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+
+	if (!rdp_gp || !rcu_nocb_need_deferred_wakeup(rdp_gp, RCU_NOCB_WAKE))
+		return false;
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	return do_nocb_deferred_wakeup_common(rdp_gp, rdp, RCU_NOCB_WAKE, flags);
 }
 
 void rcu_nocb_flush_deferred_wakeup(void)
@@ -2443,17 +2442,15 @@ static long rcu_nocb_rdp_deoffload(void *arg)
 	swait_event_exclusive(rdp->nocb_state_wq,
 			      !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
 							SEGCBLIST_KTHREAD_GP));
-	rcu_nocb_lock_irqsave(rdp, flags);
-	/* Make sure nocb timer won't stay around */
-	WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_OFF);
-	rcu_nocb_unlock_irqrestore(rdp, flags);
-	del_timer_sync(&rdp->nocb_timer);
-
 	/*
-	 * Theoretically we could set SEGCBLIST_SOFTIRQ_ONLY with CB unlocked
-	 * and IRQs disabled but let's be paranoid.
+	 * Lock one last time to acquire latest callback updates from kthreads
+	 * so we can later handle callbacks locally without locking.
 	 */
 	rcu_nocb_lock_irqsave(rdp, flags);
+	/*
+	 * Theoretically we could set SEGCBLIST_SOFTIRQ_ONLY after the nocb
+	 * lock is released but how about being paranoid for once?
+	 */
 	rcu_segcblist_set_flags(cblist, SEGCBLIST_SOFTIRQ_ONLY);
 	/*
 	 * With SEGCBLIST_SOFTIRQ_ONLY, we can't use
@@ -2473,10 +2470,6 @@ int rcu_nocb_cpu_deoffload(int cpu)
 	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 	int ret = 0;
 
-	if (rdp == rdp->nocb_gp_rdp) {
-		pr_info("Can't deoffload an rdp GP leader (yet)\n");
-		return -EINVAL;
-	}
 	mutex_lock(&rcu_state.barrier_mutex);
 	cpus_read_lock();
 	if (rcu_rdp_is_offloaded(rdp)) {
@@ -2517,8 +2510,7 @@ static long rcu_nocb_rdp_offload(void *arg)
 	 * SEGCBLIST_SOFTIRQ_ONLY mode.
 	 */
 	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
-	/* Re-enable nocb timer */
-	WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+
 	/*
 	 * We didn't take the nocb lock while working on the
 	 * rdp->cblist in SEGCBLIST_SOFTIRQ_ONLY mode.
@@ -2626,7 +2618,6 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
 	raw_spin_lock_init(&rdp->nocb_bypass_lock);
 	raw_spin_lock_init(&rdp->nocb_gp_lock);
 	timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
-	timer_setup(&rdp->nocb_bypass_timer, do_nocb_bypass_wakeup_timer, 0);
 	rcu_cblist_init(&rdp->nocb_bypass);
 }
 
@@ -2785,13 +2776,12 @@ static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
 {
 	struct rcu_node *rnp = rdp->mynode;
 
-	pr_info("nocb GP %d %c%c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
+	pr_info("nocb GP %d %c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
 		rdp->cpu,
 		"kK"[!!rdp->nocb_gp_kthread],
 		"lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
 		"dD"[!!rdp->nocb_defer_wakeup],
 		"tT"[timer_pending(&rdp->nocb_timer)],
-		"bB"[timer_pending(&rdp->nocb_bypass_timer)],
 		"sS"[!!rdp->nocb_gp_sleep],
 		".W"[swait_active(&rdp->nocb_gp_wq)],
 		".W"[swait_active(&rnp->nocb_gp_wq[0])],
@@ -2812,7 +2802,6 @@ static void show_rcu_nocb_state(struct rcu_data *rdp)
 	char bufr[20];
 	struct rcu_segcblist *rsclp = &rdp->cblist;
 	bool waslocked;
-	bool wastimer;
 	bool wassleep;
 
 	if (rdp->nocb_gp_rdp == rdp)
@@ -2849,15 +2838,13 @@ static void show_rcu_nocb_state(struct rcu_data *rdp)
 		return;
 
 	waslocked = raw_spin_is_locked(&rdp->nocb_gp_lock);
-	wastimer = timer_pending(&rdp->nocb_bypass_timer);
 	wassleep = swait_active(&rdp->nocb_gp_wq);
-	if (!rdp->nocb_gp_sleep && !waslocked && !wastimer && !wassleep)
-		return;  /* Nothing untowards. */
+	if (!rdp->nocb_gp_sleep && !waslocked && !wassleep)
+		return;  /* Nothing untoward. */
 
-	pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c%c %c\n",
+	pr_info("   nocb GP activity on CB-only CPU!!! %c%c%c %c\n",
 		"lL"[waslocked],
 		"dD"[!!rdp->nocb_defer_wakeup],
-		"tT"[wastimer],
 		"sS"[!!rdp->nocb_gp_sleep],
 		".W"[wassleep]);
 }
@@ -2922,7 +2909,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
 {
 }
 
-static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
+static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level)
 {
 	return false;
 }
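
One user-visible consequence of the rcu_nocb_setup() simplification above: the open-coded "all" handling is gone, on the assumption that cpulist_parse() now accepts "all" directly after the companion bitmap-parsing change in this series. Typical boot-line usage, with the CPU ranges being arbitrary examples:

	rcu_nocbs=all		# offload callbacks from every CPU
	rcu_nocbs=1-7		# offload CPUs 1 through 7
	rcu_nocbs=0,4-7,12-15	# any cpulist expression is accepted
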
diff --git a/kernel/rcu/tree_stall.h b/kernel/rcu/tree_stall.h
index acb2288063b5..3f937b20814f 100644
--- a/kernel/rcu/tree_stall.h
+++ b/kernel/rcu/tree_stall.h
@@ -314,6 +314,7 @@ static void rcu_print_detail_task_stall_rnp(struct rcu_node *rnp)
  * tasks blocked within RCU read-side critical sections.
  */
 static int rcu_print_task_stall(struct rcu_node *rnp, unsigned long flags)
+	__releases(rnp->lock)
 {
 	raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 	return 0;
@@ -717,6 +718,63 @@ static void check_cpu_stall(struct rcu_data *rdp)
 
 
 /*
+ * Check to see if a failure to end RCU priority inversion was due to
+ * a CPU not passing through a quiescent state.  When this happens, there
+ * is nothing that RCU priority boosting can do to help, so we shouldn't
+ * count this as an RCU priority boosting failure.  A return of true says
+ * RCU priority boosting is to blame, and false says otherwise.  If false
+ * is returned, the first of the CPUs to blame is stored through cpup.
+ * If there was no CPU blocking the current grace period, but also nothing
+ * in need of being boosted, *cpup is set to -1.  This can happen in case
+ * of vCPU preemption while the last CPU is reporting its quiescent state,
+ * for example.
+ *
+ * If cpup is NULL, then a lockless quick check is carried out, suitable
+ * for high-rate usage.  On the other hand, if cpup is non-NULL, each
+ * rcu_node structure's ->lock is acquired, ruling out high-rate usage.
+ */
+bool rcu_check_boost_fail(unsigned long gp_state, int *cpup)
+{
+	bool atb = false;
+	int cpu;
+	unsigned long flags;
+	struct rcu_node *rnp;
+
+	rcu_for_each_leaf_node(rnp) {
+		if (!cpup) {
+			if (READ_ONCE(rnp->qsmask)) {
+				return false;
+			} else {
+				if (READ_ONCE(rnp->gp_tasks))
+					atb = true;
+				continue;
+			}
+		}
+		*cpup = -1;
+		raw_spin_lock_irqsave_rcu_node(rnp, flags);
+		if (rnp->gp_tasks)
+			atb = true;
+		if (!rnp->qsmask) {
+			// No CPUs without quiescent states for this rnp.
+			raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
+			continue;
+		}
+		// Find the first holdout CPU.
+		for_each_leaf_node_possible_cpu(rnp, cpu) {
+			if (rnp->qsmask & (1UL << (cpu - rnp->grplo))) {
+				raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
+				*cpup = cpu;
+				return false;
+			}
+		}
+		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
+	}
+	// Can't blame CPUs, so must blame RCU priority boosting.
+	return atb;
+}
+EXPORT_SYMBOL_GPL(rcu_check_boost_fail);
+
+/*
  * Show the state of the grace-period kthreads.
  */
 void show_rcu_gp_kthreads(void)
@@ -726,6 +784,7 @@ void show_rcu_gp_kthreads(void)
 	unsigned long j;
 	unsigned long ja;
 	unsigned long jr;
+	unsigned long js;
 	unsigned long jw;
 	struct rcu_data *rdp;
 	struct rcu_node *rnp;
@@ -734,21 +793,30 @@ void show_rcu_gp_kthreads(void)
 	j = jiffies;
 	ja = j - data_race(rcu_state.gp_activity);
 	jr = j - data_race(rcu_state.gp_req_activity);
+	js = j - data_race(rcu_state.gp_start);
 	jw = j - data_race(rcu_state.gp_wake_time);
-	pr_info("%s: wait state: %s(%d) ->state: %#x delta ->gp_activity %lu ->gp_req_activity %lu ->gp_wake_time %lu ->gp_wake_seq %ld ->gp_seq %ld ->gp_seq_needed %ld ->gp_flags %#x\n",
+	pr_info("%s: wait state: %s(%d) ->state: %#lx ->rt_priority %u delta ->gp_start %lu ->gp_activity %lu ->gp_req_activity %lu ->gp_wake_time %lu ->gp_wake_seq %ld ->gp_seq %ld ->gp_seq_needed %ld ->gp_max %lu ->gp_flags %#x\n",
 		rcu_state.name, gp_state_getname(rcu_state.gp_state),
-		rcu_state.gp_state, t ? t->__state : 0x1ffff,
-		ja, jr, jw, (long)data_race(rcu_state.gp_wake_seq),
+		rcu_state.gp_state, t ? t->__state : 0x1ffffL, t ? t->rt_priority : 0xffU,
+		js, ja, jr, jw, (long)data_race(rcu_state.gp_wake_seq),
 		(long)data_race(rcu_state.gp_seq),
 		(long)data_race(rcu_get_root()->gp_seq_needed),
+		data_race(rcu_state.gp_max),
 		data_race(rcu_state.gp_flags));
 	rcu_for_each_node_breadth_first(rnp) {
-		if (ULONG_CMP_GE(READ_ONCE(rcu_state.gp_seq),
-				 READ_ONCE(rnp->gp_seq_needed)))
+		if (ULONG_CMP_GE(READ_ONCE(rcu_state.gp_seq), READ_ONCE(rnp->gp_seq_needed)) &&
+		    !data_race(rnp->qsmask) && !data_race(rnp->boost_tasks) &&
+		    !data_race(rnp->exp_tasks) && !data_race(rnp->gp_tasks))
 			continue;
-		pr_info("\trcu_node %d:%d ->gp_seq %ld ->gp_seq_needed %ld\n",
-			rnp->grplo, rnp->grphi, (long)data_race(rnp->gp_seq),
-			(long)data_race(rnp->gp_seq_needed));
+		pr_info("\trcu_node %d:%d ->gp_seq %ld ->gp_seq_needed %ld ->qsmask %#lx %c%c%c%c ->n_boosts %ld\n",
+			rnp->grplo, rnp->grphi,
+			(long)data_race(rnp->gp_seq), (long)data_race(rnp->gp_seq_needed),
+			data_race(rnp->qsmask),
+			".b"[!!data_race(rnp->boost_kthread_task)],
+			".B"[!!data_race(rnp->boost_tasks)],
+			".E"[!!data_race(rnp->exp_tasks)],
+			".G"[!!data_race(rnp->gp_tasks)],
+			data_race(rnp->n_boosts));
 		if (!rcu_is_leaf_node(rnp))
 			continue;
 		for_each_leaf_node_possible_cpu(rnp, cpu) {
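
rcu_check_boost_fail(), added and exported above (rcutorture is the intended modular user), distinguishes a genuine boosting failure from a CPU that simply never reported a quiescent state. A hedged sketch of the calling pattern; gp_state and the messages are illustrative, not taken from this series:

	int cpu;

	if (rcu_check_boost_fail(gp_state, &cpu)) {
		pr_info("RCU priority boosting is to blame\n");
	} else if (cpu < 0) {
		pr_info("no holdout CPU and nothing to boost (vCPU preemption?)\n");
	} else {
		pr_info("CPU %d never passed through a quiescent state\n", cpu);
	}
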
diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
index b95ae86c40a7..c21b38cc25e9 100644
--- a/kernel/rcu/update.c
+++ b/kernel/rcu/update.c
@@ -277,7 +277,7 @@ EXPORT_SYMBOL_GPL(rcu_callback_map);
 
 noinstr int notrace debug_lockdep_rcu_enabled(void)
 {
-	return rcu_scheduler_active != RCU_SCHEDULER_INACTIVE && debug_locks &&
+	return rcu_scheduler_active != RCU_SCHEDULER_INACTIVE && READ_ONCE(debug_locks) &&
 	       current->lockdep_recursion == 0;
 }
 EXPORT_SYMBOL_GPL(debug_lockdep_rcu_enabled);
@@ -524,6 +524,7 @@ static void test_callback(struct rcu_head *r)
 }
 
 DEFINE_STATIC_SRCU(early_srcu);
+static unsigned long early_srcu_cookie;
 
 struct early_boot_kfree_rcu {
 	struct rcu_head rh;
@@ -536,8 +537,10 @@ static void early_boot_test_call_rcu(void)
 	struct early_boot_kfree_rcu *rhp;
 
 	call_rcu(&head, test_callback);
-	if (IS_ENABLED(CONFIG_SRCU))
+	if (IS_ENABLED(CONFIG_SRCU)) {
+		early_srcu_cookie = start_poll_synchronize_srcu(&early_srcu);
 		call_srcu(&early_srcu, &shead, test_callback);
+	}
 	rhp = kmalloc(sizeof(*rhp), GFP_KERNEL);
 	if (!WARN_ON_ONCE(!rhp))
 		kfree_rcu(rhp, rh);
@@ -563,6 +566,7 @@ static int rcu_verify_early_boot_tests(void)
 		if (IS_ENABLED(CONFIG_SRCU)) {
 			early_boot_test_counter++;
 			srcu_barrier(&early_srcu);
+			WARN_ON_ONCE(!poll_state_synchronize_srcu(&early_srcu, early_srcu_cookie));
 		}
 	}
 	if (rcu_self_test_counter != early_boot_test_counter) {
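
The update.c changes above store a cookie from start_poll_synchronize_srcu() at early boot and later verify it with poll_state_synchronize_srcu() once the self-test callbacks have run. Outside of that self-test, the same pair can be used to poll for an SRCU grace period without blocking; a minimal sketch, with my_srcu standing in for a caller-defined srcu_struct:

	DEFINE_STATIC_SRCU(my_srcu);

	unsigned long cookie;

	cookie = start_poll_synchronize_srcu(&my_srcu);	/* start a GP, get a cookie */
	/* ... later, from any sleepable context ... */
	if (!poll_state_synchronize_srcu(&my_srcu, cookie))
		synchronize_srcu(&my_srcu);	/* not yet elapsed: block (or retry the poll later) */
	/* A full SRCU grace period has now elapsed since the cookie was taken. */
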
diff --git a/kernel/time/timer.c b/kernel/time/timer.c
index 467087d7bdb6..3fadb58fc9d7 100644
--- a/kernel/time/timer.c
+++ b/kernel/time/timer.c
@@ -1237,20 +1237,6 @@ int try_to_del_timer_sync(struct timer_list *timer)
 }
 EXPORT_SYMBOL(try_to_del_timer_sync);
 
-bool timer_curr_running(struct timer_list *timer)
-{
-	int i;
-
-	for (i = 0; i < NR_BASES; i++) {
-		struct timer_base *base = this_cpu_ptr(&timer_bases[i]);
-
-		if (base->running_timer == timer)
-			return true;
-	}
-
-	return false;
-}
-
 #ifdef CONFIG_PREEMPT_RT
 static __init void timer_base_init_expiry_lock(struct timer_base *base)
 {