Diffstat (limited to 'kernel/rcu')
-rw-r--r--  kernel/rcu/Kconfig          |    5
-rw-r--r--  kernel/rcu/rcu.h            |   16
-rw-r--r--  kernel/rcu/rcu_segcblist.c  |  216
-rw-r--r--  kernel/rcu/rcu_segcblist.h  |   57
-rw-r--r--  kernel/rcu/rcutorture.c     |  395
-rw-r--r--  kernel/rcu/refscale.c       |   23
-rw-r--r--  kernel/rcu/srcutiny.c       |   77
-rw-r--r--  kernel/rcu/srcutree.c       |  147
-rw-r--r--  kernel/rcu/tasks.h          |   79
-rw-r--r--  kernel/rcu/tree.c           |  101
-rw-r--r--  kernel/rcu/tree.h           |    2
-rw-r--r--  kernel/rcu/tree_exp.h       |    2
-rw-r--r--  kernel/rcu/tree_plugin.h    |  367
-rw-r--r--  kernel/rcu/tree_stall.h     |   60
-rw-r--r--  kernel/rcu/update.c         |    4
15 files changed, 1318 insertions(+), 233 deletions(-)
diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
index cdc57b4f6d48..3128b7cf8e1f 100644
--- a/kernel/rcu/Kconfig
+++ b/kernel/rcu/Kconfig
@@ -95,6 +95,7 @@ config TASKS_RUDE_RCU
 
 config TASKS_TRACE_RCU
 	def_bool 0
+	select IRQ_WORK
 	help
 	  This option enables a task-based RCU implementation that uses
 	  explicit rcu_read_lock_trace() read-side markers, and allows
@@ -188,8 +189,8 @@ config RCU_FAST_NO_HZ
 
 config RCU_BOOST
 	bool "Enable RCU priority boosting"
-	depends on RT_MUTEXES && PREEMPT_RCU && RCU_EXPERT
-	default n
+	depends on (RT_MUTEXES && PREEMPT_RCU && RCU_EXPERT) || PREEMPT_RT
+	default y if PREEMPT_RT
 	help
 	  This option boosts the priority of preempted RCU readers that
 	  block the current preemptible RCU grace period for too long.
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index 59ef1ae6dc37..bf0827d4b659 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -378,7 +378,11 @@ do {									\
 	smp_mb__after_unlock_lock();					\
 } while (0)
 
-#define raw_spin_unlock_rcu_node(p) raw_spin_unlock(&ACCESS_PRIVATE(p, lock))
+#define raw_spin_unlock_rcu_node(p)					\
+do {									\
+	lockdep_assert_irqs_disabled();					\
+	raw_spin_unlock(&ACCESS_PRIVATE(p, lock));			\
+} while (0)
 
 #define raw_spin_lock_irq_rcu_node(p)					\
 do {									\
@@ -387,7 +391,10 @@ do {									\
 } while (0)
 
 #define raw_spin_unlock_irq_rcu_node(p)					\
-	raw_spin_unlock_irq(&ACCESS_PRIVATE(p, lock))
+do {									\
+	lockdep_assert_irqs_disabled();					\
+	raw_spin_unlock_irq(&ACCESS_PRIVATE(p, lock));			\
+} while (0)
 
 #define raw_spin_lock_irqsave_rcu_node(p, flags)			\
 do {									\
@@ -396,7 +403,10 @@ do {									\
 } while (0)
 
 #define raw_spin_unlock_irqrestore_rcu_node(p, flags)			\
-	raw_spin_unlock_irqrestore(&ACCESS_PRIVATE(p, lock), flags)
+do {									\
+	lockdep_assert_irqs_disabled();					\
+	raw_spin_unlock_irqrestore(&ACCESS_PRIVATE(p, lock), flags);	\
+} while (0)
 
 #define raw_spin_trylock_rcu_node(p)					\
 ({									\
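
The rcu_node unlock wrappers above now assert that interrupts remain disabled at unlock time, catching callers that re-enable interrupts while still holding the lock. A hedged sketch of the same defensive pattern applied to an ordinary raw spinlock follows; example_lock and example_unlock_checked are hypothetical names, assuming <linux/spinlock.h> and <linux/lockdep.h>:

/* Hypothetical sketch, not part of this patch. */
static DEFINE_RAW_SPINLOCK(example_lock);

static void example_unlock_checked(void)
{
	/* Complain if a caller re-enabled interrupts while holding the lock. */
	lockdep_assert_irqs_disabled();
	raw_spin_unlock(&example_lock);
}
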
diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
index 2d2a6b6b9dfb..7f181c9675f7 100644
--- a/kernel/rcu/rcu_segcblist.c
+++ b/kernel/rcu/rcu_segcblist.c
@@ -7,10 +7,10 @@
  * Authors: Paul E. McKenney <paulmck@linux.ibm.com>
  */
 
-#include <linux/types.h>
-#include <linux/kernel.h>
+#include <linux/cpu.h>
 #include <linux/interrupt.h>
-#include <linux/rcupdate.h>
+#include <linux/kernel.h>
+#include <linux/types.h>
 
 #include "rcu_segcblist.h"
 
@@ -88,23 +88,135 @@ static void rcu_segcblist_set_len(struct rcu_segcblist *rsclp, long v)
 #endif
 }
 
+/* Get the length of a segment of the rcu_segcblist structure. */
+static long rcu_segcblist_get_seglen(struct rcu_segcblist *rsclp, int seg)
+{
+	return READ_ONCE(rsclp->seglen[seg]);
+}
+
+/* Return number of callbacks in segmented callback list by summing seglen. */
+long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp)
+{
+	long len = 0;
+	int i;
+
+	for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++)
+		len += rcu_segcblist_get_seglen(rsclp, i);
+
+	return len;
+}
+
+/* Set the length of a segment of the rcu_segcblist structure. */
+static void rcu_segcblist_set_seglen(struct rcu_segcblist *rsclp, int seg, long v)
+{
+	WRITE_ONCE(rsclp->seglen[seg], v);
+}
+
+/* Increase the numeric length of a segment by a specified amount. */
+static void rcu_segcblist_add_seglen(struct rcu_segcblist *rsclp, int seg, long v)
+{
+	WRITE_ONCE(rsclp->seglen[seg], rsclp->seglen[seg] + v);
+}
+
+/* Move the "from" segment's length to the "to" segment. */
+static void rcu_segcblist_move_seglen(struct rcu_segcblist *rsclp, int from, int to)
+{
+	long len;
+
+	if (from == to)
+		return;
+
+	len = rcu_segcblist_get_seglen(rsclp, from);
+	if (!len)
+		return;
+
+	rcu_segcblist_add_seglen(rsclp, to, len);
+	rcu_segcblist_set_seglen(rsclp, from, 0);
+}
+
+/* Increment segment's length. */
+static void rcu_segcblist_inc_seglen(struct rcu_segcblist *rsclp, int seg)
+{
+	rcu_segcblist_add_seglen(rsclp, seg, 1);
+}
+
 /*
  * Increase the numeric length of an rcu_segcblist structure by the
  * specified amount, which can be negative.  This can cause the ->len
  * field to disagree with the actual number of callbacks on the structure.
  * This increase is fully ordered with respect to the callers accesses
  * both before and after.
+ *
+ * So why on earth is a memory barrier required both before and after
+ * the update to the ->len field???
+ *
+ * The reason is that rcu_barrier() locklessly samples each CPU's ->len
+ * field, and if a given CPU's field is zero, avoids IPIing that CPU.
+ * This can of course race with both queuing and invoking of callbacks.
+ * Failing to correctly handle either of these races could result in
+ * rcu_barrier() failing to IPI a CPU that actually had callbacks queued
+ * which rcu_barrier() was obligated to wait on.  And if rcu_barrier()
+ * failed to wait on such a callback, unloading certain kernel modules
+ * would result in calls to functions whose code was no longer present in
+ * the kernel, for but one example.
+ *
+ * Therefore, ->len transitions from 1->0 and 0->1 have to be carefully
+ * ordered with respect to both list modifications and the rcu_barrier().
+ *
+ * The queuing case is CASE 1 and the invoking case is CASE 2.
+ *
+ * CASE 1: Suppose that CPU 0 has no callbacks queued, but invokes
+ * call_rcu() just as CPU 1 invokes rcu_barrier().  CPU 0's ->len field
+ * will transition from 0->1, which is one of the transitions that must
+ * be handled carefully.  Without the full memory barriers after the ->len
+ * update and at the beginning of rcu_barrier(), the following could happen:
+ *
+ * CPU 0				CPU 1
+ *
+ * call_rcu().
+ *					rcu_barrier() sees ->len as 0.
+ * set ->len = 1.
+ *					rcu_barrier() does nothing.
+ *					module is unloaded.
+ * callback invokes unloaded function!
+ *
+ * With the full barriers, any case where rcu_barrier() sees ->len as 0 will
+ * have unambiguously preceded the return from the racing call_rcu(), which
+ * means that this call_rcu() invocation is OK to not wait on.  After all,
+ * you are supposed to make sure that any problematic call_rcu() invocations
+ * happen before the rcu_barrier().
+ *
+ *
+ * CASE 2: Suppose that CPU 0 is invoking its last callback just as
+ * CPU 1 invokes rcu_barrier().  CPU 0's ->len field will transition from
+ * 1->0, which is one of the transitions that must be handled carefully.
+ * Without the full memory barriers before the ->len update and at the
+ * end of rcu_barrier(), the following could happen:
+ *
+ * CPU 0				CPU 1
+ *
+ * start invoking last callback
+ * set ->len = 0 (reordered)
+ *					rcu_barrier() sees ->len as 0
+ *					rcu_barrier() does nothing.
+ *					module is unloaded
+ * callback executing after unloaded!
+ *
+ * With the full barriers, any case where rcu_barrier() sees ->len as 0
+ * will be fully ordered after the completion of the callback function,
+ * so that the module unloading operation is completely safe.
+ *
  */
-static void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
+void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v)
 {
 #ifdef CONFIG_RCU_NOCB_CPU
-	smp_mb__before_atomic(); /* Up to the caller! */
+	smp_mb__before_atomic(); // Read header comment above.
 	atomic_long_add(v, &rsclp->len);
-	smp_mb__after_atomic(); /* Up to the caller! */
+	smp_mb__after_atomic();  // Read header comment above.
 #else
-	smp_mb(); /* Up to the caller! */
+	smp_mb(); // Read header comment above.
 	WRITE_ONCE(rsclp->len, rsclp->len + v);
-	smp_mb(); /* Up to the caller! */
+	smp_mb(); // Read header comment above.
 #endif
 }
 
@@ -120,26 +232,6 @@ void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp)
 }
 
 /*
- * Exchange the numeric length of the specified rcu_segcblist structure
- * with the specified value.  This can cause the ->len field to disagree
- * with the actual number of callbacks on the structure.  This exchange is
- * fully ordered with respect to the callers accesses both before and after.
- */
-static long rcu_segcblist_xchg_len(struct rcu_segcblist *rsclp, long v)
-{
-#ifdef CONFIG_RCU_NOCB_CPU
-	return atomic_long_xchg(&rsclp->len, v);
-#else
-	long ret = rsclp->len;
-
-	smp_mb(); /* Up to the caller! */
-	WRITE_ONCE(rsclp->len, v);
-	smp_mb(); /* Up to the caller! */
-	return ret;
-#endif
-}
-
-/*
  * Initialize an rcu_segcblist structure.
  */
 void rcu_segcblist_init(struct rcu_segcblist *rsclp)
@@ -149,10 +241,12 @@ void rcu_segcblist_init(struct rcu_segcblist *rsclp)
 	BUILD_BUG_ON(RCU_NEXT_TAIL + 1 != ARRAY_SIZE(rsclp->gp_seq));
 	BUILD_BUG_ON(ARRAY_SIZE(rsclp->tails) != ARRAY_SIZE(rsclp->gp_seq));
 	rsclp->head = NULL;
-	for (i = 0; i < RCU_CBLIST_NSEGS; i++)
+	for (i = 0; i < RCU_CBLIST_NSEGS; i++) {
 		rsclp->tails[i] = &rsclp->head;
+		rcu_segcblist_set_seglen(rsclp, i, 0);
+	}
 	rcu_segcblist_set_len(rsclp, 0);
-	rsclp->enabled = 1;
+	rcu_segcblist_set_flags(rsclp, SEGCBLIST_ENABLED);
 }
 
 /*
@@ -163,16 +257,21 @@ void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
 {
 	WARN_ON_ONCE(!rcu_segcblist_empty(rsclp));
 	WARN_ON_ONCE(rcu_segcblist_n_cbs(rsclp));
-	rsclp->enabled = 0;
+	rcu_segcblist_clear_flags(rsclp, SEGCBLIST_ENABLED);
 }
 
 /*
  * Mark the specified rcu_segcblist structure as offloaded.  This
  * structure must be empty.
  */
-void rcu_segcblist_offload(struct rcu_segcblist *rsclp)
+void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload)
 {
-	rsclp->offloaded = 1;
+	if (offload) {
+		rcu_segcblist_clear_flags(rsclp, SEGCBLIST_SOFTIRQ_ONLY);
+		rcu_segcblist_set_flags(rsclp, SEGCBLIST_OFFLOADED);
+	} else {
+		rcu_segcblist_clear_flags(rsclp, SEGCBLIST_OFFLOADED);
+	}
 }
 
 /*
@@ -245,7 +344,7 @@ void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
 			   struct rcu_head *rhp)
 {
 	rcu_segcblist_inc_len(rsclp);
-	smp_mb(); /* Ensure counts are updated before callback is enqueued. */
+	rcu_segcblist_inc_seglen(rsclp, RCU_NEXT_TAIL);
 	rhp->next = NULL;
 	WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rhp);
 	WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], &rhp->next);
@@ -274,6 +373,7 @@ bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
 	for (i = RCU_NEXT_TAIL; i > RCU_DONE_TAIL; i--)
 		if (rsclp->tails[i] != rsclp->tails[i - 1])
 			break;
+	rcu_segcblist_inc_seglen(rsclp, i);
 	WRITE_ONCE(*rsclp->tails[i], rhp);
 	for (; i <= RCU_NEXT_TAIL; i++)
 		WRITE_ONCE(rsclp->tails[i], &rhp->next);
@@ -281,21 +381,6 @@ bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
 }
 
 /*
- * Extract only the counts from the specified rcu_segcblist structure,
- * and place them in the specified rcu_cblist structure.  This function
- * supports both callback orphaning and invocation, hence the separation
- * of counts and callbacks.  (Callbacks ready for invocation must be
- * orphaned and adopted separately from pending callbacks, but counts
- * apply to all callbacks.  Locking must be used to make sure that
- * both orphaned-callbacks lists are consistent.)
- */
-void rcu_segcblist_extract_count(struct rcu_segcblist *rsclp,
-					       struct rcu_cblist *rclp)
-{
-	rclp->len = rcu_segcblist_xchg_len(rsclp, 0);
-}
-
-/*
  * Extract only those callbacks ready to be invoked from the specified
  * rcu_segcblist structure and place them in the specified rcu_cblist
  * structure.
@@ -307,6 +392,7 @@ void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
 
 	if (!rcu_segcblist_ready_cbs(rsclp))
 		return; /* Nothing to do. */
+	rclp->len = rcu_segcblist_get_seglen(rsclp, RCU_DONE_TAIL);
 	*rclp->tail = rsclp->head;
 	WRITE_ONCE(rsclp->head, *rsclp->tails[RCU_DONE_TAIL]);
 	WRITE_ONCE(*rsclp->tails[RCU_DONE_TAIL], NULL);
@@ -314,6 +400,7 @@ void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
 	for (i = RCU_CBLIST_NSEGS - 1; i >= RCU_DONE_TAIL; i--)
 		if (rsclp->tails[i] == rsclp->tails[RCU_DONE_TAIL])
 			WRITE_ONCE(rsclp->tails[i], &rsclp->head);
+	rcu_segcblist_set_seglen(rsclp, RCU_DONE_TAIL, 0);
 }
 
 /*
@@ -330,11 +417,15 @@ void rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp,
 
 	if (!rcu_segcblist_pend_cbs(rsclp))
 		return; /* Nothing to do. */
+	rclp->len = 0;
 	*rclp->tail = *rsclp->tails[RCU_DONE_TAIL];
 	rclp->tail = rsclp->tails[RCU_NEXT_TAIL];
 	WRITE_ONCE(*rsclp->tails[RCU_DONE_TAIL], NULL);
-	for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++)
+	for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++) {
+		rclp->len += rcu_segcblist_get_seglen(rsclp, i);
 		WRITE_ONCE(rsclp->tails[i], rsclp->tails[RCU_DONE_TAIL]);
+		rcu_segcblist_set_seglen(rsclp, i, 0);
+	}
 }
 
 /*
@@ -345,7 +436,6 @@ void rcu_segcblist_insert_count(struct rcu_segcblist *rsclp,
 				struct rcu_cblist *rclp)
 {
 	rcu_segcblist_add_len(rsclp, rclp->len);
-	rclp->len = 0;
 }
 
 /*
@@ -359,6 +449,7 @@ void rcu_segcblist_insert_done_cbs(struct rcu_segcblist *rsclp,
 
 	if (!rclp->head)
 		return; /* No callbacks to move. */
+	rcu_segcblist_add_seglen(rsclp, RCU_DONE_TAIL, rclp->len);
 	*rclp->tail = rsclp->head;
 	WRITE_ONCE(rsclp->head, rclp->head);
 	for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++)
@@ -379,6 +470,8 @@ void rcu_segcblist_insert_pend_cbs(struct rcu_segcblist *rsclp,
 {
 	if (!rclp->head)
 		return; /* Nothing to do. */
+
+	rcu_segcblist_add_seglen(rsclp, RCU_NEXT_TAIL, rclp->len);
 	WRITE_ONCE(*rsclp->tails[RCU_NEXT_TAIL], rclp->head);
 	WRITE_ONCE(rsclp->tails[RCU_NEXT_TAIL], rclp->tail);
 }
@@ -403,6 +496,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
 		if (ULONG_CMP_LT(seq, rsclp->gp_seq[i]))
 			break;
 		WRITE_ONCE(rsclp->tails[RCU_DONE_TAIL], rsclp->tails[i]);
+		rcu_segcblist_move_seglen(rsclp, i, RCU_DONE_TAIL);
 	}
 
 	/* If no callbacks moved, nothing more need be done. */
@@ -423,6 +517,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
 		if (rsclp->tails[j] == rsclp->tails[RCU_NEXT_TAIL])
 			break;  /* No more callbacks. */
 		WRITE_ONCE(rsclp->tails[j], rsclp->tails[i]);
+		rcu_segcblist_move_seglen(rsclp, i, j);
 		rsclp->gp_seq[j] = rsclp->gp_seq[i];
 	}
 }
@@ -444,7 +539,7 @@ void rcu_segcblist_advance(struct rcu_segcblist *rsclp, unsigned long seq)
  */
 bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp, unsigned long seq)
 {
-	int i;
+	int i, j;
 
 	WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp));
 	if (rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL))
@@ -487,6 +582,10 @@ bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp, unsigned long seq)
 	if (rcu_segcblist_restempty(rsclp, i) || ++i >= RCU_NEXT_TAIL)
 		return false;
 
+	/* Accounting: everything below i is about to get merged into i. */
+	for (j = i + 1; j <= RCU_NEXT_TAIL; j++)
+		rcu_segcblist_move_seglen(rsclp, j, i);
+
 	/*
 	 * Merge all later callbacks, including newly arrived callbacks,
 	 * into the segment located by the for-loop above.  Assign "seq"
@@ -514,13 +613,24 @@ void rcu_segcblist_merge(struct rcu_segcblist *dst_rsclp,
 	struct rcu_cblist donecbs;
 	struct rcu_cblist pendcbs;
 
+	lockdep_assert_cpus_held();
+
 	rcu_cblist_init(&donecbs);
 	rcu_cblist_init(&pendcbs);
-	rcu_segcblist_extract_count(src_rsclp, &donecbs);
+
 	rcu_segcblist_extract_done_cbs(src_rsclp, &donecbs);
 	rcu_segcblist_extract_pend_cbs(src_rsclp, &pendcbs);
+
+	/*
+	 * No need for smp_mb() before setting the length to 0, because the
+	 * CPU-hotplug lock excludes rcu_barrier().
+	 */
+	rcu_segcblist_set_len(src_rsclp, 0);
+
 	rcu_segcblist_insert_count(dst_rsclp, &donecbs);
+	rcu_segcblist_insert_count(dst_rsclp, &pendcbs);
 	rcu_segcblist_insert_done_cbs(dst_rsclp, &donecbs);
 	rcu_segcblist_insert_pend_cbs(dst_rsclp, &pendcbs);
+
 	rcu_segcblist_init(src_rsclp);
 }
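
The per-segment ->seglen counts introduced above are intended to stay consistent with the overall ->len count whenever the list is held locked and is not racing with rcu_barrier(). A hedged debug sketch of that invariant, using only helpers declared in this series (the example_ name is hypothetical and not part of this patch):

/* Hypothetical sketch, not part of this patch. */
static void example_check_seglen_invariant(struct rcu_segcblist *rsclp)
{
	/* The sum of the per-segment lengths should match the overall count. */
	WARN_ON_ONCE(rcu_segcblist_n_segment_cbs(rsclp) !=
		     rcu_segcblist_n_cbs(rsclp));
}
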
diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
index 492262bcb591..9a19328ff251 100644
--- a/kernel/rcu/rcu_segcblist.h
+++ b/kernel/rcu/rcu_segcblist.h
@@ -15,6 +15,9 @@ static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
 	return READ_ONCE(rclp->len);
 }
 
+/* Return number of callbacks in segmented callback list by summing seglen. */
+long rcu_segcblist_n_segment_cbs(struct rcu_segcblist *rsclp);
+
 void rcu_cblist_init(struct rcu_cblist *rclp);
 void rcu_cblist_enqueue(struct rcu_cblist *rclp, struct rcu_head *rhp);
 void rcu_cblist_flush_enqueue(struct rcu_cblist *drclp,
@@ -50,19 +53,51 @@ static inline long rcu_segcblist_n_cbs(struct rcu_segcblist *rsclp)
 #endif
 }
 
+static inline void rcu_segcblist_set_flags(struct rcu_segcblist *rsclp,
+					   int flags)
+{
+	rsclp->flags |= flags;
+}
+
+static inline void rcu_segcblist_clear_flags(struct rcu_segcblist *rsclp,
+					     int flags)
+{
+	rsclp->flags &= ~flags;
+}
+
+static inline bool rcu_segcblist_test_flags(struct rcu_segcblist *rsclp,
+					    int flags)
+{
+	return READ_ONCE(rsclp->flags) & flags;
+}
+
 /*
  * Is the specified rcu_segcblist enabled, for example, not corresponding
  * to an offline CPU?
  */
 static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp)
 {
-	return rsclp->enabled;
+	return rcu_segcblist_test_flags(rsclp, SEGCBLIST_ENABLED);
 }
 
-/* Is the specified rcu_segcblist offloaded?  */
+/* Is the specified rcu_segcblist offloaded (that is, is SEGCBLIST_SOFTIRQ_ONLY clear)? */
 static inline bool rcu_segcblist_is_offloaded(struct rcu_segcblist *rsclp)
 {
-	return IS_ENABLED(CONFIG_RCU_NOCB_CPU) && rsclp->offloaded;
+	if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
+	    !rcu_segcblist_test_flags(rsclp, SEGCBLIST_SOFTIRQ_ONLY))
+		return true;
+
+	return false;
+}
+
+static inline bool rcu_segcblist_completely_offloaded(struct rcu_segcblist *rsclp)
+{
+	int flags = SEGCBLIST_KTHREAD_CB | SEGCBLIST_KTHREAD_GP | SEGCBLIST_OFFLOADED;
+
+	if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) && (rsclp->flags & flags) == flags)
+		return true;
+
+	return false;
 }
 
 /*
@@ -75,10 +110,22 @@ static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg)
 	return !READ_ONCE(*READ_ONCE(rsclp->tails[seg]));
 }
 
+/*
+ * Is the specified segment of the specified rcu_segcblist structure
+ * empty of callbacks?
+ */
+static inline bool rcu_segcblist_segempty(struct rcu_segcblist *rsclp, int seg)
+{
+	if (seg == RCU_DONE_TAIL)
+		return &rsclp->head == rsclp->tails[RCU_DONE_TAIL];
+	return rsclp->tails[seg - 1] == rsclp->tails[seg];
+}
+
 void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp);
+void rcu_segcblist_add_len(struct rcu_segcblist *rsclp, long v);
 void rcu_segcblist_init(struct rcu_segcblist *rsclp);
 void rcu_segcblist_disable(struct rcu_segcblist *rsclp);
-void rcu_segcblist_offload(struct rcu_segcblist *rsclp);
+void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload);
 bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp);
 bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp);
 struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp);
@@ -88,8 +135,6 @@ void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
 			   struct rcu_head *rhp);
 bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp,
 			   struct rcu_head *rhp);
-void rcu_segcblist_extract_count(struct rcu_segcblist *rsclp,
-				 struct rcu_cblist *rclp);
 void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
 				    struct rcu_cblist *rclp);
 void rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp,
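
The old ->enabled and ->offloaded booleans are folded above into a single ->flags word plus set/clear/test accessors, so state queries now compose out of the flag helpers rather than reading fields directly. A minimal hedged illustration (the example_ name is hypothetical):

/* Hypothetical sketch, not part of this patch. */
static bool example_enabled_and_offloaded(struct rcu_segcblist *rsclp)
{
	return rcu_segcblist_test_flags(rsclp, SEGCBLIST_ENABLED) &&
	       rcu_segcblist_is_offloaded(rsclp);
}
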
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index 528ed10b78fd..99657ffa6688 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -85,6 +85,7 @@ torture_param(bool, gp_cond, false, "Use conditional/async GP wait primitives");
 torture_param(bool, gp_exp, false, "Use expedited GP wait primitives");
 torture_param(bool, gp_normal, false,
 	     "Use normal (non-expedited) GP wait primitives");
+torture_param(bool, gp_poll, false, "Use polling GP wait primitives");
 torture_param(bool, gp_sync, false, "Use synchronous GP wait primitives");
 torture_param(int, irqreader, 1, "Allow RCU readers from irq handlers");
 torture_param(int, leakpointer, 0, "Leak pointer dereferences from readers");
@@ -97,6 +98,8 @@ torture_param(int, object_debug, 0,
 torture_param(int, onoff_holdoff, 0, "Time after boot before CPU hotplugs (s)");
 torture_param(int, onoff_interval, 0,
 	     "Time between CPU hotplugs (jiffies), 0=disable");
+torture_param(int, nocbs_nthreads, 0, "Number of NOCB toggle threads, 0 to disable");
+torture_param(int, nocbs_toggle, 1000, "Time between toggling nocb state (ms)");
 torture_param(int, read_exit_delay, 13,
 	      "Delay between read-then-exit episodes (s)");
 torture_param(int, read_exit_burst, 16,
@@ -127,10 +130,12 @@ static char *torture_type = "rcu";
 module_param(torture_type, charp, 0444);
 MODULE_PARM_DESC(torture_type, "Type of RCU to torture (rcu, srcu, ...)");
 
+static int nrealnocbers;
 static int nrealreaders;
 static struct task_struct *writer_task;
 static struct task_struct **fakewriter_tasks;
 static struct task_struct **reader_tasks;
+static struct task_struct **nocb_tasks;
 static struct task_struct *stats_task;
 static struct task_struct *fqs_task;
 static struct task_struct *boost_tasks[NR_CPUS];
@@ -142,11 +147,22 @@ static struct task_struct *read_exit_task;
 
 #define RCU_TORTURE_PIPE_LEN 10
 
+// Mailbox-like structure to check RCU global memory ordering.
+struct rcu_torture_reader_check {
+	unsigned long rtc_myloops;
+	int rtc_chkrdr;
+	unsigned long rtc_chkloops;
+	int rtc_ready;
+	struct rcu_torture_reader_check *rtc_assigner;
+} ____cacheline_internodealigned_in_smp;
+
+// Update-side data structure used to check RCU readers.
 struct rcu_torture {
 	struct rcu_head rtort_rcu;
 	int rtort_pipe_count;
 	struct list_head rtort_free;
 	int rtort_mbtest;
+	struct rcu_torture_reader_check *rtort_chkp;
 };
 
 static LIST_HEAD(rcu_torture_freelist);
@@ -157,10 +173,13 @@ static DEFINE_SPINLOCK(rcu_torture_lock);
 static DEFINE_PER_CPU(long [RCU_TORTURE_PIPE_LEN + 1], rcu_torture_count);
 static DEFINE_PER_CPU(long [RCU_TORTURE_PIPE_LEN + 1], rcu_torture_batch);
 static atomic_t rcu_torture_wcount[RCU_TORTURE_PIPE_LEN + 1];
+static struct rcu_torture_reader_check *rcu_torture_reader_mbchk;
 static atomic_t n_rcu_torture_alloc;
 static atomic_t n_rcu_torture_alloc_fail;
 static atomic_t n_rcu_torture_free;
 static atomic_t n_rcu_torture_mberror;
+static atomic_t n_rcu_torture_mbchk_fail;
+static atomic_t n_rcu_torture_mbchk_tries;
 static atomic_t n_rcu_torture_error;
 static long n_rcu_torture_barrier_error;
 static long n_rcu_torture_boost_ktrerror;
@@ -174,6 +193,8 @@ static unsigned long n_read_exits;
 static struct list_head rcu_torture_removed;
 static unsigned long shutdown_jiffies;
 static unsigned long start_gp_seq;
+static atomic_long_t n_nocb_offload;
+static atomic_long_t n_nocb_deoffload;
 
 static int rcu_torture_writer_state;
 #define RTWS_FIXED_DELAY	0
@@ -183,9 +204,11 @@ static int rcu_torture_writer_state;
 #define RTWS_EXP_SYNC		4
 #define RTWS_COND_GET		5
 #define RTWS_COND_SYNC		6
-#define RTWS_SYNC		7
-#define RTWS_STUTTER		8
-#define RTWS_STOPPING		9
+#define RTWS_POLL_GET		7
+#define RTWS_POLL_WAIT		8
+#define RTWS_SYNC		9
+#define RTWS_STUTTER		10
+#define RTWS_STOPPING		11
 static const char * const rcu_torture_writer_state_names[] = {
 	"RTWS_FIXED_DELAY",
 	"RTWS_DELAY",
@@ -194,6 +217,8 @@ static const char * const rcu_torture_writer_state_names[] = {
 	"RTWS_EXP_SYNC",
 	"RTWS_COND_GET",
 	"RTWS_COND_SYNC",
+	"RTWS_POLL_GET",
+	"RTWS_POLL_WAIT",
 	"RTWS_SYNC",
 	"RTWS_STUTTER",
 	"RTWS_STOPPING",
@@ -311,7 +336,9 @@ struct rcu_torture_ops {
 	void (*deferred_free)(struct rcu_torture *p);
 	void (*sync)(void);
 	void (*exp_sync)(void);
-	unsigned long (*get_state)(void);
+	unsigned long (*get_gp_state)(void);
+	unsigned long (*start_gp_poll)(void);
+	bool (*poll_gp_state)(unsigned long oldstate);
 	void (*cond_sync)(unsigned long oldstate);
 	call_rcu_func_t call;
 	void (*cb_barrier)(void);
@@ -386,7 +413,12 @@ static bool
 rcu_torture_pipe_update_one(struct rcu_torture *rp)
 {
 	int i;
+	struct rcu_torture_reader_check *rtrcp = READ_ONCE(rp->rtort_chkp);
 
+	if (rtrcp) {
+		WRITE_ONCE(rp->rtort_chkp, NULL);
+		smp_store_release(&rtrcp->rtc_ready, 1); // Pair with smp_load_acquire().
+	}
 	i = READ_ONCE(rp->rtort_pipe_count);
 	if (i > RCU_TORTURE_PIPE_LEN)
 		i = RCU_TORTURE_PIPE_LEN;
@@ -461,7 +493,7 @@ static struct rcu_torture_ops rcu_ops = {
 	.deferred_free	= rcu_torture_deferred_free,
 	.sync		= synchronize_rcu,
 	.exp_sync	= synchronize_rcu_expedited,
-	.get_state	= get_state_synchronize_rcu,
+	.get_gp_state	= get_state_synchronize_rcu,
 	.cond_sync	= cond_synchronize_rcu,
 	.call		= call_rcu,
 	.cb_barrier	= rcu_barrier,
@@ -570,6 +602,21 @@ static void srcu_torture_synchronize(void)
 	synchronize_srcu(srcu_ctlp);
 }
 
+static unsigned long srcu_torture_get_gp_state(void)
+{
+	return get_state_synchronize_srcu(srcu_ctlp);
+}
+
+static unsigned long srcu_torture_start_gp_poll(void)
+{
+	return start_poll_synchronize_srcu(srcu_ctlp);
+}
+
+static bool srcu_torture_poll_gp_state(unsigned long oldstate)
+{
+	return poll_state_synchronize_srcu(srcu_ctlp, oldstate);
+}
+
 static void srcu_torture_call(struct rcu_head *head,
 			      rcu_callback_t func)
 {
@@ -601,6 +648,9 @@ static struct rcu_torture_ops srcu_ops = {
 	.deferred_free	= srcu_torture_deferred_free,
 	.sync		= srcu_torture_synchronize,
 	.exp_sync	= srcu_torture_synchronize_expedited,
+	.get_gp_state	= srcu_torture_get_gp_state,
+	.start_gp_poll	= srcu_torture_start_gp_poll,
+	.poll_gp_state	= srcu_torture_poll_gp_state,
 	.call		= srcu_torture_call,
 	.cb_barrier	= srcu_torture_barrier,
 	.stats		= srcu_torture_stats,
@@ -1018,42 +1068,26 @@ rcu_torture_fqs(void *arg)
 	return 0;
 }
 
+// Used by writers to randomly choose from the available grace-period
+// primitives.  The only purpose of the initialization is to size the array.
+static int synctype[] = { RTWS_DEF_FREE, RTWS_EXP_SYNC, RTWS_COND_GET, RTWS_POLL_GET, RTWS_SYNC };
+static int nsynctypes;
+
 /*
- * RCU torture writer kthread.  Repeatedly substitutes a new structure
- * for that pointed to by rcu_torture_current, freeing the old structure
- * after a series of grace periods (the "pipeline").
+ * Determine which grace-period primitives are available.
  */
-static int
-rcu_torture_writer(void *arg)
+static void rcu_torture_write_types(void)
 {
-	bool can_expedite = !rcu_gp_is_expedited() && !rcu_gp_is_normal();
-	int expediting = 0;
-	unsigned long gp_snap;
 	bool gp_cond1 = gp_cond, gp_exp1 = gp_exp, gp_normal1 = gp_normal;
-	bool gp_sync1 = gp_sync;
-	int i;
-	int oldnice = task_nice(current);
-	struct rcu_torture *rp;
-	struct rcu_torture *old_rp;
-	static DEFINE_TORTURE_RANDOM(rand);
-	bool stutter_waited;
-	int synctype[] = { RTWS_DEF_FREE, RTWS_EXP_SYNC,
-			   RTWS_COND_GET, RTWS_SYNC };
-	int nsynctypes = 0;
-
-	VERBOSE_TOROUT_STRING("rcu_torture_writer task started");
-	if (!can_expedite)
-		pr_alert("%s" TORTURE_FLAG
-			 " GP expediting controlled from boot/sysfs for %s.\n",
-			 torture_type, cur_ops->name);
+	bool gp_poll1 = gp_poll, gp_sync1 = gp_sync;
 
 	/* Initialize synctype[] array.  If none set, take default. */
-	if (!gp_cond1 && !gp_exp1 && !gp_normal1 && !gp_sync1)
-		gp_cond1 = gp_exp1 = gp_normal1 = gp_sync1 = true;
-	if (gp_cond1 && cur_ops->get_state && cur_ops->cond_sync) {
+	if (!gp_cond1 && !gp_exp1 && !gp_normal1 && !gp_poll1 && !gp_sync1)
+		gp_cond1 = gp_exp1 = gp_normal1 = gp_poll1 = gp_sync1 = true;
+	if (gp_cond1 && cur_ops->get_gp_state && cur_ops->cond_sync) {
 		synctype[nsynctypes++] = RTWS_COND_GET;
 		pr_info("%s: Testing conditional GPs.\n", __func__);
-	} else if (gp_cond && (!cur_ops->get_state || !cur_ops->cond_sync)) {
+	} else if (gp_cond && (!cur_ops->get_gp_state || !cur_ops->cond_sync)) {
 		pr_alert("%s: gp_cond without primitives.\n", __func__);
 	}
 	if (gp_exp1 && cur_ops->exp_sync) {
@@ -1068,12 +1102,46 @@ rcu_torture_writer(void *arg)
 	} else if (gp_normal && !cur_ops->deferred_free) {
 		pr_alert("%s: gp_normal without primitives.\n", __func__);
 	}
+	if (gp_poll1 && cur_ops->start_gp_poll && cur_ops->poll_gp_state) {
+		synctype[nsynctypes++] = RTWS_POLL_GET;
+		pr_info("%s: Testing polling GPs.\n", __func__);
+	} else if (gp_poll && (!cur_ops->start_gp_poll || !cur_ops->poll_gp_state)) {
+		pr_alert("%s: gp_poll without primitives.\n", __func__);
+	}
 	if (gp_sync1 && cur_ops->sync) {
 		synctype[nsynctypes++] = RTWS_SYNC;
 		pr_info("%s: Testing normal GPs.\n", __func__);
 	} else if (gp_sync && !cur_ops->sync) {
 		pr_alert("%s: gp_sync without primitives.\n", __func__);
 	}
+}
+
+/*
+ * RCU torture writer kthread.  Repeatedly substitutes a new structure
+ * for that pointed to by rcu_torture_current, freeing the old structure
+ * after a series of grace periods (the "pipeline").
+ */
+static int
+rcu_torture_writer(void *arg)
+{
+	bool boot_ended;
+	bool can_expedite = !rcu_gp_is_expedited() && !rcu_gp_is_normal();
+	unsigned long cookie;
+	int expediting = 0;
+	unsigned long gp_snap;
+	int i;
+	int idx;
+	int oldnice = task_nice(current);
+	struct rcu_torture *rp;
+	struct rcu_torture *old_rp;
+	static DEFINE_TORTURE_RANDOM(rand);
+	bool stutter_waited;
+
+	VERBOSE_TOROUT_STRING("rcu_torture_writer task started");
+	if (!can_expedite)
+		pr_alert("%s" TORTURE_FLAG
+			 " GP expediting controlled from boot/sysfs for %s.\n",
+			 torture_type, cur_ops->name);
 	if (WARN_ONCE(nsynctypes == 0,
 		      "rcu_torture_writer: No update-side primitives.\n")) {
 		/*
@@ -1087,7 +1155,7 @@ rcu_torture_writer(void *arg)
 
 	do {
 		rcu_torture_writer_state = RTWS_FIXED_DELAY;
-		schedule_timeout_uninterruptible(1);
+		torture_hrtimeout_us(500, 1000, &rand);
 		rp = rcu_torture_alloc();
 		if (rp == NULL)
 			continue;
@@ -1107,6 +1175,18 @@ rcu_torture_writer(void *arg)
 			atomic_inc(&rcu_torture_wcount[i]);
 			WRITE_ONCE(old_rp->rtort_pipe_count,
 				   old_rp->rtort_pipe_count + 1);
+			if (cur_ops->get_gp_state && cur_ops->poll_gp_state) {
+				idx = cur_ops->readlock();
+				cookie = cur_ops->get_gp_state();
+				WARN_ONCE(rcu_torture_writer_state != RTWS_DEF_FREE &&
+					  cur_ops->poll_gp_state(cookie),
+					  "%s: Cookie check 1 failed %s(%d) %lu->%lu\n",
+					  __func__,
+					  rcu_torture_writer_state_getname(),
+					  rcu_torture_writer_state,
+					  cookie, cur_ops->get_gp_state());
+				cur_ops->readunlock(idx);
+			}
 			switch (synctype[torture_random(&rand) % nsynctypes]) {
 			case RTWS_DEF_FREE:
 				rcu_torture_writer_state = RTWS_DEF_FREE;
@@ -1119,15 +1199,21 @@ rcu_torture_writer(void *arg)
 				break;
 			case RTWS_COND_GET:
 				rcu_torture_writer_state = RTWS_COND_GET;
-				gp_snap = cur_ops->get_state();
-				i = torture_random(&rand) % 16;
-				if (i != 0)
-					schedule_timeout_interruptible(i);
-				udelay(torture_random(&rand) % 1000);
+				gp_snap = cur_ops->get_gp_state();
+				torture_hrtimeout_jiffies(torture_random(&rand) % 16, &rand);
 				rcu_torture_writer_state = RTWS_COND_SYNC;
 				cur_ops->cond_sync(gp_snap);
 				rcu_torture_pipe_update(old_rp);
 				break;
+			case RTWS_POLL_GET:
+				rcu_torture_writer_state = RTWS_POLL_GET;
+				gp_snap = cur_ops->start_gp_poll();
+				rcu_torture_writer_state = RTWS_POLL_WAIT;
+				while (!cur_ops->poll_gp_state(gp_snap))
+					torture_hrtimeout_jiffies(torture_random(&rand) % 16,
+								  &rand);
+				rcu_torture_pipe_update(old_rp);
+				break;
 			case RTWS_SYNC:
 				rcu_torture_writer_state = RTWS_SYNC;
 				cur_ops->sync();
@@ -1137,6 +1223,14 @@ rcu_torture_writer(void *arg)
 				WARN_ON_ONCE(1);
 				break;
 			}
+			if (cur_ops->get_gp_state && cur_ops->poll_gp_state)
+				WARN_ONCE(rcu_torture_writer_state != RTWS_DEF_FREE &&
+					  !cur_ops->poll_gp_state(cookie),
+					  "%s: Cookie check 2 failed %s(%d) %lu->%lu\n",
+					  __func__,
+					  rcu_torture_writer_state_getname(),
+					  rcu_torture_writer_state,
+					  cookie, cur_ops->get_gp_state());
 		}
 		WRITE_ONCE(rcu_torture_current_version,
 			   rcu_torture_current_version + 1);
@@ -1155,12 +1249,13 @@ rcu_torture_writer(void *arg)
 				       !rcu_gp_is_normal();
 		}
 		rcu_torture_writer_state = RTWS_STUTTER;
+		boot_ended = rcu_inkernel_boot_has_ended();
 		stutter_waited = stutter_wait("rcu_torture_writer");
 		if (stutter_waited &&
 		    !READ_ONCE(rcu_fwd_cb_nodelay) &&
 		    !cur_ops->slow_gps &&
 		    !torture_must_stop() &&
-		    rcu_inkernel_boot_has_ended())
+		    boot_ended)
 			for (i = 0; i < ARRAY_SIZE(rcu_tortures); i++)
 				if (list_empty(&rcu_tortures[i].rtort_free) &&
 				    rcu_access_pointer(rcu_torture_current) !=
@@ -1194,26 +1289,43 @@ rcu_torture_writer(void *arg)
 static int
 rcu_torture_fakewriter(void *arg)
 {
+	unsigned long gp_snap;
 	DEFINE_TORTURE_RANDOM(rand);
 
 	VERBOSE_TOROUT_STRING("rcu_torture_fakewriter task started");
 	set_user_nice(current, MAX_NICE);
 
 	do {
-		schedule_timeout_uninterruptible(1 + torture_random(&rand)%10);
-		udelay(torture_random(&rand) & 0x3ff);
+		torture_hrtimeout_jiffies(torture_random(&rand) % 10, &rand);
 		if (cur_ops->cb_barrier != NULL &&
 		    torture_random(&rand) % (nfakewriters * 8) == 0) {
 			cur_ops->cb_barrier();
-		} else if (gp_normal == gp_exp) {
-			if (cur_ops->sync && torture_random(&rand) & 0x80)
-				cur_ops->sync();
-			else if (cur_ops->exp_sync)
+		} else {
+			switch (synctype[torture_random(&rand) % nsynctypes]) {
+			case RTWS_DEF_FREE:
+				break;
+			case RTWS_EXP_SYNC:
 				cur_ops->exp_sync();
-		} else if (gp_normal && cur_ops->sync) {
-			cur_ops->sync();
-		} else if (cur_ops->exp_sync) {
-			cur_ops->exp_sync();
+				break;
+			case RTWS_COND_GET:
+				gp_snap = cur_ops->get_gp_state();
+				torture_hrtimeout_jiffies(torture_random(&rand) % 16, &rand);
+				cur_ops->cond_sync(gp_snap);
+				break;
+			case RTWS_POLL_GET:
+				gp_snap = cur_ops->start_gp_poll();
+				while (!cur_ops->poll_gp_state(gp_snap)) {
+					torture_hrtimeout_jiffies(torture_random(&rand) % 16,
+								  &rand);
+				}
+				break;
+			case RTWS_SYNC:
+				cur_ops->sync();
+				break;
+			default:
+				WARN_ON_ONCE(1);
+				break;
+			}
 		}
 		stutter_wait("rcu_torture_fakewriter");
 	} while (!torture_must_stop());
@@ -1227,6 +1339,62 @@ static void rcu_torture_timer_cb(struct rcu_head *rhp)
 	kfree(rhp);
 }
 
+// Set up and carry out testing of RCU's global memory ordering
+static void rcu_torture_reader_do_mbchk(long myid, struct rcu_torture *rtp,
+					struct torture_random_state *trsp)
+{
+	unsigned long loops;
+	int noc = torture_num_online_cpus();
+	int rdrchked;
+	int rdrchker;
+	struct rcu_torture_reader_check *rtrcp; // Me.
+	struct rcu_torture_reader_check *rtrcp_assigner; // Assigned us to do checking.
+	struct rcu_torture_reader_check *rtrcp_chked; // Reader being checked.
+	struct rcu_torture_reader_check *rtrcp_chker; // Reader doing checking when not me.
+
+	if (myid < 0)
+		return; // Don't try this from timer handlers.
+
+	// Increment my counter.
+	rtrcp = &rcu_torture_reader_mbchk[myid];
+	WRITE_ONCE(rtrcp->rtc_myloops, rtrcp->rtc_myloops + 1);
+
+	// Attempt to assign someone else some checking work.
+	rdrchked = torture_random(trsp) % nrealreaders;
+	rtrcp_chked = &rcu_torture_reader_mbchk[rdrchked];
+	rdrchker = torture_random(trsp) % nrealreaders;
+	rtrcp_chker = &rcu_torture_reader_mbchk[rdrchker];
+	if (rdrchked != myid && rdrchked != rdrchker && noc >= rdrchked && noc >= rdrchker &&
+	    smp_load_acquire(&rtrcp->rtc_chkrdr) < 0 && // Pairs with smp_store_release below.
+	    !READ_ONCE(rtp->rtort_chkp) &&
+	    !smp_load_acquire(&rtrcp_chker->rtc_assigner)) { // Pairs with smp_store_release below.
+		rtrcp->rtc_chkloops = READ_ONCE(rtrcp_chked->rtc_myloops);
+		WARN_ON_ONCE(rtrcp->rtc_chkrdr >= 0);
+		rtrcp->rtc_chkrdr = rdrchked;
+		WARN_ON_ONCE(rtrcp->rtc_ready); // This gets set after the grace period ends.
+		if (cmpxchg_relaxed(&rtrcp_chker->rtc_assigner, NULL, rtrcp) ||
+		    cmpxchg_relaxed(&rtp->rtort_chkp, NULL, rtrcp))
+			(void)cmpxchg_relaxed(&rtrcp_chker->rtc_assigner, rtrcp, NULL); // Back out.
+	}
+
+	// If assigned some completed work, do it!
+	rtrcp_assigner = READ_ONCE(rtrcp->rtc_assigner);
+	if (!rtrcp_assigner || !smp_load_acquire(&rtrcp_assigner->rtc_ready))
+		return; // No work or work not yet ready.
+	rdrchked = rtrcp_assigner->rtc_chkrdr;
+	if (WARN_ON_ONCE(rdrchked < 0))
+		return;
+	rtrcp_chked = &rcu_torture_reader_mbchk[rdrchked];
+	loops = READ_ONCE(rtrcp_chked->rtc_myloops);
+	atomic_inc(&n_rcu_torture_mbchk_tries);
+	if (ULONG_CMP_LT(loops, rtrcp_assigner->rtc_chkloops))
+		atomic_inc(&n_rcu_torture_mbchk_fail);
+	rtrcp_assigner->rtc_chkloops = loops + ULONG_MAX / 2;
+	rtrcp_assigner->rtc_ready = 0;
+	smp_store_release(&rtrcp->rtc_assigner, NULL); // Someone else can assign us work.
+	smp_store_release(&rtrcp_assigner->rtc_chkrdr, -1); // Assigner can again assign.
+}
+
 /*
  * Do one extension of an RCU read-side critical section using the
  * current reader state in readstate (set to zero for initial entry
@@ -1362,8 +1530,9 @@ rcutorture_loop_extend(int *readstate, struct torture_random_state *trsp,
  * no data to read.  Can be invoked both from process context and
  * from a timer handler.
  */
-static bool rcu_torture_one_read(struct torture_random_state *trsp)
+static bool rcu_torture_one_read(struct torture_random_state *trsp, long myid)
 {
+	unsigned long cookie;
 	int i;
 	unsigned long started;
 	unsigned long completed;
@@ -1379,6 +1548,8 @@ static bool rcu_torture_one_read(struct torture_random_state *trsp)
 	WARN_ON_ONCE(!rcu_is_watching());
 	newstate = rcutorture_extend_mask(readstate, trsp);
 	rcutorture_one_extend(&readstate, newstate, trsp, rtrsp++);
+	if (cur_ops->get_gp_state && cur_ops->poll_gp_state)
+		cookie = cur_ops->get_gp_state();
 	started = cur_ops->get_gp_seq();
 	ts = rcu_trace_clock_local();
 	p = rcu_dereference_check(rcu_torture_current,
@@ -1394,6 +1565,7 @@ static bool rcu_torture_one_read(struct torture_random_state *trsp)
 	}
 	if (p->rtort_mbtest == 0)
 		atomic_inc(&n_rcu_torture_mberror);
+	rcu_torture_reader_do_mbchk(myid, p, trsp);
 	rtrsp = rcutorture_loop_extend(&readstate, trsp, rtrsp);
 	preempt_disable();
 	pipe_count = READ_ONCE(p->rtort_pipe_count);
@@ -1415,6 +1587,13 @@ static bool rcu_torture_one_read(struct torture_random_state *trsp)
 	}
 	__this_cpu_inc(rcu_torture_batch[completed]);
 	preempt_enable();
+	if (cur_ops->get_gp_state && cur_ops->poll_gp_state)
+		WARN_ONCE(cur_ops->poll_gp_state(cookie),
+			  "%s: Cookie check 3 failed %s(%d) %lu->%lu\n",
+			  __func__,
+			  rcu_torture_writer_state_getname(),
+			  rcu_torture_writer_state,
+			  cookie, cur_ops->get_gp_state());
 	rcutorture_one_extend(&readstate, 0, trsp, rtrsp);
 	WARN_ON_ONCE(readstate & RCUTORTURE_RDR_MASK);
 	// This next splat is expected behavior if leakpointer, especially
@@ -1443,7 +1622,7 @@ static DEFINE_TORTURE_RANDOM_PERCPU(rcu_torture_timer_rand);
 static void rcu_torture_timer(struct timer_list *unused)
 {
 	atomic_long_inc(&n_rcu_torture_timers);
-	(void)rcu_torture_one_read(this_cpu_ptr(&rcu_torture_timer_rand));
+	(void)rcu_torture_one_read(this_cpu_ptr(&rcu_torture_timer_rand), -1);
 
 	/* Test call_rcu() invocation from interrupt handler. */
 	if (cur_ops->call) {
@@ -1479,13 +1658,13 @@ rcu_torture_reader(void *arg)
 			if (!timer_pending(&t))
 				mod_timer(&t, jiffies + 1);
 		}
-		if (!rcu_torture_one_read(&rand) && !torture_must_stop())
+		if (!rcu_torture_one_read(&rand, myid) && !torture_must_stop())
 			schedule_timeout_interruptible(HZ);
 		if (time_after(jiffies, lastsleep) && !torture_must_stop()) {
-			schedule_timeout_interruptible(1);
+			torture_hrtimeout_us(500, 1000, &rand);
 			lastsleep = jiffies + 10;
 		}
-		while (num_online_cpus() < mynumonline && !torture_must_stop())
+		while (torture_num_online_cpus() < mynumonline && !torture_must_stop())
 			schedule_timeout_interruptible(HZ / 5);
 		stutter_wait("rcu_torture_reader");
 	} while (!torture_must_stop());
@@ -1499,6 +1678,53 @@ rcu_torture_reader(void *arg)
 }
 
 /*
+ * Randomly toggle CPUs' callback-offload state.  This uses hrtimers to
+ * increase race probabilities and fuzzes the interval between toggling.
+ */
+static int rcu_nocb_toggle(void *arg)
+{
+	int cpu;
+	int maxcpu = -1;
+	int oldnice = task_nice(current);
+	long r;
+	DEFINE_TORTURE_RANDOM(rand);
+	ktime_t toggle_delay;
+	unsigned long toggle_fuzz;
+	ktime_t toggle_interval = ms_to_ktime(nocbs_toggle);
+
+	VERBOSE_TOROUT_STRING("rcu_nocb_toggle task started");
+	while (!rcu_inkernel_boot_has_ended())
+		schedule_timeout_interruptible(HZ / 10);
+	for_each_online_cpu(cpu)
+		maxcpu = cpu;
+	WARN_ON(maxcpu < 0);
+	if (toggle_interval > ULONG_MAX)
+		toggle_fuzz = ULONG_MAX >> 3;
+	else
+		toggle_fuzz = toggle_interval >> 3;
+	if (toggle_fuzz <= 0)
+		toggle_fuzz = NSEC_PER_USEC;
+	do {
+		r = torture_random(&rand);
+		cpu = (r >> 4) % (maxcpu + 1);
+		if (r & 0x1) {
+			rcu_nocb_cpu_offload(cpu);
+			atomic_long_inc(&n_nocb_offload);
+		} else {
+			rcu_nocb_cpu_deoffload(cpu);
+			atomic_long_inc(&n_nocb_deoffload);
+		}
+		toggle_delay = torture_random(&rand) % toggle_fuzz + toggle_interval;
+		set_current_state(TASK_INTERRUPTIBLE);
+		schedule_hrtimeout(&toggle_delay, HRTIMER_MODE_REL);
+		if (stutter_wait("rcu_nocb_toggle"))
+			sched_set_normal(current, oldnice);
+	} while (!torture_must_stop());
+	torture_kthread_stopping("rcu_nocb_toggle");
+	return 0;
+}
+
+/*
  * Print torture statistics.  Caller must ensure that there is only
  * one call to this function at a given time!!!  This is normally
  * accomplished by relying on the module system to only have one copy
@@ -1539,8 +1765,9 @@ rcu_torture_stats_print(void)
 		atomic_read(&n_rcu_torture_alloc),
 		atomic_read(&n_rcu_torture_alloc_fail),
 		atomic_read(&n_rcu_torture_free));
-	pr_cont("rtmbe: %d rtbe: %ld rtbke: %ld rtbre: %ld ",
+	pr_cont("rtmbe: %d rtmbkf: %d/%d rtbe: %ld rtbke: %ld rtbre: %ld ",
 		atomic_read(&n_rcu_torture_mberror),
+		atomic_read(&n_rcu_torture_mbchk_fail), atomic_read(&n_rcu_torture_mbchk_tries),
 		n_rcu_torture_barrier_error,
 		n_rcu_torture_boost_ktrerror,
 		n_rcu_torture_boost_rterror);
@@ -1553,16 +1780,20 @@ rcu_torture_stats_print(void)
 		data_race(n_barrier_successes),
 		data_race(n_barrier_attempts),
 		data_race(n_rcu_torture_barrier_error));
-	pr_cont("read-exits: %ld\n", data_race(n_read_exits));
+	pr_cont("read-exits: %ld ", data_race(n_read_exits)); // Statistic.
+	pr_cont("nocb-toggles: %ld:%ld\n",
+		atomic_long_read(&n_nocb_offload), atomic_long_read(&n_nocb_deoffload));
 
 	pr_alert("%s%s ", torture_type, TORTURE_FLAG);
 	if (atomic_read(&n_rcu_torture_mberror) ||
+	    atomic_read(&n_rcu_torture_mbchk_fail) ||
 	    n_rcu_torture_barrier_error || n_rcu_torture_boost_ktrerror ||
 	    n_rcu_torture_boost_rterror || n_rcu_torture_boost_failure ||
 	    i > 1) {
 		pr_cont("%s", "!!! ");
 		atomic_inc(&n_rcu_torture_error);
 		WARN_ON_ONCE(atomic_read(&n_rcu_torture_mberror));
+		WARN_ON_ONCE(atomic_read(&n_rcu_torture_mbchk_fail));
 		WARN_ON_ONCE(n_rcu_torture_barrier_error);  // rcu_barrier()
 		WARN_ON_ONCE(n_rcu_torture_boost_ktrerror); // no boost kthread
 		WARN_ON_ONCE(n_rcu_torture_boost_rterror); // can't set RT prio
@@ -1647,7 +1878,8 @@ rcu_torture_print_module_parms(struct rcu_torture_ops *cur_ops, const char *tag)
 		 "stall_cpu_block=%d "
 		 "n_barrier_cbs=%d "
 		 "onoff_interval=%d onoff_holdoff=%d "
-		 "read_exit_delay=%d read_exit_burst=%d\n",
+		 "read_exit_delay=%d read_exit_burst=%d "
+		 "nocbs_nthreads=%d nocbs_toggle=%d\n",
 		 torture_type, tag, nrealreaders, nfakewriters,
 		 stat_interval, verbose, test_no_idle_hz, shuffle_interval,
 		 stutter, irqreader, fqs_duration, fqs_holdoff, fqs_stutter,
@@ -1657,7 +1889,8 @@ rcu_torture_print_module_parms(struct rcu_torture_ops *cur_ops, const char *tag)
 		 stall_cpu_block,
 		 n_barrier_cbs,
 		 onoff_interval, onoff_holdoff,
-		 read_exit_delay, read_exit_burst);
+		 read_exit_delay, read_exit_burst,
+		 nocbs_nthreads, nocbs_toggle);
 }
 
 static int rcutorture_booster_cleanup(unsigned int cpu)
@@ -2392,7 +2625,7 @@ static int rcu_torture_read_exit_child(void *trsp_in)
 	// Minimize time between reading and exiting.
 	while (!kthread_should_stop())
 		schedule_timeout_uninterruptible(1);
-	(void)rcu_torture_one_read(trsp);
+	(void)rcu_torture_one_read(trsp, -1);
 	return 0;
 }
 
@@ -2500,6 +2733,13 @@ rcu_torture_cleanup(void)
 	torture_stop_kthread(rcu_torture_stall, stall_task);
 	torture_stop_kthread(rcu_torture_writer, writer_task);
 
+	if (nocb_tasks) {
+		for (i = 0; i < nrealnocbers; i++)
+			torture_stop_kthread(rcu_nocb_toggle, nocb_tasks[i]);
+		kfree(nocb_tasks);
+		nocb_tasks = NULL;
+	}
+
 	if (reader_tasks) {
 		for (i = 0; i < nrealreaders; i++)
 			torture_stop_kthread(rcu_torture_reader,
@@ -2507,6 +2747,8 @@ rcu_torture_cleanup(void)
 		kfree(reader_tasks);
 		reader_tasks = NULL;
 	}
+	kfree(rcu_torture_reader_mbchk);
+	rcu_torture_reader_mbchk = NULL;
 
 	if (fakewriter_tasks) {
 		for (i = 0; i < nfakewriters; i++)
@@ -2604,6 +2846,7 @@ static void rcu_test_debug_objects(void)
 #ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
 	struct rcu_head rh1;
 	struct rcu_head rh2;
+	struct rcu_head *rhp = kmalloc(sizeof(*rhp), GFP_KERNEL);
 
 	init_rcu_head_on_stack(&rh1);
 	init_rcu_head_on_stack(&rh2);
@@ -2616,6 +2859,10 @@ static void rcu_test_debug_objects(void)
 	local_irq_disable(); /* Make it harder to start a new grace period. */
 	call_rcu(&rh2, rcu_torture_leak_cb);
 	call_rcu(&rh2, rcu_torture_err_cb); /* Duplicate callback. */
+	if (rhp) {
+		call_rcu(rhp, rcu_torture_leak_cb);
+		call_rcu(rhp, rcu_torture_err_cb); /* Another duplicate callback. */
+	}
 	local_irq_enable();
 	rcu_read_unlock();
 	preempt_enable();
@@ -2710,6 +2957,8 @@ rcu_torture_init(void)
 	atomic_set(&n_rcu_torture_alloc_fail, 0);
 	atomic_set(&n_rcu_torture_free, 0);
 	atomic_set(&n_rcu_torture_mberror, 0);
+	atomic_set(&n_rcu_torture_mbchk_fail, 0);
+	atomic_set(&n_rcu_torture_mbchk_tries, 0);
 	atomic_set(&n_rcu_torture_error, 0);
 	n_rcu_torture_barrier_error = 0;
 	n_rcu_torture_boost_ktrerror = 0;
@@ -2729,6 +2978,7 @@ rcu_torture_init(void)
 
 	/* Start up the kthreads. */
 
+	rcu_torture_write_types();
 	firsterr = torture_create_kthread(rcu_torture_writer, NULL,
 					  writer_task);
 	if (firsterr)
@@ -2751,17 +3001,40 @@ rcu_torture_init(void)
 	}
 	reader_tasks = kcalloc(nrealreaders, sizeof(reader_tasks[0]),
 			       GFP_KERNEL);
-	if (reader_tasks == NULL) {
+	rcu_torture_reader_mbchk = kcalloc(nrealreaders, sizeof(*rcu_torture_reader_mbchk),
+					   GFP_KERNEL);
+	if (!reader_tasks || !rcu_torture_reader_mbchk) {
 		VERBOSE_TOROUT_ERRSTRING("out of memory");
 		firsterr = -ENOMEM;
 		goto unwind;
 	}
 	for (i = 0; i < nrealreaders; i++) {
+		rcu_torture_reader_mbchk[i].rtc_chkrdr = -1;
 		firsterr = torture_create_kthread(rcu_torture_reader, (void *)i,
 						  reader_tasks[i]);
 		if (firsterr)
 			goto unwind;
 	}
+	nrealnocbers = nocbs_nthreads;
+	if (WARN_ON(nrealnocbers < 0))
+		nrealnocbers = 1;
+	if (WARN_ON(nocbs_toggle < 0))
+		nocbs_toggle = HZ;
+	if (nrealnocbers > 0) {
+		nocb_tasks = kcalloc(nrealnocbers, sizeof(nocb_tasks[0]), GFP_KERNEL);
+		if (nocb_tasks == NULL) {
+			VERBOSE_TOROUT_ERRSTRING("out of memory");
+			firsterr = -ENOMEM;
+			goto unwind;
+		}
+	} else {
+		nocb_tasks = NULL;
+	}
+	for (i = 0; i < nrealnocbers; i++) {
+		firsterr = torture_create_kthread(rcu_nocb_toggle, NULL, nocb_tasks[i]);
+		if (firsterr)
+			goto unwind;
+	}
 	if (stat_interval > 0) {
 		firsterr = torture_create_kthread(rcu_torture_stats, NULL,
 						  stats_task);
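
The writer, fakewriter, and reader paths above all exercise the new polled grace-period operations. A minimal sketch of the polling pattern they use, assuming rcutorture's cur_ops provides both ->start_gp_poll and ->poll_gp_state (as srcu_ops now does) and that rand points to a torture_random_state; example_poll_gp_wait is a hypothetical name:

/* Hypothetical sketch, not part of this patch. */
static void example_poll_gp_wait(struct torture_random_state *rand)
{
	unsigned long gp_snap;

	gp_snap = cur_ops->start_gp_poll();	/* Snapshot and start a grace period. */
	while (!cur_ops->poll_gp_state(gp_snap))	/* Sleep a bit until it completes. */
		torture_hrtimeout_jiffies(torture_random(rand) % 16, rand);
}
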
diff --git a/kernel/rcu/refscale.c b/kernel/rcu/refscale.c
index 23ff36a66f97..02dd9767b559 100644
--- a/kernel/rcu/refscale.c
+++ b/kernel/rcu/refscale.c
@@ -46,6 +46,18 @@
 #define VERBOSE_SCALEOUT(s, x...) \
 	do { if (verbose) pr_alert("%s" SCALE_FLAG s, scale_type, ## x); } while (0)
 
+static atomic_t verbose_batch_ctr;
+
+#define VERBOSE_SCALEOUT_BATCH(s, x...)							\
+do {											\
+	if (verbose &&									\
+	    (verbose_batched <= 0 ||							\
+	     !(atomic_inc_return(&verbose_batch_ctr) % verbose_batched))) {		\
+		schedule_timeout_uninterruptible(1);					\
+		pr_alert("%s" SCALE_FLAG s, scale_type, ## x);				\
+	}										\
+} while (0)
+
 #define VERBOSE_SCALEOUT_ERRSTRING(s, x...) \
 	do { if (verbose) pr_alert("%s" SCALE_FLAG "!!! " s, scale_type, ## x); } while (0)
 
@@ -57,6 +69,7 @@ module_param(scale_type, charp, 0444);
 MODULE_PARM_DESC(scale_type, "Type of test (rcu, srcu, refcnt, rwsem, rwlock.");
 
 torture_param(int, verbose, 0, "Enable verbose debugging printk()s");
+torture_param(int, verbose_batched, 0, "Batch verbose debugging printk()s");
 
 // Wait until there are multiple CPUs before starting test.
 torture_param(int, holdoff, IS_BUILTIN(CONFIG_RCU_REF_SCALE_TEST) ? 10 : 0,
@@ -368,14 +381,14 @@ ref_scale_reader(void *arg)
 	u64 start;
 	s64 duration;
 
-	VERBOSE_SCALEOUT("ref_scale_reader %ld: task started", me);
+	VERBOSE_SCALEOUT_BATCH("ref_scale_reader %ld: task started", me);
 	set_cpus_allowed_ptr(current, cpumask_of(me % nr_cpu_ids));
 	set_user_nice(current, MAX_NICE);
 	atomic_inc(&n_init);
 	if (holdoff)
 		schedule_timeout_interruptible(holdoff * HZ);
 repeat:
-	VERBOSE_SCALEOUT("ref_scale_reader %ld: waiting to start next experiment on cpu %d", me, smp_processor_id());
+	VERBOSE_SCALEOUT_BATCH("ref_scale_reader %ld: waiting to start next experiment on cpu %d", me, smp_processor_id());
 
 	// Wait for signal that this reader can start.
 	wait_event(rt->wq, (atomic_read(&nreaders_exp) && smp_load_acquire(&rt->start_reader)) ||
@@ -392,7 +405,7 @@ repeat:
 		while (atomic_read_acquire(&n_started))
 			cpu_relax();
 
-	VERBOSE_SCALEOUT("ref_scale_reader %ld: experiment %d started", me, exp_idx);
+	VERBOSE_SCALEOUT_BATCH("ref_scale_reader %ld: experiment %d started", me, exp_idx);
 
 
 	// To reduce noise, do an initial cache-warming invocation, check
@@ -421,8 +434,8 @@ repeat:
 	if (atomic_dec_and_test(&nreaders_exp))
 		wake_up(&main_wq);
 
-	VERBOSE_SCALEOUT("ref_scale_reader %ld: experiment %d ended, (readers remaining=%d)",
-			me, exp_idx, atomic_read(&nreaders_exp));
+	VERBOSE_SCALEOUT_BATCH("ref_scale_reader %ld: experiment %d ended, (readers remaining=%d)",
+				me, exp_idx, atomic_read(&nreaders_exp));
 
 	if (!torture_must_stop())
 		goto repeat;
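
VERBOSE_SCALEOUT_BATCH() above throttles per-reader messages so that large reader counts do not flood the console. The core idea, shown here as a hedged standalone sketch (the example_ names are hypothetical), is a shared counter plus a modulo test:

/* Hypothetical sketch, not part of this patch. */
static atomic_t example_ctr = ATOMIC_INIT(0);

static bool example_should_print(int batch)
{
	/* Print unconditionally when batching is off, else only every batch-th call. */
	return batch <= 0 || !(atomic_inc_return(&example_ctr) % batch);
}
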
diff --git a/kernel/rcu/srcutiny.c b/kernel/rcu/srcutiny.c
index 6208c1dae5c9..26344dc6483b 100644
--- a/kernel/rcu/srcutiny.c
+++ b/kernel/rcu/srcutiny.c
@@ -34,6 +34,7 @@ static int init_srcu_struct_fields(struct srcu_struct *ssp)
 	ssp->srcu_gp_running = false;
 	ssp->srcu_gp_waiting = false;
 	ssp->srcu_idx = 0;
+	ssp->srcu_idx_max = 0;
 	INIT_WORK(&ssp->srcu_work, srcu_drive_gp);
 	INIT_LIST_HEAD(&ssp->srcu_work.entry);
 	return 0;
@@ -84,6 +85,8 @@ void cleanup_srcu_struct(struct srcu_struct *ssp)
 	WARN_ON(ssp->srcu_gp_waiting);
 	WARN_ON(ssp->srcu_cb_head);
 	WARN_ON(&ssp->srcu_cb_head != ssp->srcu_cb_tail);
+	WARN_ON(ssp->srcu_idx != ssp->srcu_idx_max);
+	WARN_ON(ssp->srcu_idx & 0x1);
 }
 EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
 
@@ -114,7 +117,7 @@ void srcu_drive_gp(struct work_struct *wp)
 	struct srcu_struct *ssp;
 
 	ssp = container_of(wp, struct srcu_struct, srcu_work);
-	if (ssp->srcu_gp_running || !READ_ONCE(ssp->srcu_cb_head))
+	if (ssp->srcu_gp_running || USHORT_CMP_GE(ssp->srcu_idx, READ_ONCE(ssp->srcu_idx_max)))
 		return; /* Already running or nothing to do. */
 
 	/* Remove recently arrived callbacks and wait for readers. */
@@ -124,11 +127,12 @@ void srcu_drive_gp(struct work_struct *wp)
 	ssp->srcu_cb_head = NULL;
 	ssp->srcu_cb_tail = &ssp->srcu_cb_head;
 	local_irq_enable();
-	idx = ssp->srcu_idx;
-	WRITE_ONCE(ssp->srcu_idx, !ssp->srcu_idx);
+	idx = (ssp->srcu_idx & 0x2) / 2;
+	WRITE_ONCE(ssp->srcu_idx, ssp->srcu_idx + 1);
 	WRITE_ONCE(ssp->srcu_gp_waiting, true);  /* srcu_read_unlock() wakes! */
 	swait_event_exclusive(ssp->srcu_wq, !READ_ONCE(ssp->srcu_lock_nesting[idx]));
 	WRITE_ONCE(ssp->srcu_gp_waiting, false); /* srcu_read_unlock() cheap. */
+	WRITE_ONCE(ssp->srcu_idx, ssp->srcu_idx + 1);
 
 	/* Invoke the callbacks we removed above. */
 	while (lh) {
@@ -146,11 +150,27 @@ void srcu_drive_gp(struct work_struct *wp)
 	 * straighten that out.
 	 */
 	WRITE_ONCE(ssp->srcu_gp_running, false);
-	if (READ_ONCE(ssp->srcu_cb_head))
+	if (USHORT_CMP_LT(ssp->srcu_idx, READ_ONCE(ssp->srcu_idx_max)))
 		schedule_work(&ssp->srcu_work);
 }
 EXPORT_SYMBOL_GPL(srcu_drive_gp);
 
+static void srcu_gp_start_if_needed(struct srcu_struct *ssp)
+{
+	unsigned short cookie;
+
+	cookie = get_state_synchronize_srcu(ssp);
+	if (USHORT_CMP_GE(READ_ONCE(ssp->srcu_idx_max), cookie))
+		return;
+	WRITE_ONCE(ssp->srcu_idx_max, cookie);
+	if (!READ_ONCE(ssp->srcu_gp_running)) {
+		if (likely(srcu_init_done))
+			schedule_work(&ssp->srcu_work);
+		else if (list_empty(&ssp->srcu_work.entry))
+			list_add(&ssp->srcu_work.entry, &srcu_boot_list);
+	}
+}
+
 /*
  * Enqueue an SRCU callback on the specified srcu_struct structure,
  * initiating grace-period processing if it is not already running.
@@ -166,12 +186,7 @@ void call_srcu(struct srcu_struct *ssp, struct rcu_head *rhp,
 	*ssp->srcu_cb_tail = rhp;
 	ssp->srcu_cb_tail = &rhp->next;
 	local_irq_restore(flags);
-	if (!READ_ONCE(ssp->srcu_gp_running)) {
-		if (likely(srcu_init_done))
-			schedule_work(&ssp->srcu_work);
-		else if (list_empty(&ssp->srcu_work.entry))
-			list_add(&ssp->srcu_work.entry, &srcu_boot_list);
-	}
+	srcu_gp_start_if_needed(ssp);
 }
 EXPORT_SYMBOL_GPL(call_srcu);
 
@@ -190,6 +205,48 @@ void synchronize_srcu(struct srcu_struct *ssp)
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 
+/*
+ * get_state_synchronize_srcu - Provide an end-of-grace-period cookie
+ */
+unsigned long get_state_synchronize_srcu(struct srcu_struct *ssp)
+{
+	unsigned long ret;
+
+	barrier();
+	ret = (READ_ONCE(ssp->srcu_idx) + 3) & ~0x1;
+	barrier();
+	return ret & USHRT_MAX;
+}
+EXPORT_SYMBOL_GPL(get_state_synchronize_srcu);
+
+/*
+ * start_poll_synchronize_srcu - Provide cookie and start grace period
+ *
+ * The difference between this and get_state_synchronize_srcu() is that
+ * this function ensures that a later poll_state_synchronize_srcu() will
+ * eventually return true.
+ */
+unsigned long start_poll_synchronize_srcu(struct srcu_struct *ssp)
+{
+	unsigned long ret = get_state_synchronize_srcu(ssp);
+
+	srcu_gp_start_if_needed(ssp);
+	return ret;
+}
+EXPORT_SYMBOL_GPL(start_poll_synchronize_srcu);
+
+/*
+ * poll_state_synchronize_srcu - Has cookie's grace period ended?
+ */
+bool poll_state_synchronize_srcu(struct srcu_struct *ssp, unsigned long cookie)
+{
+	bool ret = USHORT_CMP_GE(READ_ONCE(ssp->srcu_idx), cookie);
+
+	barrier();
+	return ret;
+}
+EXPORT_SYMBOL_GPL(poll_state_synchronize_srcu);
+
 /* Lockdep diagnostics.  */
 void __init rcu_scheduler_starting(void)
 {
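
Both the Tiny SRCU changes above and the Tree SRCU changes below export the same three-function polling interface. A minimal usage sketch, assuming an srcu_struct defined as example_srcu; the structure name and the surrounding function are hypothetical and not part of this patch:

/* Hypothetical sketch, not part of this patch. */
DEFINE_STATIC_SRCU(example_srcu);

static void example_poll_srcu_gp(void)
{
	unsigned long cookie;

	cookie = start_poll_synchronize_srcu(&example_srcu);	/* Snapshot and start a GP. */
	/* ... do other work while the grace period runs ... */
	if (poll_state_synchronize_srcu(&example_srcu, cookie))
		pr_info("SRCU grace period already completed.\n");
	else
		synchronize_srcu(&example_srcu);	/* Fall back to a direct wait. */
}
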
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index 0f23d20d485a..e26547b34ad3 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -808,6 +808,46 @@ static void srcu_leak_callback(struct rcu_head *rhp)
 }
 
 /*
+ * Start an SRCU grace period, and also queue the callback if non-NULL.
+ */
+static unsigned long srcu_gp_start_if_needed(struct srcu_struct *ssp,
+					     struct rcu_head *rhp, bool do_norm)
+{
+	unsigned long flags;
+	int idx;
+	bool needexp = false;
+	bool needgp = false;
+	unsigned long s;
+	struct srcu_data *sdp;
+
+	check_init_srcu_struct(ssp);
+	idx = srcu_read_lock(ssp);
+	sdp = raw_cpu_ptr(ssp->sda);
+	spin_lock_irqsave_rcu_node(sdp, flags);
+	if (rhp)
+		rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp);
+	rcu_segcblist_advance(&sdp->srcu_cblist,
+			      rcu_seq_current(&ssp->srcu_gp_seq));
+	s = rcu_seq_snap(&ssp->srcu_gp_seq);
+	(void)rcu_segcblist_accelerate(&sdp->srcu_cblist, s);
+	if (ULONG_CMP_LT(sdp->srcu_gp_seq_needed, s)) {
+		sdp->srcu_gp_seq_needed = s;
+		needgp = true;
+	}
+	if (!do_norm && ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) {
+		sdp->srcu_gp_seq_needed_exp = s;
+		needexp = true;
+	}
+	spin_unlock_irqrestore_rcu_node(sdp, flags);
+	if (needgp)
+		srcu_funnel_gp_start(ssp, sdp, s, do_norm);
+	else if (needexp)
+		srcu_funnel_exp_start(ssp, sdp->mynode, s);
+	srcu_read_unlock(ssp, idx);
+	return s;
+}
+
+/*
  * Enqueue an SRCU callback on the srcu_data structure associated with
  * the current CPU and the specified srcu_struct structure, initiating
  * grace-period processing if it is not already running.
@@ -838,14 +878,6 @@ static void srcu_leak_callback(struct rcu_head *rhp)
 static void __call_srcu(struct srcu_struct *ssp, struct rcu_head *rhp,
 			rcu_callback_t func, bool do_norm)
 {
-	unsigned long flags;
-	int idx;
-	bool needexp = false;
-	bool needgp = false;
-	unsigned long s;
-	struct srcu_data *sdp;
-
-	check_init_srcu_struct(ssp);
 	if (debug_rcu_head_queue(rhp)) {
 		/* Probable double call_srcu(), so leak the callback. */
 		WRITE_ONCE(rhp->func, srcu_leak_callback);
@@ -853,28 +885,7 @@ static void __call_srcu(struct srcu_struct *ssp, struct rcu_head *rhp,
 		return;
 	}
 	rhp->func = func;
-	idx = srcu_read_lock(ssp);
-	sdp = raw_cpu_ptr(ssp->sda);
-	spin_lock_irqsave_rcu_node(sdp, flags);
-	rcu_segcblist_enqueue(&sdp->srcu_cblist, rhp);
-	rcu_segcblist_advance(&sdp->srcu_cblist,
-			      rcu_seq_current(&ssp->srcu_gp_seq));
-	s = rcu_seq_snap(&ssp->srcu_gp_seq);
-	(void)rcu_segcblist_accelerate(&sdp->srcu_cblist, s);
-	if (ULONG_CMP_LT(sdp->srcu_gp_seq_needed, s)) {
-		sdp->srcu_gp_seq_needed = s;
-		needgp = true;
-	}
-	if (!do_norm && ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) {
-		sdp->srcu_gp_seq_needed_exp = s;
-		needexp = true;
-	}
-	spin_unlock_irqrestore_rcu_node(sdp, flags);
-	if (needgp)
-		srcu_funnel_gp_start(ssp, sdp, s, do_norm);
-	else if (needexp)
-		srcu_funnel_exp_start(ssp, sdp->mynode, s);
-	srcu_read_unlock(ssp, idx);
+	(void)srcu_gp_start_if_needed(ssp, rhp, do_norm);
 }
 
 /**
@@ -1003,6 +1014,77 @@ void synchronize_srcu(struct srcu_struct *ssp)
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 
+/**
+ * get_state_synchronize_srcu - Provide an end-of-grace-period cookie
+ * @ssp: srcu_struct to provide cookie for.
+ *
+ * This function returns a cookie that can be passed to
+ * poll_state_synchronize_srcu(), which will return true if a full grace
+ * period has elapsed in the meantime.  It is the caller's responsibility
+ * to make sure that the grace period happens, for example, by invoking
+ * call_srcu() after return from get_state_synchronize_srcu().
+ */
+unsigned long get_state_synchronize_srcu(struct srcu_struct *ssp)
+{
+	// Any prior manipulation of SRCU-protected data must happen
+	// before the load from ->srcu_gp_seq.
+	smp_mb();
+	return rcu_seq_snap(&ssp->srcu_gp_seq);
+}
+EXPORT_SYMBOL_GPL(get_state_synchronize_srcu);
+
+/**
+ * start_poll_synchronize_srcu - Provide cookie and start grace period
+ * @ssp: srcu_struct to provide cookie for.
+ *
+ * This function returns a cookie that can be passed to
+ * poll_state_synchronize_srcu(), which will return true if a full grace
+ * period has elapsed in the meantime.  Unlike get_state_synchronize_srcu(),
+ * this function also ensures that any needed SRCU grace period will be
+ * started.  This convenience does come at a cost in terms of CPU overhead.
+ */
+unsigned long start_poll_synchronize_srcu(struct srcu_struct *ssp)
+{
+	return srcu_gp_start_if_needed(ssp, NULL, true);
+}
+EXPORT_SYMBOL_GPL(start_poll_synchronize_srcu);
+
+/**
+ * poll_state_synchronize_srcu - Has cookie's grace period ended?
+ * @ssp: srcu_struct to provide cookie for.
+ * @cookie: Return value from get_state_synchronize_srcu() or start_poll_synchronize_srcu().
+ *
+ * This function takes the cookie that was returned from either
+ * get_state_synchronize_srcu() or start_poll_synchronize_srcu(), and
+ * returns @true if an SRCU grace period elapsed since the time that the
+ * cookie was created.
+ *
+ * Because cookies are finite in size, wrapping/overflow is possible.
+ * This is more pronounced on 32-bit systems, where cookies are 32 bits
+ * and could in theory wrap in about 14 hours assuming
+ * 25-microsecond expedited SRCU grace periods.  However, a more likely
+ * overflow lower bound is on the order of 24 days in the case of
+ * one-millisecond SRCU grace periods.  Of course, wrapping in a 64-bit
+ * system requires geologic timespans, as in more than seven million years
+ * even for expedited SRCU grace periods.
+ *
+ * Wrapping/overflow is much more of an issue for CONFIG_SMP=n systems
+ * that also have CONFIG_PREEMPTION=n, which selects Tiny SRCU.  Tiny SRCU
+ * uses a 16-bit cookie, which rcutorture routinely wraps in a matter of a
+ * few minutes.  If this proves to be a problem, this counter will be
+ * expanded to the same size as for Tree SRCU.
+ */
+bool poll_state_synchronize_srcu(struct srcu_struct *ssp, unsigned long cookie)
+{
+	if (!rcu_seq_done(&ssp->srcu_gp_seq, cookie))
+		return false;
+	// Ensure that the end of the SRCU grace period happens before
+	// any subsequent code that the caller might execute.
+	smp_mb(); // ^^^
+	return true;
+}
+EXPORT_SYMBOL_GPL(poll_state_synchronize_srcu);
+
 /*
  * Callback function for srcu_barrier() use.
  */
@@ -1160,6 +1242,7 @@ static void srcu_advance_state(struct srcu_struct *ssp)
  */
 static void srcu_invoke_callbacks(struct work_struct *work)
 {
+	long len;
 	bool more;
 	struct rcu_cblist ready_cbs;
 	struct rcu_head *rhp;
@@ -1182,6 +1265,7 @@ static void srcu_invoke_callbacks(struct work_struct *work)
 	/* We are on the job!  Extract and invoke ready callbacks. */
 	sdp->srcu_cblist_invoking = true;
 	rcu_segcblist_extract_done_cbs(&sdp->srcu_cblist, &ready_cbs);
+	len = ready_cbs.len;
 	spin_unlock_irq_rcu_node(sdp);
 	rhp = rcu_cblist_dequeue(&ready_cbs);
 	for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
@@ -1190,13 +1274,14 @@ static void srcu_invoke_callbacks(struct work_struct *work)
 		rhp->func(rhp);
 		local_bh_enable();
 	}
+	WARN_ON_ONCE(ready_cbs.len);
 
 	/*
 	 * Update counts, accelerate new callbacks, and if needed,
 	 * schedule another round of callback invocation.
 	 */
 	spin_lock_irq_rcu_node(sdp);
-	rcu_segcblist_insert_count(&sdp->srcu_cblist, &ready_cbs);
+	rcu_segcblist_add_len(&sdp->srcu_cblist, -len);
 	(void)rcu_segcblist_accelerate(&sdp->srcu_cblist,
 				       rcu_seq_snap(&ssp->srcu_gp_seq));
 	sdp->srcu_cblist_invoking = false;
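
A minimal usage sketch (not part of this patch) may help illustrate the polling API added above.  The srcu_struct name, the cookie variable, and the two helper functions below are invented for illustration; only get_state_synchronize_srcu(), start_poll_synchronize_srcu(), poll_state_synchronize_srcu(), and call_srcu() come from the patched code.

#include <linux/srcu.h>

DEFINE_SRCU(poll_demo_srcu);

static unsigned long poll_demo_cookie;

/* Snapshot the grace-period state and make sure a grace period starts. */
static void poll_demo_begin(void)
{
	poll_demo_cookie = start_poll_synchronize_srcu(&poll_demo_srcu);
}

/* Later: has a full SRCU grace period elapsed since poll_demo_begin()? */
static bool poll_demo_done(void)
{
	if (poll_state_synchronize_srcu(&poll_demo_srcu, poll_demo_cookie))
		return true;	/* Safe to reclaim whatever was waiting. */
	return false;		/* Not yet; retry later or fall back to call_srcu(). */
}

A caller that cannot afford to keep polling would instead use get_state_synchronize_srcu() and hand the work to call_srcu(), as the kernel-doc above suggests.
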
diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
index 36607551f966..af7c19439f4e 100644
--- a/kernel/rcu/tasks.h
+++ b/kernel/rcu/tasks.h
@@ -1224,6 +1224,82 @@ void show_rcu_tasks_gp_kthreads(void)
 }
 #endif /* #ifndef CONFIG_TINY_RCU */
 
+#ifdef CONFIG_PROVE_RCU
+struct rcu_tasks_test_desc {
+	struct rcu_head rh;
+	const char *name;
+	bool notrun;
+};
+
+static struct rcu_tasks_test_desc tests[] = {
+	{
+		.name = "call_rcu_tasks()",
+		/* If not defined, the test is skipped. */
+		.notrun = !IS_ENABLED(CONFIG_TASKS_RCU),
+	},
+	{
+		.name = "call_rcu_tasks_rude()",
+		/* If not defined, the test is skipped. */
+		.notrun = !IS_ENABLED(CONFIG_TASKS_RUDE_RCU),
+	},
+	{
+		.name = "call_rcu_tasks_trace()",
+		/* If not defined, the test is skipped. */
+		.notrun = !IS_ENABLED(CONFIG_TASKS_TRACE_RCU)
+	}
+};
+
+static void test_rcu_tasks_callback(struct rcu_head *rhp)
+{
+	struct rcu_tasks_test_desc *rttd =
+		container_of(rhp, struct rcu_tasks_test_desc, rh);
+
+	pr_info("Callback from %s invoked.\n", rttd->name);
+
+	rttd->notrun = true;
+}
+
+static void rcu_tasks_initiate_self_tests(void)
+{
+	pr_info("Running RCU-tasks wait API self tests\n");
+#ifdef CONFIG_TASKS_RCU
+	synchronize_rcu_tasks();
+	call_rcu_tasks(&tests[0].rh, test_rcu_tasks_callback);
+#endif
+
+#ifdef CONFIG_TASKS_RUDE_RCU
+	synchronize_rcu_tasks_rude();
+	call_rcu_tasks_rude(&tests[1].rh, test_rcu_tasks_callback);
+#endif
+
+#ifdef CONFIG_TASKS_TRACE_RCU
+	synchronize_rcu_tasks_trace();
+	call_rcu_tasks_trace(&tests[2].rh, test_rcu_tasks_callback);
+#endif
+}
+
+static int rcu_tasks_verify_self_tests(void)
+{
+	int ret = 0;
+	int i;
+
+	for (i = 0; i < ARRAY_SIZE(tests); i++) {
+		if (!tests[i].notrun) {		// Callback never ran, so the test failed.
+			pr_err("%s self-test failed.\n", tests[i].name);
+			ret = -1;
+		}
+	}
+
+	if (ret)
+		WARN_ON(1);
+
+	return ret;
+}
+late_initcall(rcu_tasks_verify_self_tests);
+#else /* #ifdef CONFIG_PROVE_RCU */
+static void rcu_tasks_initiate_self_tests(void) { }
+#endif /* #else #ifdef CONFIG_PROVE_RCU */
+
 void __init rcu_init_tasks_generic(void)
 {
 #ifdef CONFIG_TASKS_RCU
@@ -1237,6 +1313,9 @@ void __init rcu_init_tasks_generic(void)
 #ifdef CONFIG_TASKS_TRACE_RCU
 	rcu_spawn_tasks_trace_kthread();
 #endif
+
+	// Run the self-tests.
+	rcu_tasks_initiate_self_tests();
 }
 
 #else /* #ifdef CONFIG_TASKS_RCU_GENERIC */
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 40e5e3dd253e..0f4a6a3c057b 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -83,6 +83,9 @@ static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, rcu_data) = {
 	.dynticks_nesting = 1,
 	.dynticks_nmi_nesting = DYNTICK_IRQ_NONIDLE,
 	.dynticks = ATOMIC_INIT(RCU_DYNTICK_CTRL_CTR),
+#ifdef CONFIG_RCU_NOCB_CPU
+	.cblist.flags = SEGCBLIST_SOFTIRQ_ONLY,
+#endif
 };
 static struct rcu_state rcu_state = {
 	.level = { &rcu_state.node[0] },
@@ -100,8 +103,10 @@ static struct rcu_state rcu_state = {
 static bool dump_tree;
 module_param(dump_tree, bool, 0444);
 /* By default, use RCU_SOFTIRQ instead of rcuc kthreads. */
-static bool use_softirq = true;
+static bool use_softirq = !IS_ENABLED(CONFIG_PREEMPT_RT);
+#ifndef CONFIG_PREEMPT_RT
 module_param(use_softirq, bool, 0444);
+#endif
 /* Control rcu_node-tree auto-balancing at boot time. */
 static bool rcu_fanout_exact;
 module_param(rcu_fanout_exact, bool, 0444);
@@ -1495,6 +1500,8 @@ static bool rcu_accelerate_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
 	if (!rcu_segcblist_pend_cbs(&rdp->cblist))
 		return false;
 
+	trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCbPreAcc"));
+
 	/*
 	 * Callbacks are often registered with incomplete grace-period
 	 * information.  Something about the fact that getting exact
@@ -1515,6 +1522,8 @@ static bool rcu_accelerate_cbs(struct rcu_node *rnp, struct rcu_data *rdp)
 	else
 		trace_rcu_grace_period(rcu_state.name, gp_seq_req, TPS("AccReadyCB"));
 
+	trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCbPostAcc"));
+
 	return ret;
 }
 
@@ -1765,7 +1774,7 @@ static bool rcu_gp_init(void)
 	 * go offline later.  Please also refer to "Hotplug CPU" section
 	 * of RCU's Requirements documentation.
 	 */
-	rcu_state.gp_state = RCU_GP_ONOFF;
+	WRITE_ONCE(rcu_state.gp_state, RCU_GP_ONOFF);
 	rcu_for_each_leaf_node(rnp) {
 		smp_mb(); // Pair with barriers used when updating ->ofl_seq to odd values.
 		firstseq = READ_ONCE(rnp->ofl_seq);
@@ -1831,7 +1840,7 @@ static bool rcu_gp_init(void)
 	 * The grace period cannot complete until the initialization
 	 * process finishes, because this kthread handles both.
 	 */
-	rcu_state.gp_state = RCU_GP_INIT;
+	WRITE_ONCE(rcu_state.gp_state, RCU_GP_INIT);
 	rcu_for_each_node_breadth_first(rnp) {
 		rcu_gp_slow(gp_init_delay);
 		raw_spin_lock_irqsave_rcu_node(rnp, flags);
@@ -1930,17 +1939,22 @@ static void rcu_gp_fqs_loop(void)
 	ret = 0;
 	for (;;) {
 		if (!ret) {
-			rcu_state.jiffies_force_qs = jiffies + j;
+			WRITE_ONCE(rcu_state.jiffies_force_qs, jiffies + j);
+			/*
+			 * jiffies_force_qs before RCU_GP_WAIT_FQS state
+			 * update; required for stall checks.
+			 */
+			smp_wmb();
 			WRITE_ONCE(rcu_state.jiffies_kick_kthreads,
 				   jiffies + (j ? 3 * j : 2));
 		}
 		trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 				       TPS("fqswait"));
-		rcu_state.gp_state = RCU_GP_WAIT_FQS;
+		WRITE_ONCE(rcu_state.gp_state, RCU_GP_WAIT_FQS);
 		ret = swait_event_idle_timeout_exclusive(
 				rcu_state.gp_wq, rcu_gp_fqs_check_wake(&gf), j);
 		rcu_gp_torture_wait();
-		rcu_state.gp_state = RCU_GP_DOING_FQS;
+		WRITE_ONCE(rcu_state.gp_state, RCU_GP_DOING_FQS);
 		/* Locking provides needed memory barriers. */
 		/* If grace period done, leave loop. */
 		if (!READ_ONCE(rnp->qsmask) &&
@@ -2054,7 +2068,7 @@ static void rcu_gp_cleanup(void)
 	trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq, TPS("end"));
 	rcu_seq_end(&rcu_state.gp_seq);
 	ASSERT_EXCLUSIVE_WRITER(rcu_state.gp_seq);
-	rcu_state.gp_state = RCU_GP_IDLE;
+	WRITE_ONCE(rcu_state.gp_state, RCU_GP_IDLE);
 	/* Check for GP requests since above loop. */
 	rdp = this_cpu_ptr(&rcu_data);
 	if (!needgp && ULONG_CMP_LT(rnp->gp_seq, rnp->gp_seq_needed)) {
@@ -2093,12 +2107,12 @@ static int __noreturn rcu_gp_kthread(void *unused)
 		for (;;) {
 			trace_rcu_grace_period(rcu_state.name, rcu_state.gp_seq,
 					       TPS("reqwait"));
-			rcu_state.gp_state = RCU_GP_WAIT_GPS;
+			WRITE_ONCE(rcu_state.gp_state, RCU_GP_WAIT_GPS);
 			swait_event_idle_exclusive(rcu_state.gp_wq,
 					 READ_ONCE(rcu_state.gp_flags) &
 					 RCU_GP_FLAG_INIT);
 			rcu_gp_torture_wait();
-			rcu_state.gp_state = RCU_GP_DONE_GPS;
+			WRITE_ONCE(rcu_state.gp_state, RCU_GP_DONE_GPS);
 			/* Locking provides needed memory barrier. */
 			if (rcu_gp_init())
 				break;
@@ -2113,9 +2127,9 @@ static int __noreturn rcu_gp_kthread(void *unused)
 		rcu_gp_fqs_loop();
 
 		/* Handle grace-period end. */
-		rcu_state.gp_state = RCU_GP_CLEANUP;
+		WRITE_ONCE(rcu_state.gp_state, RCU_GP_CLEANUP);
 		rcu_gp_cleanup();
-		rcu_state.gp_state = RCU_GP_CLEANED;
+		WRITE_ONCE(rcu_state.gp_state, RCU_GP_CLEANED);
 	}
 }
 
@@ -2430,11 +2444,12 @@ int rcutree_dead_cpu(unsigned int cpu)
 static void rcu_do_batch(struct rcu_data *rdp)
 {
 	int div;
+	bool __maybe_unused empty;
 	unsigned long flags;
 	const bool offloaded = rcu_segcblist_is_offloaded(&rdp->cblist);
 	struct rcu_head *rhp;
 	struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
-	long bl, count;
+	long bl, count = 0;
 	long pending, tlimit = 0;
 
 	/* If no callbacks are ready, just return. */
@@ -2471,14 +2486,18 @@ static void rcu_do_batch(struct rcu_data *rdp)
 	rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
 	if (offloaded)
 		rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
+
+	trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCbDequeued"));
 	rcu_nocb_unlock_irqrestore(rdp, flags);
 
 	/* Invoke callbacks. */
 	tick_dep_set_task(current, TICK_DEP_BIT_RCU);
 	rhp = rcu_cblist_dequeue(&rcl);
+
 	for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
 		rcu_callback_t f;
 
+		count++;
 		debug_rcu_head_unqueue(rhp);
 
 		rcu_lock_acquire(&rcu_callback_map);
@@ -2492,21 +2511,19 @@ static void rcu_do_batch(struct rcu_data *rdp)
 
 		/*
 		 * Stop only if limit reached and CPU has something to do.
-		 * Note: The rcl structure counts down from zero.
 		 */
-		if (-rcl.len >= bl && !offloaded &&
+		if (count >= bl && !offloaded &&
 		    (need_resched() ||
 		     (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
 			break;
 		if (unlikely(tlimit)) {
 			/* only call local_clock() every 32 callbacks */
-			if (likely((-rcl.len & 31) || local_clock() < tlimit))
+			if (likely((count & 31) || local_clock() < tlimit))
 				continue;
 			/* Exceeded the time limit, so leave. */
 			break;
 		}
-		if (offloaded) {
-			WARN_ON_ONCE(in_serving_softirq());
+		if (!in_serving_softirq()) {
 			local_bh_enable();
 			lockdep_assert_irqs_enabled();
 			cond_resched_tasks_rcu_qs();
@@ -2517,15 +2534,13 @@ static void rcu_do_batch(struct rcu_data *rdp)
 
 	local_irq_save(flags);
 	rcu_nocb_lock(rdp);
-	count = -rcl.len;
 	rdp->n_cbs_invoked += count;
 	trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(),
 			    is_idle_task(current), rcu_is_callbacks_kthread());
 
 	/* Update counts and requeue any remaining callbacks. */
 	rcu_segcblist_insert_done_cbs(&rdp->cblist, &rcl);
-	smp_mb(); /* List handling before counting for rcu_barrier(). */
-	rcu_segcblist_insert_count(&rdp->cblist, &rcl);
+	rcu_segcblist_add_len(&rdp->cblist, -count);
 
 	/* Reinstate batch limit if we have worked down the excess. */
 	count = rcu_segcblist_n_cbs(&rdp->cblist);
@@ -2543,9 +2558,12 @@ static void rcu_do_batch(struct rcu_data *rdp)
 	 * The following usually indicates a double call_rcu().  To track
 	 * this down, try building with CONFIG_DEBUG_OBJECTS_RCU_HEAD=y.
 	 */
-	WARN_ON_ONCE(count == 0 && !rcu_segcblist_empty(&rdp->cblist));
+	empty = rcu_segcblist_empty(&rdp->cblist);
+	WARN_ON_ONCE(count == 0 && !empty);
 	WARN_ON_ONCE(!IS_ENABLED(CONFIG_RCU_NOCB_CPU) &&
-		     count != 0 && rcu_segcblist_empty(&rdp->cblist));
+		     count != 0 && empty);
+	WARN_ON_ONCE(count == 0 && rcu_segcblist_n_segment_cbs(&rdp->cblist) != 0);
+	WARN_ON_ONCE(!empty && rcu_segcblist_n_segment_cbs(&rdp->cblist) == 0);
 
 	rcu_nocb_unlock_irqrestore(rdp, flags);
 
@@ -2566,6 +2584,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
 void rcu_sched_clock_irq(int user)
 {
 	trace_rcu_utilization(TPS("Start scheduler-tick"));
+	lockdep_assert_irqs_disabled();
 	raw_cpu_inc(rcu_data.ticks_this_gp);
 	/* The load-acquire pairs with the store-release setting to true. */
 	if (smp_load_acquire(this_cpu_ptr(&rcu_data.rcu_urgent_qs))) {
@@ -2579,6 +2598,7 @@ void rcu_sched_clock_irq(int user)
 	rcu_flavor_sched_clock_irq(user);
 	if (rcu_pending(user))
 		invoke_rcu_core();
+	lockdep_assert_irqs_disabled();
 
 	trace_rcu_utilization(TPS("End scheduler-tick"));
 }
@@ -2688,7 +2708,7 @@ static __latent_entropy void rcu_core(void)
 	unsigned long flags;
 	struct rcu_data *rdp = raw_cpu_ptr(&rcu_data);
 	struct rcu_node *rnp = rdp->mynode;
-	const bool offloaded = rcu_segcblist_is_offloaded(&rdp->cblist);
+	const bool do_batch = !rcu_segcblist_completely_offloaded(&rdp->cblist);
 
 	if (cpu_is_offline(smp_processor_id()))
 		return;
@@ -2708,17 +2728,17 @@ static __latent_entropy void rcu_core(void)
 
 	/* No grace period and unregistered callbacks? */
 	if (!rcu_gp_in_progress() &&
-	    rcu_segcblist_is_enabled(&rdp->cblist) && !offloaded) {
-		local_irq_save(flags);
+	    rcu_segcblist_is_enabled(&rdp->cblist) && do_batch) {
+		rcu_nocb_lock_irqsave(rdp, flags);
 		if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
 			rcu_accelerate_cbs_unlocked(rnp, rdp);
-		local_irq_restore(flags);
+		rcu_nocb_unlock_irqrestore(rdp, flags);
 	}
 
 	rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check());
 
 	/* If there are callbacks ready, invoke them. */
-	if (!offloaded && rcu_segcblist_ready_cbs(&rdp->cblist) &&
+	if (do_batch && rcu_segcblist_ready_cbs(&rdp->cblist) &&
 	    likely(READ_ONCE(rcu_scheduler_fully_active)))
 		rcu_do_batch(rdp);
 
@@ -2941,6 +2961,7 @@ static void check_cb_ovld(struct rcu_data *rdp)
 static void
 __call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
+	static atomic_t doublefrees;
 	unsigned long flags;
 	struct rcu_data *rdp;
 	bool was_alldone;
@@ -2954,8 +2975,10 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func)
 		 * Use rcu:rcu_callback trace event to find the previous
 		 * time callback was passed to __call_rcu().
 		 */
-		WARN_ONCE(1, "__call_rcu(): Double-freed CB %p->%pS()!!!\n",
-			  head, head->func);
+		if (atomic_inc_return(&doublefrees) < 4) {
+			pr_err("%s(): Double-freed CB %p->%pS()!!!  ", __func__, head, head->func);
+			mem_dump_obj(head);
+		}
 		WRITE_ONCE(head->func, rcu_leak_callback);
 		return;
 	}
@@ -2989,6 +3012,8 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func)
 		trace_rcu_callback(rcu_state.name, head,
 				   rcu_segcblist_n_cbs(&rdp->cblist));
 
+	trace_rcu_segcb_stats(&rdp->cblist, TPS("SegCBQueued"));
+
 	/* Go handle any RCU core processing required. */
 	if (unlikely(rcu_segcblist_is_offloaded(&rdp->cblist))) {
 		__call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */
@@ -3498,6 +3523,7 @@ void kvfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 		goto unlock_return;
 	}
 
+	kasan_record_aux_stack(ptr);
 	success = kvfree_call_rcu_add_ptr_to_bulk(krcp, ptr);
 	if (!success) {
 		run_page_cache_worker(krcp);
@@ -3747,6 +3773,8 @@ static int rcu_pending(int user)
 	struct rcu_data *rdp = this_cpu_ptr(&rcu_data);
 	struct rcu_node *rnp = rdp->mynode;
 
+	lockdep_assert_irqs_disabled();
+
 	/* Check for CPU stalls, if enabled. */
 	check_cpu_stall(rdp);
 
@@ -4001,12 +4029,18 @@ int rcutree_prepare_cpu(unsigned int cpu)
 	rdp->qlen_last_fqs_check = 0;
 	rdp->n_force_qs_snap = rcu_state.n_force_qs;
 	rdp->blimit = blimit;
-	if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */
-	    !rcu_segcblist_is_offloaded(&rdp->cblist))
-		rcu_segcblist_init(&rdp->cblist);  /* Re-enable callbacks. */
 	rdp->dynticks_nesting = 1;	/* CPU not up, no tearing. */
 	rcu_dynticks_eqs_online();
 	raw_spin_unlock_rcu_node(rnp);		/* irqs remain disabled. */
+	/*
+	 * Lock in case the CB/GP kthreads are still around handling
+	 * old callbacks (longer term we should flush all callbacks
+	 * before completing CPU offline)
+	 */
+	rcu_nocb_lock(rdp);
+	if (rcu_segcblist_empty(&rdp->cblist)) /* No early-boot CBs? */
+		rcu_segcblist_init(&rdp->cblist);  /* Re-enable callbacks. */
+	rcu_nocb_unlock(rdp);
 
 	/*
 	 * Add CPU to leaf rcu_node pending-online bitmask.  Any needed
@@ -4159,6 +4193,9 @@ void rcu_report_dead(unsigned int cpu)
 	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
 	struct rcu_node *rnp = rdp->mynode;  /* Outgoing CPU's rdp & rnp. */
 
+	// Do any dangling deferred wakeups.
+	do_nocb_deferred_wakeup(rdp);
+
 	/* QS for any half-done expedited grace period. */
 	preempt_disable();
 	rcu_report_exp_rdp(this_cpu_ptr(&rcu_data));
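
For context, here is a hedged sketch (not part of this patch) of the double call_rcu() pattern that the new rate-limited pr_err()/mem_dump_obj() diagnostic in __call_rcu() is meant to report.  struct demo_node and its helpers are invented for illustration.

#include <linux/rcupdate.h>
#include <linux/slab.h>

struct demo_node {
	struct rcu_head rh;
	int data;
};

static void demo_free_cb(struct rcu_head *rhp)
{
	kfree(container_of(rhp, struct demo_node, rh));
}

static void demo_buggy_release(struct demo_node *p)
{
	call_rcu(&p->rh, demo_free_cb);
	/*
	 * BUG: the second invocation reuses the same rcu_head before the
	 * callback has run.  The diagnostic above reports the callback
	 * pointer, and mem_dump_obj() then describes the enclosing
	 * allocation, which helps track down the offending caller.
	 */
	call_rcu(&p->rh, demo_free_cb);
}
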
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 7708ed161f4a..5d359b9f9fec 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -201,6 +201,7 @@ struct rcu_data {
 	/* 5) Callback offloading. */
 #ifdef CONFIG_RCU_NOCB_CPU
 	struct swait_queue_head nocb_cb_wq; /* For nocb kthreads to sleep on. */
+	struct swait_queue_head nocb_state_wq; /* For offloading state changes */
 	struct task_struct *nocb_gp_kthread;
 	raw_spinlock_t nocb_lock;	/* Guard following pair of fields. */
 	atomic_t nocb_lock_contended;	/* Contention experienced. */
@@ -256,6 +257,7 @@ struct rcu_data {
 };
 
 /* Values for nocb_defer_wakeup field in struct rcu_data. */
+#define RCU_NOCB_WAKE_OFF	-1
 #define RCU_NOCB_WAKE_NOT	0
 #define RCU_NOCB_WAKE		1
 #define RCU_NOCB_WAKE_FORCE	2
diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
index 8760b6ead770..6c6ff06d4ae6 100644
--- a/kernel/rcu/tree_exp.h
+++ b/kernel/rcu/tree_exp.h
@@ -545,7 +545,7 @@ static void synchronize_rcu_expedited_wait(void)
 			data_race(rnp_root->expmask),
 			".T"[!!data_race(rnp_root->exp_tasks)]);
 		if (ndetected) {
-			pr_err("blocking rcu_node structures:");
+			pr_err("blocking rcu_node structures (internal RCU debug):");
 			rcu_for_each_node_breadth_first(rnp) {
 				if (rnp == rnp_root)
 					continue; /* printed unconditionally */
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 7e291ce0a1d6..231a0c6cf03c 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -682,6 +682,7 @@ static void rcu_flavor_sched_clock_irq(int user)
 {
 	struct task_struct *t = current;
 
+	lockdep_assert_irqs_disabled();
 	if (user || rcu_is_cpu_rrupt_from_idle()) {
 		rcu_note_voluntary_context_switch(current);
 	}
@@ -1665,6 +1666,8 @@ static void wake_nocb_gp(struct rcu_data *rdp, bool force,
 static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
 			       const char *reason)
 {
+	if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_OFF)
+		return;
 	if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
 		mod_timer(&rdp->nocb_timer, jiffies + 1);
 	if (rdp->nocb_defer_wakeup < waketype)
@@ -1929,6 +1932,52 @@ static void do_nocb_bypass_wakeup_timer(struct timer_list *t)
 }
 
 /*
+ * Check whether this rdp should be ignored.
+ *
+ * The check is done without holding the nocb lock, but the
+ * ordering below ensures that a freshly offloaded rdp is
+ * not missed:
+ *
+ *  rdp_offload_toggle()        nocb_gp_enabled_cb()
+ * -------------------------   ----------------------------
+ *    WRITE flags                 LOCK nocb_gp_lock
+ *    LOCK nocb_gp_lock           READ/WRITE nocb_gp_sleep
+ *    READ/WRITE nocb_gp_sleep    UNLOCK nocb_gp_lock
+ *    UNLOCK nocb_gp_lock         READ flags
+ */
+static inline bool nocb_gp_enabled_cb(struct rcu_data *rdp)
+{
+	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_GP;
+
+	return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_gp_update_state(struct rcu_data *rdp, bool *needwake_state)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+
+	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP)) {
+			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_GP);
+			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+				*needwake_state = true;
+		}
+		return true;
+	}
+
+	/*
+	 * De-offloading. Clear our flag and notify the de-offload worker.
+	 * We will ignore this rdp until it is re-offloaded.
+	 */
+	WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+	rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_GP);
+	if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB))
+		*needwake_state = true;
+	return false;
+}
+
+
+/*
  * No-CBs GP kthreads come here to wait for additional callbacks to show up
  * or for grace periods to end.
  */
@@ -1956,8 +2005,18 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 	 */
 	WARN_ON_ONCE(my_rdp->nocb_gp_rdp != my_rdp);
 	for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
+		bool needwake_state = false;
+
+		if (!nocb_gp_enabled_cb(rdp))
+			continue;
 		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
 		rcu_nocb_lock_irqsave(rdp, flags);
+		if (!nocb_gp_update_state(rdp, &needwake_state)) {
+			rcu_nocb_unlock_irqrestore(rdp, flags);
+			if (needwake_state)
+				swake_up_one(&rdp->nocb_state_wq);
+			continue;
+		}
 		bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
 		if (bypass_ncbs &&
 		    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
@@ -1967,6 +2026,8 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 			bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
 		} else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
 			rcu_nocb_unlock_irqrestore(rdp, flags);
+			if (needwake_state)
+				swake_up_one(&rdp->nocb_state_wq);
 			continue; /* No callbacks here, try next. */
 		}
 		if (bypass_ncbs) {
@@ -2018,6 +2079,8 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
 		}
 		if (needwake_gp)
 			rcu_gp_kthread_wake();
+		if (needwake_state)
+			swake_up_one(&rdp->nocb_state_wq);
 	}
 
 	my_rdp->nocb_gp_bypass = bypass;
@@ -2081,14 +2144,27 @@ static int rcu_nocb_gp_kthread(void *arg)
 	return 0;
 }
 
+static inline bool nocb_cb_can_run(struct rcu_data *rdp)
+{
+	u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
+	return rcu_segcblist_test_flags(&rdp->cblist, flags);
+}
+
+static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
+{
+	return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
+}
+
 /*
  * Invoke any ready callbacks from the corresponding no-CBs CPU,
  * then, if there are no more, wait for more to appear.
  */
 static void nocb_cb_wait(struct rcu_data *rdp)
 {
+	struct rcu_segcblist *cblist = &rdp->cblist;
 	unsigned long cur_gp_seq;
 	unsigned long flags;
+	bool needwake_state = false;
 	bool needwake_gp = false;
 	struct rcu_node *rnp = rdp->mynode;
 
@@ -2100,32 +2176,55 @@ static void nocb_cb_wait(struct rcu_data *rdp)
 	local_bh_enable();
 	lockdep_assert_irqs_enabled();
 	rcu_nocb_lock_irqsave(rdp, flags);
-	if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
+	if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
 	    rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
 	    raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
 		needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
 		raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
 	}
-	if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
-		rcu_nocb_unlock_irqrestore(rdp, flags);
-		if (needwake_gp)
-			rcu_gp_kthread_wake();
-		return;
-	}
 
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
 	WRITE_ONCE(rdp->nocb_cb_sleep, true);
+
+	if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB)) {
+			rcu_segcblist_set_flags(cblist, SEGCBLIST_KTHREAD_CB);
+			if (rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+				needwake_state = true;
+		}
+		if (rcu_segcblist_ready_cbs(cblist))
+			WRITE_ONCE(rdp->nocb_cb_sleep, false);
+	} else {
+		/*
+		 * De-offloading. Clear our flag and notify the de-offload worker.
+		 * We won't touch the callbacks and will keep sleeping until
+		 * this rdp is re-offloaded.
+		 */
+		WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB));
+		rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
+		if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
+			needwake_state = true;
+	}
+
+	if (rdp->nocb_cb_sleep)
+		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
+
 	rcu_nocb_unlock_irqrestore(rdp, flags);
 	if (needwake_gp)
 		rcu_gp_kthread_wake();
-	swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
-				 !READ_ONCE(rdp->nocb_cb_sleep));
-	if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */
-		/* ^^^ Ensure CB invocation follows _sleep test. */
-		return;
-	}
-	WARN_ON(signal_pending(current));
-	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
+
+	if (needwake_state)
+		swake_up_one(&rdp->nocb_state_wq);
+
+	do {
+		swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
+						    nocb_cb_wait_cond(rdp));
+
+		// VVV Ensure CB invocation follows _sleep test.
+		if (smp_load_acquire(&rdp->nocb_cb_sleep)) { // ^^^
+			WARN_ON(signal_pending(current));
+			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
+		}
+	} while (!nocb_cb_can_run(rdp));
 }
 
 /*
@@ -2148,7 +2247,7 @@ static int rcu_nocb_cb_kthread(void *arg)
 /* Is a deferred wakeup of rcu_nocb_kthread() required? */
 static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
 {
-	return READ_ONCE(rdp->nocb_defer_wakeup);
+	return READ_ONCE(rdp->nocb_defer_wakeup) > RCU_NOCB_WAKE_NOT;
 }
 
 /* Do a deferred wakeup of rcu_nocb_kthread(). */
@@ -2187,6 +2286,195 @@ static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
 		do_nocb_deferred_wakeup_common(rdp);
 }
 
+static int rdp_offload_toggle(struct rcu_data *rdp,
+			       bool offload, unsigned long flags)
+	__releases(rdp->nocb_lock)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
+	bool wake_gp = false;
+
+	rcu_segcblist_offload(cblist, offload);
+
+	if (rdp->nocb_cb_sleep)
+		rdp->nocb_cb_sleep = false;
+	rcu_nocb_unlock_irqrestore(rdp, flags);
+
+	/*
+	 * Ignore former value of nocb_cb_sleep and force wake up as it could
+	 * have been spuriously set to false already.
+	 */
+	swake_up_one(&rdp->nocb_cb_wq);
+
+	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
+	if (rdp_gp->nocb_gp_sleep) {
+		rdp_gp->nocb_gp_sleep = false;
+		wake_gp = true;
+	}
+	raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
+
+	if (wake_gp)
+		wake_up_process(rdp_gp->nocb_gp_kthread);
+
+	return 0;
+}
+
+static int __rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	unsigned long flags;
+	int ret;
+
+	pr_info("De-offloading %d\n", rdp->cpu);
+
+	rcu_nocb_lock_irqsave(rdp, flags);
+	/*
+	 * If there is still pending offloaded work, the offline
+	 * CPU won't be much help handling it.
+	 */
+	if (cpu_is_offline(rdp->cpu) && !rcu_segcblist_empty(&rdp->cblist)) {
+		rcu_nocb_unlock_irqrestore(rdp, flags);
+		return -EBUSY;
+	}
+
+	ret = rdp_offload_toggle(rdp, false, flags);
+	swait_event_exclusive(rdp->nocb_state_wq,
+			      !rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB |
+							SEGCBLIST_KTHREAD_GP));
+	rcu_nocb_lock_irqsave(rdp, flags);
+	/* Make sure nocb timer won't stay around */
+	WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_OFF);
+	rcu_nocb_unlock_irqrestore(rdp, flags);
+	del_timer_sync(&rdp->nocb_timer);
+
+	/*
+	 * Flush bypass. While IRQs are disabled and once we set
+	 * SEGCBLIST_SOFTIRQ_ONLY, no callback is supposed to be
+	 * enqueued on bypass.
+	 */
+	rcu_nocb_lock_irqsave(rdp, flags);
+	rcu_nocb_flush_bypass(rdp, NULL, jiffies);
+	rcu_segcblist_set_flags(cblist, SEGCBLIST_SOFTIRQ_ONLY);
+	/*
+	 * With SEGCBLIST_SOFTIRQ_ONLY, we can't use
+	 * rcu_nocb_unlock_irqrestore() anymore. Theoretically we
+	 * could set SEGCBLIST_SOFTIRQ_ONLY with cb unlocked and IRQs
+	 * disabled now, but let's be paranoid.
+	 */
+	raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+
+	return ret;
+}
+
+static long rcu_nocb_rdp_deoffload(void *arg)
+{
+	struct rcu_data *rdp = arg;
+
+	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+	return __rcu_nocb_rdp_deoffload(rdp);
+}
+
+int rcu_nocb_cpu_deoffload(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	int ret = 0;
+
+	if (rdp == rdp->nocb_gp_rdp) {
+		pr_info("Can't deoffload an rdp GP leader (yet)\n");
+		return -EINVAL;
+	}
+	mutex_lock(&rcu_state.barrier_mutex);
+	cpus_read_lock();
+	if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
+		if (cpu_online(cpu))
+			ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
+		else
+			ret = __rcu_nocb_rdp_deoffload(rdp);
+		if (!ret)
+			cpumask_clear_cpu(cpu, rcu_nocb_mask);
+	}
+	cpus_read_unlock();
+	mutex_unlock(&rcu_state.barrier_mutex);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_deoffload);
+
+static int __rcu_nocb_rdp_offload(struct rcu_data *rdp)
+{
+	struct rcu_segcblist *cblist = &rdp->cblist;
+	unsigned long flags;
+	int ret;
+
+	/*
+	 * For now we only support re-offload, i.e., the rdp must have been
+	 * offloaded at boot first.
+	 */
+	if (!rdp->nocb_gp_rdp)
+		return -EINVAL;
+
+	pr_info("Offloading %d\n", rdp->cpu);
+	/*
+	 * Can't use rcu_nocb_lock_irqsave() while we are in
+	 * SEGCBLIST_SOFTIRQ_ONLY mode.
+	 */
+	raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+	/* Re-enable nocb timer */
+	WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+	/*
+	 * We didn't take the nocb lock while working on the
+	 * rdp->cblist in SEGCBLIST_SOFTIRQ_ONLY mode.
+	 * All modifications previously made to rdp->cblist must be
+	 * visible to the nocb kthreads once they wake up and read
+	 * the cblist flags.
+	 *
+	 * The layout against nocb_lock enforces that ordering:
+	 *
+	 *  __rcu_nocb_rdp_offload()   nocb_cb_wait()/nocb_gp_wait()
+	 * -------------------------   ----------------------------
+	 *      WRITE callbacks           rcu_nocb_lock()
+	 *      rcu_nocb_lock()           READ flags
+	 *      WRITE flags               READ callbacks
+	 *      rcu_nocb_unlock()         rcu_nocb_unlock()
+	 */
+	ret = rdp_offload_toggle(rdp, true, flags);
+	swait_event_exclusive(rdp->nocb_state_wq,
+			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_CB) &&
+			      rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP));
+
+	return ret;
+}
+
+static long rcu_nocb_rdp_offload(void *arg)
+{
+	struct rcu_data *rdp = arg;
+
+	WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());
+	return __rcu_nocb_rdp_offload(rdp);
+}
+
+int rcu_nocb_cpu_offload(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+	int ret = 0;
+
+	mutex_lock(&rcu_state.barrier_mutex);
+	cpus_read_lock();
+	if (!rcu_segcblist_is_offloaded(&rdp->cblist)) {
+		if (cpu_online(cpu))
+			ret = work_on_cpu(cpu, rcu_nocb_rdp_offload, rdp);
+		else
+			ret = __rcu_nocb_rdp_offload(rdp);
+		if (!ret)
+			cpumask_set_cpu(cpu, rcu_nocb_mask);
+	}
+	cpus_read_unlock();
+	mutex_unlock(&rcu_state.barrier_mutex);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rcu_nocb_cpu_offload);
+
 void __init rcu_init_nohz(void)
 {
 	int cpu;
@@ -2229,7 +2517,9 @@ void __init rcu_init_nohz(void)
 		rdp = per_cpu_ptr(&rcu_data, cpu);
 		if (rcu_segcblist_empty(&rdp->cblist))
 			rcu_segcblist_init(&rdp->cblist);
-		rcu_segcblist_offload(&rdp->cblist);
+		rcu_segcblist_offload(&rdp->cblist, true);
+		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
+		rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_GP);
 	}
 	rcu_organize_nocb_kthreads();
 }
@@ -2239,6 +2529,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
 {
 	init_swait_queue_head(&rdp->nocb_cb_wq);
 	init_swait_queue_head(&rdp->nocb_gp_wq);
+	init_swait_queue_head(&rdp->nocb_state_wq);
 	raw_spin_lock_init(&rdp->nocb_lock);
 	raw_spin_lock_init(&rdp->nocb_bypass_lock);
 	raw_spin_lock_init(&rdp->nocb_gp_lock);
@@ -2381,6 +2672,19 @@ void rcu_bind_current_to_nocb(void)
 }
 EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
 
+// The ->on_cpu field is available only in CONFIG_SMP=y, so...
+#ifdef CONFIG_SMP
+static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
+{
+	return tsp && tsp->state == TASK_RUNNING && !tsp->on_cpu ? "!" : "";
+}
+#else // #ifdef CONFIG_SMP
+static char *show_rcu_should_be_on_cpu(struct task_struct *tsp)
+{
+	return "";
+}
+#endif // #else #ifdef CONFIG_SMP
+
 /*
  * Dump out nocb grace-period kthread state for the specified rcu_data
  * structure.
@@ -2389,7 +2693,7 @@ static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
 {
 	struct rcu_node *rnp = rdp->mynode;
 
-	pr_info("nocb GP %d %c%c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu\n",
+	pr_info("nocb GP %d %c%c%c%c%c%c %c[%c%c] %c%c:%ld rnp %d:%d %lu %c CPU %d%s\n",
 		rdp->cpu,
 		"kK"[!!rdp->nocb_gp_kthread],
 		"lL"[raw_spin_is_locked(&rdp->nocb_gp_lock)],
@@ -2403,12 +2707,17 @@ static void show_rcu_nocb_gp_state(struct rcu_data *rdp)
 		".B"[!!rdp->nocb_gp_bypass],
 		".G"[!!rdp->nocb_gp_gp],
 		(long)rdp->nocb_gp_seq,
-		rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops));
+		rnp->grplo, rnp->grphi, READ_ONCE(rdp->nocb_gp_loops),
+		rdp->nocb_gp_kthread ? task_state_to_char(rdp->nocb_gp_kthread) : '.',
+		rdp->nocb_gp_kthread ? (int)task_cpu(rdp->nocb_gp_kthread) : -1,
+		show_rcu_should_be_on_cpu(rdp->nocb_gp_kthread));
 }
 
 /* Dump out nocb kthread state for the specified rcu_data structure. */
 static void show_rcu_nocb_state(struct rcu_data *rdp)
 {
+	char bufw[20];
+	char bufr[20];
 	struct rcu_segcblist *rsclp = &rdp->cblist;
 	bool waslocked;
 	bool wastimer;
@@ -2417,8 +2726,11 @@ static void show_rcu_nocb_state(struct rcu_data *rdp)
 	if (rdp->nocb_gp_rdp == rdp)
 		show_rcu_nocb_gp_state(rdp);
 
-	pr_info("   CB %d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%c%c%c q%ld\n",
+	sprintf(bufw, "%ld", rsclp->gp_seq[RCU_WAIT_TAIL]);
+	sprintf(bufr, "%ld", rsclp->gp_seq[RCU_NEXT_READY_TAIL]);
+	pr_info("   CB %d^%d->%d %c%c%c%c%c%c F%ld L%ld C%d %c%c%s%c%s%c%c q%ld %c CPU %d%s\n",
 		rdp->cpu, rdp->nocb_gp_rdp->cpu,
+		rdp->nocb_next_cb_rdp ? rdp->nocb_next_cb_rdp->cpu : -1,
 		"kK"[!!rdp->nocb_cb_kthread],
 		"bB"[raw_spin_is_locked(&rdp->nocb_bypass_lock)],
 		"cC"[!!atomic_read(&rdp->nocb_lock_contended)],
@@ -2429,11 +2741,16 @@ static void show_rcu_nocb_state(struct rcu_data *rdp)
 		jiffies - rdp->nocb_nobypass_last,
 		rdp->nocb_nobypass_count,
 		".D"[rcu_segcblist_ready_cbs(rsclp)],
-		".W"[!rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL)],
-		".R"[!rcu_segcblist_restempty(rsclp, RCU_WAIT_TAIL)],
-		".N"[!rcu_segcblist_restempty(rsclp, RCU_NEXT_READY_TAIL)],
+		".W"[!rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL)],
+		rcu_segcblist_segempty(rsclp, RCU_WAIT_TAIL) ? "" : bufw,
+		".R"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL)],
+		rcu_segcblist_segempty(rsclp, RCU_NEXT_READY_TAIL) ? "" : bufr,
+		".N"[!rcu_segcblist_segempty(rsclp, RCU_NEXT_TAIL)],
 		".B"[!!rcu_cblist_n_cbs(&rdp->nocb_bypass)],
-		rcu_segcblist_n_cbs(&rdp->cblist));
+		rcu_segcblist_n_cbs(&rdp->cblist),
+		rdp->nocb_cb_kthread ? task_state_to_char(rdp->nocb_cb_kthread) : '.',
+		rdp->nocb_cb_kthread ? (int)task_cpu(rdp->nocb_cb_kthread) : -1,
+		show_rcu_should_be_on_cpu(rdp->nocb_cb_kthread));
 
 	/* It is OK for GP kthreads to have GP state. */
 	if (rdp->nocb_gp_rdp == rdp)
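
A brief sketch (not part of this patch) of how the new rcu_nocb_cpu_deoffload()/rcu_nocb_cpu_offload() entry points exported above might be exercised.  The demo_toggle_nocb() wrapper and its error handling are invented, and the declarations are assumed to be reachable via linux/rcupdate.h.

#include <linux/rcupdate.h>

static void demo_toggle_nocb(int cpu)
{
	int ret;

	/* Switch the CPU back to softirq/rcuc callback processing. */
	ret = rcu_nocb_cpu_deoffload(cpu);
	if (ret) {
		pr_warn("de-offload of CPU %d failed: %d\n", cpu, ret);
		return;
	}

	/* ... and later hand its callbacks back to the rcuo kthreads. */
	ret = rcu_nocb_cpu_offload(cpu);
	if (ret)
		pr_warn("re-offload of CPU %d failed: %d\n", cpu, ret);
}

Note that, per the patch, only rdp structures that are not nocb GP leaders and that were offloaded at boot can currently be toggled.
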
diff --git a/kernel/rcu/tree_stall.h b/kernel/rcu/tree_stall.h
index 70d48c52fabc..475b26171b20 100644
--- a/kernel/rcu/tree_stall.h
+++ b/kernel/rcu/tree_stall.h
@@ -266,6 +266,7 @@ static int rcu_print_task_stall(struct rcu_node *rnp, unsigned long flags)
 	struct task_struct *t;
 	struct task_struct *ts[8];
 
+	lockdep_assert_irqs_disabled();
 	if (!rcu_preempt_blocked_readers_cgp(rnp))
 		return 0;
 	pr_err("\tTasks blocked on level-%d rcu_node (CPUs %d-%d):",
@@ -290,6 +291,7 @@ static int rcu_print_task_stall(struct rcu_node *rnp, unsigned long flags)
 				".q"[rscr.rs.b.need_qs],
 				".e"[rscr.rs.b.exp_hint],
 				".l"[rscr.on_blkd_list]);
+		lockdep_assert_irqs_disabled();
 		put_task_struct(t);
 		ndetected++;
 	}
@@ -333,9 +335,12 @@ static void rcu_dump_cpu_stacks(void)
 	rcu_for_each_leaf_node(rnp) {
 		raw_spin_lock_irqsave_rcu_node(rnp, flags);
 		for_each_leaf_node_possible_cpu(rnp, cpu)
-			if (rnp->qsmask & leaf_node_cpu_bit(rnp, cpu))
-				if (!trigger_single_cpu_backtrace(cpu))
+			if (rnp->qsmask & leaf_node_cpu_bit(rnp, cpu)) {
+				if (cpu_is_offline(cpu))
+					pr_err("Offline CPU %d blocking current GP.\n", cpu);
+				else if (!trigger_single_cpu_backtrace(cpu))
 					dump_cpu_task(cpu);
+			}
 		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 	}
 }
@@ -449,25 +454,66 @@ static void print_cpu_stall_info(int cpu)
 /* Complain about starvation of grace-period kthread.  */
 static void rcu_check_gp_kthread_starvation(void)
 {
+	int cpu;
 	struct task_struct *gpk = rcu_state.gp_kthread;
 	unsigned long j;
 
 	if (rcu_is_gp_kthread_starving(&j)) {
+		cpu = gpk ? task_cpu(gpk) : -1;
 		pr_err("%s kthread starved for %ld jiffies! g%ld f%#x %s(%d) ->state=%#lx ->cpu=%d\n",
 		       rcu_state.name, j,
 		       (long)rcu_seq_current(&rcu_state.gp_seq),
 		       data_race(rcu_state.gp_flags),
 		       gp_state_getname(rcu_state.gp_state), rcu_state.gp_state,
-		       gpk ? gpk->state : ~0, gpk ? task_cpu(gpk) : -1);
+		       gpk ? gpk->state : ~0, cpu);
 		if (gpk) {
 			pr_err("\tUnless %s kthread gets sufficient CPU time, OOM is now expected behavior.\n", rcu_state.name);
 			pr_err("RCU grace-period kthread stack dump:\n");
 			sched_show_task(gpk);
+			if (cpu >= 0) {
+				if (cpu_is_offline(cpu)) {
+					pr_err("RCU GP kthread last ran on offline CPU %d.\n", cpu);
+				} else {
+					pr_err("Stack dump where RCU GP kthread last ran:\n");
+					if (!trigger_single_cpu_backtrace(cpu))
+						dump_cpu_task(cpu);
+				}
+			}
 			wake_up_process(gpk);
 		}
 	}
 }
 
+/* Complain about missing wakeups from expired fqs wait timer */
+static void rcu_check_gp_kthread_expired_fqs_timer(void)
+{
+	struct task_struct *gpk = rcu_state.gp_kthread;
+	short gp_state;
+	unsigned long jiffies_fqs;
+	int cpu;
+
+	/*
+	 * Order reads of .gp_state and .jiffies_force_qs.
+	 * Matching smp_wmb() is present in rcu_gp_fqs_loop().
+	 */
+	gp_state = smp_load_acquire(&rcu_state.gp_state);
+	jiffies_fqs = READ_ONCE(rcu_state.jiffies_force_qs);
+
+	if (gp_state == RCU_GP_WAIT_FQS &&
+	    time_after(jiffies, jiffies_fqs + RCU_STALL_MIGHT_MIN) &&
+	    gpk && !READ_ONCE(gpk->on_rq)) {
+		cpu = task_cpu(gpk);
+		pr_err("%s kthread timer wakeup didn't happen for %ld jiffies! g%ld f%#x %s(%d) ->state=%#lx\n",
+		       rcu_state.name, (jiffies - jiffies_fqs),
+		       (long)rcu_seq_current(&rcu_state.gp_seq),
+		       data_race(rcu_state.gp_flags),
+		       gp_state_getname(RCU_GP_WAIT_FQS), RCU_GP_WAIT_FQS,
+		       gpk->state);
+		pr_err("\tPossible timer handling issue on cpu=%d timer-softirq=%u\n",
+		       cpu, kstat_softirqs_cpu(TIMER_SOFTIRQ, cpu));
+	}
+}
+
 static void print_other_cpu_stall(unsigned long gp_seq, unsigned long gps)
 {
 	int cpu;
@@ -478,6 +524,8 @@ static void print_other_cpu_stall(unsigned long gp_seq, unsigned long gps)
 	struct rcu_node *rnp;
 	long totqlen = 0;
 
+	lockdep_assert_irqs_disabled();
+
 	/* Kick and suppress, if so configured. */
 	rcu_stall_kick_kthreads();
 	if (rcu_stall_is_suppressed())
@@ -499,6 +547,7 @@ static void print_other_cpu_stall(unsigned long gp_seq, unsigned long gps)
 				}
 		}
 		ndetected += rcu_print_task_stall(rnp, flags); // Releases rnp->lock.
+		lockdep_assert_irqs_disabled();
 	}
 
 	for_each_possible_cpu(cpu)
@@ -529,6 +578,7 @@ static void print_other_cpu_stall(unsigned long gp_seq, unsigned long gps)
 		WRITE_ONCE(rcu_state.jiffies_stall,
 			   jiffies + 3 * rcu_jiffies_till_stall_check() + 3);
 
+	rcu_check_gp_kthread_expired_fqs_timer();
 	rcu_check_gp_kthread_starvation();
 
 	panic_on_rcu_stall();
@@ -544,6 +594,8 @@ static void print_cpu_stall(unsigned long gps)
 	struct rcu_node *rnp = rcu_get_root();
 	long totqlen = 0;
 
+	lockdep_assert_irqs_disabled();
+
 	/* Kick and suppress, if so configured. */
 	rcu_stall_kick_kthreads();
 	if (rcu_stall_is_suppressed())
@@ -564,6 +616,7 @@ static void print_cpu_stall(unsigned long gps)
 		jiffies - gps,
 		(long)rcu_seq_current(&rcu_state.gp_seq), totqlen);
 
+	rcu_check_gp_kthread_expired_fqs_timer();
 	rcu_check_gp_kthread_starvation();
 
 	rcu_dump_cpu_stacks();
@@ -598,6 +651,7 @@ static void check_cpu_stall(struct rcu_data *rdp)
 	unsigned long js;
 	struct rcu_node *rnp;
 
+	lockdep_assert_irqs_disabled();
 	if ((rcu_stall_is_suppressed() && !READ_ONCE(rcu_kick_kthreads)) ||
 	    !rcu_gp_in_progress())
 		return;
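
The smp_wmb() added to rcu_gp_fqs_loop() and the smp_load_acquire() in rcu_check_gp_kthread_expired_fqs_timer() form a standard message-passing pair.  A stripped-down sketch of that pairing (not part of this patch; the demo_* names are invented):

#include <asm/barrier.h>
#include <linux/compiler.h>

static unsigned long demo_deadline;	/* stands in for ->jiffies_force_qs */
static int demo_state;			/* stands in for ->gp_state */

/* Writer side, as in rcu_gp_fqs_loop(). */
static void demo_publish(unsigned long deadline, int state)
{
	WRITE_ONCE(demo_deadline, deadline);
	smp_wmb();			/* Deadline published before state. */
	WRITE_ONCE(demo_state, state);
}

/* Reader side, as in rcu_check_gp_kthread_expired_fqs_timer(). */
static bool demo_read_pair(int wanted_state, unsigned long *deadline)
{
	int state = smp_load_acquire(&demo_state); /* State read before deadline. */

	if (state != wanted_state)
		return false;
	*deadline = READ_ONCE(demo_deadline);
	return true;
}

If the reader observes the state written by demo_publish(), the acquire load guarantees that it also observes the deadline written before the smp_wmb(), which is exactly what keeps the stall check from reporting a stale ->jiffies_force_qs.
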
diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
index 39334d2d2b37..b95ae86c40a7 100644
--- a/kernel/rcu/update.c
+++ b/kernel/rcu/update.c
@@ -56,8 +56,10 @@
 #ifndef CONFIG_TINY_RCU
 module_param(rcu_expedited, int, 0);
 module_param(rcu_normal, int, 0);
-static int rcu_normal_after_boot;
+static int rcu_normal_after_boot = IS_ENABLED(CONFIG_PREEMPT_RT);
+#ifndef CONFIG_PREEMPT_RT
 module_param(rcu_normal_after_boot, int, 0);
+#endif
 #endif /* #ifndef CONFIG_TINY_RCU */
 
 #ifdef CONFIG_DEBUG_LOCK_ALLOC