author     Linus Torvalds <torvalds@linux-foundation.org>  2013-07-11 13:03:24 -0700
committer  Linus Torvalds <torvalds@linux-foundation.org>  2013-07-11 13:03:24 -0700
commit     36805aaea5ae3cf1bb32f1643e0a800bb69f0d5b (patch)
tree       5565132549a0733772b3a2ac6b5cda516ea8cdce /block
parent     6d2fa9e141ea56a571ec842fd4f3a86bea44a203 (diff)
parent     d50235b7bc3ee0a0427984d763ea7534149531b4 (diff)
Merge branch 'for-3.11/core' of git://git.kernel.dk/linux-block
Pull core block IO updates from Jens Axboe:
 "Here are the core IO block bits for 3.11. It contains:

   - A tweak to the reserved tag logic from Jan, for weirdo devices with
     just 3 free tags.  For those it substantially improves performance
     for random writes.

   - Periodic writeback fix from Jan.  Marked for stable as well.

   - Fix for a race condition in IO scheduler switching from Jianpeng.

   - The hierarchical blk-cgroup support from Tejun.  This is the bulk
     of the series.

   - blk-throttle fix from Vivek.

  Just a note that I'm in the middle of a relocation; the whole family is
  flying out tomorrow.  Hence I will be AWOL for the remainder of this
  week, but back at work again on Monday the 15th.  CC'ing Tejun, since
  any potential "surprises" will most likely be from the blk-cgroup work.
  But it's been brewing for a while and sitting in my tree and linux-next
  for a long time, so it should be solid."

* 'for-3.11/core' of git://git.kernel.dk/linux-block: (36 commits)
  elevator: Fix a race in elevator switching
  block: Reserve only one queue tag for sync IO if only 3 tags are available
  writeback: Fix periodic writeback after fs mount
  blk-throttle: implement proper hierarchy support
  blk-throttle: implement throtl_grp->has_rules[]
  blk-throttle: Account for child group's start time in parent while bio climbs up
  blk-throttle: add throtl_qnode for dispatch fairness
  blk-throttle: make throtl_pending_timer_fn() ready for hierarchy
  blk-throttle: make tg_dispatch_one_bio() ready for hierarchy
  blk-throttle: make blk_throtl_bio() ready for hierarchy
  blk-throttle: make blk_throtl_drain() ready for hierarchy
  blk-throttle: dispatch from throtl_pending_timer_fn()
  blk-throttle: implement dispatch looping
  blk-throttle: separate out throtl_service_queue->pending_timer from throtl_data->dispatch_work
  blk-throttle: set REQ_THROTTLED from throtl_charge_bio() and gate stats update with it
  blk-throttle: implement sq_to_tg(), sq_to_td() and throtl_log()
  blk-throttle: add throtl_service_queue->parent_sq
  blk-throttle: generalize update_disptime optimization in blk_throtl_bio()
  blk-throttle: dispatch to throtl_data->service_queue.bio_lists[]
  blk-throttle: move bio_lists[] and friends to throtl_service_queue
  ...
Diffstat (limited to 'block')
-rw-r--r--  block/blk-cgroup.c        105
-rw-r--r--  block/blk-cgroup.h         38
-rw-r--r--  block/blk-tag.c            11
-rw-r--r--  block/blk-throttle.c     1064
-rw-r--r--  block/cfq-iosched.c        17
-rw-r--r--  block/deadline-iosched.c   16
-rw-r--r--  block/elevator.c           25
-rw-r--r--  block/noop-iosched.c       17
8 files changed, 875 insertions(+), 418 deletions(-)
diff --git a/block/blk-cgroup.c b/block/blk-cgroup.c
index e8918ffaf96d..290792a13e3c 100644
--- a/block/blk-cgroup.c
+++ b/block/blk-cgroup.c
@@ -32,26 +32,6 @@ EXPORT_SYMBOL_GPL(blkcg_root);
 
 static struct blkcg_policy *blkcg_policy[BLKCG_MAX_POLS];
 
-static struct blkcg_gq *__blkg_lookup(struct blkcg *blkcg,
-				      struct request_queue *q, bool update_hint);
-
-/**
- * blkg_for_each_descendant_pre - pre-order walk of a blkg's descendants
- * @d_blkg: loop cursor pointing to the current descendant
- * @pos_cgrp: used for iteration
- * @p_blkg: target blkg to walk descendants of
- *
- * Walk @c_blkg through the descendants of @p_blkg.  Must be used with RCU
- * read locked.  If called under either blkcg or queue lock, the iteration
- * is guaranteed to include all and only online blkgs.  The caller may
- * update @pos_cgrp by calling cgroup_rightmost_descendant() to skip
- * subtree.
- */
-#define blkg_for_each_descendant_pre(d_blkg, pos_cgrp, p_blkg)		\
-	cgroup_for_each_descendant_pre((pos_cgrp), (p_blkg)->blkcg->css.cgroup) \
-		if (((d_blkg) = __blkg_lookup(cgroup_to_blkcg(pos_cgrp), \
-					      (p_blkg)->q, false)))
-
 static bool blkcg_policy_enabled(struct request_queue *q,
 				 const struct blkcg_policy *pol)
 {
@@ -71,18 +51,8 @@ static void blkg_free(struct blkcg_gq *blkg)
 	if (!blkg)
 		return;
 
-	for (i = 0; i < BLKCG_MAX_POLS; i++) {
-		struct blkcg_policy *pol = blkcg_policy[i];
-		struct blkg_policy_data *pd = blkg->pd[i];
-
-		if (!pd)
-			continue;
-
-		if (pol && pol->pd_exit_fn)
-			pol->pd_exit_fn(blkg);
-
-		kfree(pd);
-	}
+	for (i = 0; i < BLKCG_MAX_POLS; i++)
+		kfree(blkg->pd[i]);
 
 	blk_exit_rl(&blkg->rl);
 	kfree(blkg);
@@ -134,10 +104,6 @@ static struct blkcg_gq *blkg_alloc(struct blkcg *blkcg, struct request_queue *q,
 		blkg->pd[i] = pd;
 		pd->blkg = blkg;
 		pd->plid = i;
-
-		/* invoke per-policy init */
-		if (pol->pd_init_fn)
-			pol->pd_init_fn(blkg);
 	}
 
 	return blkg;
@@ -158,8 +124,8 @@ err_free:
  * @q's bypass state.  If @update_hint is %true, the caller should be
  * holding @q->queue_lock and lookup hint is updated on success.
  */
-static struct blkcg_gq *__blkg_lookup(struct blkcg *blkcg,
-				      struct request_queue *q, bool update_hint)
+struct blkcg_gq *__blkg_lookup(struct blkcg *blkcg, struct request_queue *q,
+			       bool update_hint)
 {
 	struct blkcg_gq *blkg;
 
@@ -234,16 +200,25 @@ static struct blkcg_gq *blkg_create(struct blkcg *blkcg,
 	}
 	blkg = new_blkg;
 
-	/* link parent and insert */
+	/* link parent */
 	if (blkcg_parent(blkcg)) {
 		blkg->parent = __blkg_lookup(blkcg_parent(blkcg), q, false);
 		if (WARN_ON_ONCE(!blkg->parent)) {
-			blkg = ERR_PTR(-EINVAL);
+			ret = -EINVAL;
 			goto err_put_css;
 		}
 		blkg_get(blkg->parent);
 	}
 
+	/* invoke per-policy init */
+	for (i = 0; i < BLKCG_MAX_POLS; i++) {
+		struct blkcg_policy *pol = blkcg_policy[i];
+
+		if (blkg->pd[i] && pol->pd_init_fn)
+			pol->pd_init_fn(blkg);
+	}
+
+	/* insert */
 	spin_lock(&blkcg->lock);
 	ret = radix_tree_insert(&blkcg->blkg_tree, q->id, blkg);
 	if (likely(!ret)) {
@@ -394,30 +369,38 @@ static void blkg_destroy_all(struct request_queue *q)
 	q->root_rl.blkg = NULL;
 }
 
-static void blkg_rcu_free(struct rcu_head *rcu_head)
+/*
+ * A group is RCU protected, but having an rcu lock does not mean that one
+ * can access all the fields of blkg and assume these are valid.  For
+ * example, don't try to follow throtl_data and request queue links.
+ *
+ * Having a reference to blkg under an rcu allows accesses to only values
+ * local to groups like group stats and group rate limits.
+ */
+void __blkg_release_rcu(struct rcu_head *rcu_head)
 {
-	blkg_free(container_of(rcu_head, struct blkcg_gq, rcu_head));
-}
+	struct blkcg_gq *blkg = container_of(rcu_head, struct blkcg_gq, rcu_head);
+	int i;
+
+	/* tell policies that this one is being freed */
+	for (i = 0; i < BLKCG_MAX_POLS; i++) {
+		struct blkcg_policy *pol = blkcg_policy[i];
+
+		if (blkg->pd[i] && pol->pd_exit_fn)
+			pol->pd_exit_fn(blkg);
+	}
 
-void __blkg_release(struct blkcg_gq *blkg)
-{
 	/* release the blkcg and parent blkg refs this blkg has been holding */
 	css_put(&blkg->blkcg->css);
-	if (blkg->parent)
+	if (blkg->parent) {
+		spin_lock_irq(blkg->q->queue_lock);
 		blkg_put(blkg->parent);
+		spin_unlock_irq(blkg->q->queue_lock);
+	}
 
-	/*
-	 * A group is freed in rcu manner. But having an rcu lock does not
-	 * mean that one can access all the fields of blkg and assume these
-	 * are valid. For example, don't try to follow throtl_data and
-	 * request queue links.
-	 *
-	 * Having a reference to blkg under an rcu allows acess to only
-	 * values local to groups like group stats and group rate limits
-	 */
-	call_rcu(&blkg->rcu_head, blkg_rcu_free);
+	blkg_free(blkg);
 }
-EXPORT_SYMBOL_GPL(__blkg_release);
+EXPORT_SYMBOL_GPL(__blkg_release_rcu);
 
 /*
  * The next function used by blk_queue_for_each_rl().  It's a bit tricky
@@ -928,14 +911,6 @@ struct cgroup_subsys blkio_subsys = {
 	.subsys_id = blkio_subsys_id,
 	.base_cftypes = blkcg_files,
 	.module = THIS_MODULE,
-
-	/*
-	 * blkio subsystem is utterly broken in terms of hierarchy support.
-	 * It treats all cgroups equally regardless of where they're
-	 * located in the hierarchy - all cgroups are treated as if they're
-	 * right below the root.  Fix it and remove the following.
-	 */
-	.broken_hierarchy = true,
 };
 EXPORT_SYMBOL_GPL(blkio_subsys);
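
With pd_init_fn() moved into blkg_create() and pd_exit_fn() into the new
__blkg_release_rcu(), blkg_free() is reduced to plain kfree() calls.  A
minimal userspace sketch of the resulting put/release ordering follows; the
RCU grace period is stubbed out as a direct call and the types are
simplified stand-ins, not the kernel structures:

#include <stdio.h>
#include <stdlib.h>

struct blkg {
	struct blkg *parent;
	int refcnt;
};

static void blkg_put(struct blkg *blkg);

/* stands in for __blkg_release_rcu(); in the kernel this runs after a
 * grace period via call_rcu() */
static void blkg_release(struct blkg *blkg)
{
	/* 1. policy pd_exit_fn() callbacks would run here */
	/* 2. drop the ref this blkg held on its parent */
	if (blkg->parent)
		blkg_put(blkg->parent);
	/* 3. blkg_free() is now just a free */
	free(blkg);
}

static void blkg_put(struct blkg *blkg)
{
	if (--blkg->refcnt == 0)
		blkg_release(blkg);
}

int main(void)
{
	struct blkg *parent = calloc(1, sizeof(*parent));
	struct blkg *child = calloc(1, sizeof(*child));

	parent->refcnt = 2;	/* base ref + ref held by child */
	child->refcnt = 1;
	child->parent = parent;

	blkg_put(child);	/* frees child, drops parent ref to 1 */
	blkg_put(parent);	/* drops the base ref, frees parent */
	puts("all groups released");
	return 0;
}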
 
diff --git a/block/blk-cgroup.h b/block/blk-cgroup.h
index 4e595ee8c915..8056c03a3382 100644
--- a/block/blk-cgroup.h
+++ b/block/blk-cgroup.h
@@ -266,7 +266,7 @@ static inline void blkg_get(struct blkcg_gq *blkg)
 	blkg->refcnt++;
 }
 
-void __blkg_release(struct blkcg_gq *blkg);
+void __blkg_release_rcu(struct rcu_head *rcu);
 
 /**
  * blkg_put - put a blkg reference
@@ -279,9 +279,43 @@ static inline void blkg_put(struct blkcg_gq *blkg)
 	lockdep_assert_held(blkg->q->queue_lock);
 	WARN_ON_ONCE(blkg->refcnt <= 0);
 	if (!--blkg->refcnt)
-		__blkg_release(blkg);
+		call_rcu(&blkg->rcu_head, __blkg_release_rcu);
 }
 
+struct blkcg_gq *__blkg_lookup(struct blkcg *blkcg, struct request_queue *q,
+			       bool update_hint);
+
+/**
+ * blkg_for_each_descendant_pre - pre-order walk of a blkg's descendants
+ * @d_blkg: loop cursor pointing to the current descendant
+ * @pos_cgrp: used for iteration
+ * @p_blkg: target blkg to walk descendants of
+ *
+ * Walk @d_blkg through the descendants of @p_blkg.  Must be used with RCU
+ * read locked.  If called under either blkcg or queue lock, the iteration
+ * is guaranteed to include all and only online blkgs.  The caller may
+ * update @pos_cgrp by calling cgroup_rightmost_descendant() to skip
+ * subtree.
+ */
+#define blkg_for_each_descendant_pre(d_blkg, pos_cgrp, p_blkg)		\
+	cgroup_for_each_descendant_pre((pos_cgrp), (p_blkg)->blkcg->css.cgroup) \
+		if (((d_blkg) = __blkg_lookup(cgroup_to_blkcg(pos_cgrp), \
+					      (p_blkg)->q, false)))
+
+/**
+ * blkg_for_each_descendant_post - post-order walk of a blkg's descendants
+ * @d_blkg: loop cursor pointing to the current descendant
+ * @pos_cgrp: used for iteration
+ * @p_blkg: target blkg to walk descendants of
+ *
+ * Similar to blkg_for_each_descendant_pre() but performs post-order
+ * traversal instead.  Synchronization rules are the same.
+ */
+#define blkg_for_each_descendant_post(d_blkg, pos_cgrp, p_blkg)		\
+	cgroup_for_each_descendant_post((pos_cgrp), (p_blkg)->blkcg->css.cgroup) \
+		if (((d_blkg) = __blkg_lookup(cgroup_to_blkcg(pos_cgrp), \
+					      (p_blkg)->q, false)))
+
 /**
  * blk_get_rl - get request_list to use
  * @q: request_queue of interest
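
blkg_for_each_descendant_post() is added for blk_throtl_drain(), which must
empty every child group before touching its parent.  A toy pre- vs
post-order walk (a generic tree, not the cgroup iterators) shows why the
new order matters:

#include <stdio.h>

struct node {
	const char *name;
	struct node *child[2];
};

/* pre-order: parents first - right for propagating config downward */
static void walk_pre(const struct node *n)
{
	if (!n)
		return;
	printf("%s ", n->name);
	walk_pre(n->child[0]);
	walk_pre(n->child[1]);
}

/* post-order: children first - right for draining bios upward */
static void walk_post(const struct node *n)
{
	if (!n)
		return;
	walk_post(n->child[0]);
	walk_post(n->child[1]);
	printf("%s ", n->name);
}

int main(void)
{
	struct node b = { "B", { NULL, NULL } };
	struct node c = { "C", { NULL, NULL } };
	struct node root = { "root", { &b, &c } };

	walk_pre(&root);	/* root B C */
	printf("| ");
	walk_post(&root);	/* B C root */
	printf("\n");
	return 0;
}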
diff --git a/block/blk-tag.c b/block/blk-tag.c
index cc345e1d8d4e..3f33d8672268 100644
--- a/block/blk-tag.c
+++ b/block/blk-tag.c
@@ -348,9 +348,16 @@ int blk_queue_start_tag(struct request_queue *q, struct request *rq)
 	 */
 	max_depth = bqt->max_depth;
 	if (!rq_is_sync(rq) && max_depth > 1) {
-		max_depth -= 2;
-		if (!max_depth)
+		switch (max_depth) {
+		case 2:
 			max_depth = 1;
+			break;
+		case 3:
+			max_depth = 2;
+			break;
+		default:
+			max_depth -= 2;
+		}
 		if (q->in_flight[BLK_RW_ASYNC] > max_depth)
 			return 1;
 	}
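
The switch above is Jan's tweak from the pull request: with a depth of 3,
the old flat "max_depth - 2" left a single async slot.  A standalone sketch
of the new clamp (plain userspace C, not the kernel function;
async_max_depth() is a hypothetical name):

#include <stdio.h>

/*
 * Toy version of the async depth clamp in blk_queue_start_tag().  Async
 * requests must leave tags free for sync IO; previously two tags were
 * always reserved, leaving only one async slot on devices with just 3
 * tags.  The new mapping reserves a single tag in that case.
 */
static unsigned int async_max_depth(unsigned int max_depth)
{
	switch (max_depth) {
	case 2:
		return 1;
	case 3:
		return 2;		/* reserve one tag, not two */
	default:
		return max_depth - 2;	/* reserve two tags for sync IO */
	}
}

int main(void)
{
	unsigned int d;

	/* the kernel only takes this path when max_depth > 1 */
	for (d = 2; d <= 6; d++)
		printf("max_depth=%u -> async limit=%u\n",
		       d, async_max_depth(d));
	return 0;
}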
diff --git a/block/blk-throttle.c b/block/blk-throttle.c
index 31146225f3d0..08a32dfd3844 100644
--- a/block/blk-throttle.c
+++ b/block/blk-throttle.c
@@ -25,18 +25,61 @@ static struct blkcg_policy blkcg_policy_throtl;
 
 /* A workqueue to queue throttle related work */
 static struct workqueue_struct *kthrotld_workqueue;
-static void throtl_schedule_delayed_work(struct throtl_data *td,
-				unsigned long delay);
-
-struct throtl_rb_root {
-	struct rb_root rb;
-	struct rb_node *left;
-	unsigned int count;
-	unsigned long min_disptime;
+
+/*
+ * To implement hierarchical throttling, throtl_grps form a tree and bios
+ * are dispatched upwards level by level until they reach the top and get
+ * issued.  When dispatching bios from the children and local group at each
+ * level, if the bios are dispatched into a single bio_list, there's a risk
+ * of a local or child group which can queue many bios at once filling up
+ * the list starving others.
+ *
+ * To avoid such starvation, dispatched bios are queued separately
+ * according to where they came from.  When they are again dispatched to
+ * the parent, they're popped in round-robin order so that no single source
+ * hogs the dispatch window.
+ *
+ * throtl_qnode is used to keep the queued bios separated by their sources.
+ * Bios are queued to throtl_qnode which in turn is queued to
+ * throtl_service_queue and then dispatched in round-robin order.
+ *
+ * It's also used to track the reference counts on blkg's.  A qnode always
+ * belongs to a throtl_grp and gets queued on itself or the parent, so
+ * incrementing the reference of the associated throtl_grp when a qnode is
+ * queued and decrementing when dequeued is enough to keep the whole blkg
+ * tree pinned while bios are in flight.
+ */
+struct throtl_qnode {
+	struct list_head	node;		/* service_queue->queued[] */
+	struct bio_list		bios;		/* queued bios */
+	struct throtl_grp	*tg;		/* tg this qnode belongs to */
 };
 
-#define THROTL_RB_ROOT	(struct throtl_rb_root) { .rb = RB_ROOT, .left = NULL, \
-			.count = 0, .min_disptime = 0}
+struct throtl_service_queue {
+	struct throtl_service_queue *parent_sq;	/* the parent service_queue */
+
+	/*
+	 * Bios queued directly to this service_queue or dispatched from
+	 * children throtl_grp's.
+	 */
+	struct list_head	queued[2];	/* throtl_qnode [READ/WRITE] */
+	unsigned int		nr_queued[2];	/* number of queued bios */
+
+	/*
+	 * RB tree of active children throtl_grp's, which are sorted by
+	 * their ->disptime.
+	 */
+	struct rb_root		pending_tree;	/* RB tree of active tgs */
+	struct rb_node		*first_pending;	/* first node in the tree */
+	unsigned int		nr_pending;	/* # queued in the tree */
+	unsigned long		first_pending_disptime;	/* disptime of the first tg */
+	struct timer_list	pending_timer;	/* fires on first_pending_disptime */
+};
+
+enum tg_state_flags {
+	THROTL_TG_PENDING	= 1 << 0,	/* on parent's pending tree */
+	THROTL_TG_WAS_EMPTY	= 1 << 1,	/* bio_lists[] became non-empty */
+};
 
 #define rb_entry_tg(node)	rb_entry((node), struct throtl_grp, rb_node)
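
A compilable toy of the round-robin scheme described in the throtl_qnode
comment above; arrays stand in for the list_head/bio_list machinery, so a
group that queued five bios at once cannot starve one that queued two:

#include <stdio.h>

struct qnode {
	const char *src;	/* which group the bios came from */
	int nr_bios;		/* bios still queued on this qnode */
};

int main(void)
{
	struct qnode pool[2] = { { "A", 5 }, { "B", 2 } };
	struct qnode *list[8];
	int n = 0, i;

	list[n++] = &pool[0];
	list[n++] = &pool[1];

	while (n) {
		struct qnode *qn = list[0];

		printf("dispatch bio from %s\n", qn->src);
		/* pop the head qnode ... */
		for (i = 1; i < n; i++)
			list[i - 1] = list[i];
		n--;
		/* ... and rotate it to the tail if it still has bios */
		if (--qn->nr_bios)
			list[n++] = qn;
	}
	/* prints A B A B A A A: B is not stuck behind all of A's bios */
	return 0;
}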
 
@@ -52,9 +95,26 @@ struct throtl_grp {
 	/* must be the first member */
 	struct blkg_policy_data pd;
 
-	/* active throtl group service_tree member */
+	/* active throtl group service_queue member */
 	struct rb_node rb_node;
 
+	/* throtl_data this group belongs to */
+	struct throtl_data *td;
+
+	/* this group's service queue */
+	struct throtl_service_queue service_queue;
+
+	/*
+	 * qnode_on_self is used when bios are directly queued to this
+	 * throtl_grp so that local bios compete fairly with bios
+	 * dispatched from children.  qnode_on_parent is used when bios are
+	 * dispatched from this throtl_grp into its parent and will compete
+	 * with the sibling qnode_on_parents and the parent's
+	 * qnode_on_self.
+	 */
+	struct throtl_qnode qnode_on_self[2];
+	struct throtl_qnode qnode_on_parent[2];
+
 	/*
 	 * Dispatch time in jiffies. This is the estimated time when group
 	 * will unthrottle and is ready to dispatch more bio. It is used as
@@ -64,11 +124,8 @@ struct throtl_grp {
 
 	unsigned int flags;
 
-	/* Two lists for READ and WRITE */
-	struct bio_list bio_lists[2];
-
-	/* Number of queued bios on READ and WRITE lists */
-	unsigned int nr_queued[2];
+	/* are there any throtl rules between this group and td? */
+	bool has_rules[2];
 
 	/* bytes per second rate limits */
 	uint64_t bps[2];
@@ -85,9 +142,6 @@ struct throtl_grp {
 	unsigned long slice_start[2];
 	unsigned long slice_end[2];
 
-	/* Some throttle limits got updated for the group */
-	int limits_changed;
-
 	/* Per cpu stats pointer */
 	struct tg_stats_cpu __percpu *stats_cpu;
 
@@ -98,7 +152,7 @@ struct throtl_grp {
 struct throtl_data
 {
 	/* service tree for active throtl groups */
-	struct throtl_rb_root tg_service_tree;
+	struct throtl_service_queue service_queue;
 
 	struct request_queue *queue;
 
@@ -111,9 +165,7 @@ struct throtl_data
 	unsigned int nr_undestroyed_grps;
 
 	/* Work for dispatching throttled bios */
-	struct delayed_work throtl_work;
-
-	int limits_changed;
+	struct work_struct dispatch_work;
 };
 
 /* list and work item to allocate percpu group stats */
@@ -123,6 +175,8 @@ static LIST_HEAD(tg_stats_alloc_list);
 static void tg_stats_alloc_fn(struct work_struct *);
 static DECLARE_DELAYED_WORK(tg_stats_alloc_work, tg_stats_alloc_fn);
 
+static void throtl_pending_timer_fn(unsigned long arg);
+
 static inline struct throtl_grp *pd_to_tg(struct blkg_policy_data *pd)
 {
 	return pd ? container_of(pd, struct throtl_grp, pd) : NULL;
@@ -143,41 +197,65 @@ static inline struct throtl_grp *td_root_tg(struct throtl_data *td)
 	return blkg_to_tg(td->queue->root_blkg);
 }
 
-enum tg_state_flags {
-	THROTL_TG_FLAG_on_rr = 0,	/* on round-robin busy list */
-};
-
-#define THROTL_TG_FNS(name)						\
-static inline void throtl_mark_tg_##name(struct throtl_grp *tg)		\
-{									\
-	(tg)->flags |= (1 << THROTL_TG_FLAG_##name);			\
-}									\
-static inline void throtl_clear_tg_##name(struct throtl_grp *tg)	\
-{									\
-	(tg)->flags &= ~(1 << THROTL_TG_FLAG_##name);			\
-}									\
-static inline int throtl_tg_##name(const struct throtl_grp *tg)		\
-{									\
-	return ((tg)->flags & (1 << THROTL_TG_FLAG_##name)) != 0;	\
+/**
+ * sq_to_tg - return the throtl_grp the specified service queue belongs to
+ * @sq: the throtl_service_queue of interest
+ *
+ * Return the throtl_grp @sq belongs to.  If @sq is the top-level one
+ * embedded in throtl_data, %NULL is returned.
+ */
+static struct throtl_grp *sq_to_tg(struct throtl_service_queue *sq)
+{
+	if (sq && sq->parent_sq)
+		return container_of(sq, struct throtl_grp, service_queue);
+	else
+		return NULL;
 }
 
-THROTL_TG_FNS(on_rr);
+/**
+ * sq_to_td - return throtl_data the specified service queue belongs to
+ * @sq: the throtl_service_queue of interest
+ *
+ * A service_queue can be embedded in either a throtl_grp or throtl_data.
+ * Determine the associated throtl_data accordingly and return it.
+ */
+static struct throtl_data *sq_to_td(struct throtl_service_queue *sq)
+{
+	struct throtl_grp *tg = sq_to_tg(sq);
 
-#define throtl_log_tg(td, tg, fmt, args...)	do {			\
-	char __pbuf[128];						\
+	if (tg)
+		return tg->td;
+	else
+		return container_of(sq, struct throtl_data, service_queue);
+}
+
+/**
+ * throtl_log - log debug message via blktrace
+ * @sq: the service_queue being reported
+ * @fmt: printf format string
+ * @args: printf args
+ *
+ * The messages are prefixed with "throtl BLKG_NAME" if @sq belongs to a
+ * throtl_grp; otherwise, just "throtl".
+ *
+ * TODO: this should be made a function and name formatting should happen
+ * after testing whether blktrace is enabled.
+ */
+#define throtl_log(sq, fmt, args...)	do {				\
+	struct throtl_grp *__tg = sq_to_tg((sq));			\
+	struct throtl_data *__td = sq_to_td((sq));			\
+									\
+	(void)__td;							\
+	if ((__tg)) {							\
+		char __pbuf[128];					\
 									\
-	blkg_path(tg_to_blkg(tg), __pbuf, sizeof(__pbuf));		\
-	blk_add_trace_msg((td)->queue, "throtl %s " fmt, __pbuf, ##args); \
+		blkg_path(tg_to_blkg(__tg), __pbuf, sizeof(__pbuf));	\
+		blk_add_trace_msg(__td->queue, "throtl %s " fmt, __pbuf, ##args); \
+	} else {							\
+		blk_add_trace_msg(__td->queue, "throtl " fmt, ##args);	\
+	}								\
 } while (0)
 
-#define throtl_log(td, fmt, args...)	\
-	blk_add_trace_msg((td)->queue, "throtl " fmt, ##args)
-
-static inline unsigned int total_nr_queued(struct throtl_data *td)
-{
-	return td->nr_queued[0] + td->nr_queued[1];
-}
-
 /*
  * Worker for allocating per cpu stat for tgs. This is scheduled on the
  * system_wq once there are some groups on the alloc_list waiting for
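
sq_to_tg() above works because the only service_queue without a parent is
the one embedded in throtl_data.  A self-contained miniature of that
container_of() discrimination (simplified toy types reusing the kernel
names, not the kernel code):

#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct service_queue { struct service_queue *parent_sq; };
struct throtl_data { struct service_queue service_queue; };
struct throtl_grp { int dummy; struct service_queue service_queue; };

static struct throtl_grp *sq_to_tg(struct service_queue *sq)
{
	/* only a tg's service_queue has a parent */
	if (sq && sq->parent_sq)
		return container_of(sq, struct throtl_grp, service_queue);
	return NULL;
}

int main(void)
{
	struct throtl_data td = { { NULL } };
	struct throtl_grp tg = { 0, { &td.service_queue } };

	printf("td's sq maps to a tg? %s\n",
	       sq_to_tg(&td.service_queue) ? "yes" : "no");	/* no */
	printf("tg's sq maps back to tg? %s\n",
	       sq_to_tg(&tg.service_queue) == &tg ? "yes" : "no"); /* yes */
	return 0;
}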
@@ -215,15 +293,141 @@ alloc_stats:
 		goto alloc_stats;
 }
 
+static void throtl_qnode_init(struct throtl_qnode *qn, struct throtl_grp *tg)
+{
+	INIT_LIST_HEAD(&qn->node);
+	bio_list_init(&qn->bios);
+	qn->tg = tg;
+}
+
+/**
+ * throtl_qnode_add_bio - add a bio to a throtl_qnode and activate it
+ * @bio: bio being added
+ * @qn: qnode to add bio to
+ * @queued: the service_queue->queued[] list @qn belongs to
+ *
+ * Add @bio to @qn and put @qn on @queued if it's not already on.
+ * @qn->tg's reference count is bumped when @qn is activated.  See the
+ * comment on top of throtl_qnode definition for details.
+ */
+static void throtl_qnode_add_bio(struct bio *bio, struct throtl_qnode *qn,
+				 struct list_head *queued)
+{
+	bio_list_add(&qn->bios, bio);
+	if (list_empty(&qn->node)) {
+		list_add_tail(&qn->node, queued);
+		blkg_get(tg_to_blkg(qn->tg));
+	}
+}
+
+/**
+ * throtl_peek_queued - peek the first bio on a qnode list
+ * @queued: the qnode list to peek
+ */
+static struct bio *throtl_peek_queued(struct list_head *queued)
+{
+	struct throtl_qnode *qn = list_first_entry(queued, struct throtl_qnode, node);
+	struct bio *bio;
+
+	if (list_empty(queued))
+		return NULL;
+
+	bio = bio_list_peek(&qn->bios);
+	WARN_ON_ONCE(!bio);
+	return bio;
+}
+
+/**
+ * throtl_pop_queued - pop the first bio from a qnode list
+ * @queued: the qnode list to pop a bio from
+ * @tg_to_put: optional out argument for throtl_grp to put
+ *
+ * Pop the first bio from the qnode list @queued.  After popping, the first
+ * qnode is removed from @queued if empty or moved to the end of @queued so
+ * that the popping order is round-robin.
+ *
+ * When the first qnode is removed, its associated throtl_grp should be put
+ * too.  If @tg_to_put is NULL, this function automatically puts it;
+ * otherwise, *@tg_to_put is set to the throtl_grp to put and the caller is
+ * responsible for putting it.
+ */
+static struct bio *throtl_pop_queued(struct list_head *queued,
+				     struct throtl_grp **tg_to_put)
+{
+	struct throtl_qnode *qn = list_first_entry(queued, struct throtl_qnode, node);
+	struct bio *bio;
+
+	if (list_empty(queued))
+		return NULL;
+
+	bio = bio_list_pop(&qn->bios);
+	WARN_ON_ONCE(!bio);
+
+	if (bio_list_empty(&qn->bios)) {
+		list_del_init(&qn->node);
+		if (tg_to_put)
+			*tg_to_put = qn->tg;
+		else
+			blkg_put(tg_to_blkg(qn->tg));
+	} else {
+		list_move_tail(&qn->node, queued);
+	}
+
+	return bio;
+}
+
+/* init a service_queue, assumes the caller zeroed it */
+static void throtl_service_queue_init(struct throtl_service_queue *sq,
+				      struct throtl_service_queue *parent_sq)
+{
+	INIT_LIST_HEAD(&sq->queued[0]);
+	INIT_LIST_HEAD(&sq->queued[1]);
+	sq->pending_tree = RB_ROOT;
+	sq->parent_sq = parent_sq;
+	setup_timer(&sq->pending_timer, throtl_pending_timer_fn,
+		    (unsigned long)sq);
+}
+
+static void throtl_service_queue_exit(struct throtl_service_queue *sq)
+{
+	del_timer_sync(&sq->pending_timer);
+}
+
 static void throtl_pd_init(struct blkcg_gq *blkg)
 {
 	struct throtl_grp *tg = blkg_to_tg(blkg);
+	struct throtl_data *td = blkg->q->td;
+	struct throtl_service_queue *parent_sq;
 	unsigned long flags;
+	int rw;
+
+	/*
+	 * If sane_hierarchy is enabled, we switch to properly hierarchical
+	 * behavior where limits on a given throtl_grp are applied to the
+	 * whole subtree rather than just the group itself.  e.g. If 16M
+	 * read_bps limit is set on the root group, the whole system can't
+	 * exceed 16M for the device.
+	 *
+	 * If sane_hierarchy is not enabled, the broken flat hierarchy
+	 * behavior is retained where all throtl_grps are treated as if
+	 * they're all separate root groups right below throtl_data.
+	 * Limits of a group don't interact with limits of other groups
+	 * regardless of the position of the group in the hierarchy.
+	 */
+	parent_sq = &td->service_queue;
+
+	if (cgroup_sane_behavior(blkg->blkcg->css.cgroup) && blkg->parent)
+		parent_sq = &blkg_to_tg(blkg->parent)->service_queue;
+
+	throtl_service_queue_init(&tg->service_queue, parent_sq);
+
+	for (rw = READ; rw <= WRITE; rw++) {
+		throtl_qnode_init(&tg->qnode_on_self[rw], tg);
+		throtl_qnode_init(&tg->qnode_on_parent[rw], tg);
+	}
 
 	RB_CLEAR_NODE(&tg->rb_node);
-	bio_list_init(&tg->bio_lists[0]);
-	bio_list_init(&tg->bio_lists[1]);
-	tg->limits_changed = false;
+	tg->td = td;
 
 	tg->bps[READ] = -1;
 	tg->bps[WRITE] = -1;
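
The parent_sq choice above is the entire hierarchy switch: with sane
behavior each group's service_queue chains to its parent group's, otherwise
every group hangs directly off throtl_data.  A toy printout of the two
wirings (hardcoded group names, not kernel code):

#include <stdbool.h>
#include <stdio.h>

int main(void)
{
	bool sane_behavior = true;	/* flip to see the flat layout */
	const char *groups[] = { "root", "A", "A/B" };
	const char *parents[] = { NULL, "root", "A" };
	int i;

	for (i = 0; i < 3; i++) {
		const char *parent_sq = "td->service_queue"; /* flat default */

		if (sane_behavior && parents[i])
			parent_sq = parents[i];	/* true hierarchy */
		printf("%-4s -> parent_sq = %s\n", groups[i], parent_sq);
	}
	return 0;
}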
@@ -241,6 +445,30 @@ static void throtl_pd_init(struct blkcg_gq *blkg)
 	spin_unlock_irqrestore(&tg_stats_alloc_lock, flags);
 }
 
+/*
+ * Set has_rules[] if @tg or any of its parents have limits configured.
+ * This doesn't require walking up to the top of the hierarchy as the
+ * parent's has_rules[] is guaranteed to be correct.
+ */
+static void tg_update_has_rules(struct throtl_grp *tg)
+{
+	struct throtl_grp *parent_tg = sq_to_tg(tg->service_queue.parent_sq);
+	int rw;
+
+	for (rw = READ; rw <= WRITE; rw++)
+		tg->has_rules[rw] = (parent_tg && parent_tg->has_rules[rw]) ||
+				    (tg->bps[rw] != -1 || tg->iops[rw] != -1);
+}
+
+static void throtl_pd_online(struct blkcg_gq *blkg)
+{
+	/*
+	 * We don't want new groups to escape the limits of their ancestors.
+	 * Update has_rules[] after a new group is brought online.
+	 */
+	tg_update_has_rules(blkg_to_tg(blkg));
+}
+
 static void throtl_pd_exit(struct blkcg_gq *blkg)
 {
 	struct throtl_grp *tg = blkg_to_tg(blkg);
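
Because parents are brought online before their children, a single
top-down pass keeps has_rules[] correct.  A small standalone model of that
propagation (toy struct, one rw direction, an assumed three-level chain):

#include <stdbool.h>
#include <stdio.h>

struct tg {
	struct tg *parent;
	long bps;		/* -1 == no limit configured */
	bool has_rules;
};

/* mirrors tg_update_has_rules(): a group has rules if it or any
 * ancestor has a limit, and the parent is already up to date */
static void update_has_rules(struct tg *tg)
{
	tg->has_rules = (tg->parent && tg->parent->has_rules) ||
			tg->bps != -1;
}

int main(void)
{
	struct tg root = { NULL, -1, false };
	struct tg mid  = { &root, 16 << 20, false };	/* 16M limit here */
	struct tg leaf = { &mid, -1, false };

	/* pre-order walk: parents before children */
	update_has_rules(&root);
	update_has_rules(&mid);
	update_has_rules(&leaf);

	printf("root=%d mid=%d leaf=%d\n",
	       root.has_rules, mid.has_rules, leaf.has_rules); /* 0 1 1 */
	return 0;
}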
@@ -251,6 +479,8 @@ static void throtl_pd_exit(struct blkcg_gq *blkg)
 	spin_unlock_irqrestore(&tg_stats_alloc_lock, flags);
 
 	free_percpu(tg->stats_cpu);
+
+	throtl_service_queue_exit(&tg->service_queue);
 }
 
 static void throtl_pd_reset_stats(struct blkcg_gq *blkg)
@@ -309,17 +539,18 @@ static struct throtl_grp *throtl_lookup_create_tg(struct throtl_data *td,
 	return tg;
 }
 
-static struct throtl_grp *throtl_rb_first(struct throtl_rb_root *root)
+static struct throtl_grp *
+throtl_rb_first(struct throtl_service_queue *parent_sq)
 {
 	/* Service tree is empty */
-	if (!root->count)
+	if (!parent_sq->nr_pending)
 		return NULL;
 
-	if (!root->left)
-		root->left = rb_first(&root->rb);
+	if (!parent_sq->first_pending)
+		parent_sq->first_pending = rb_first(&parent_sq->pending_tree);
 
-	if (root->left)
-		return rb_entry_tg(root->left);
+	if (parent_sq->first_pending)
+		return rb_entry_tg(parent_sq->first_pending);
 
 	return NULL;
 }
@@ -330,29 +561,30 @@ static void rb_erase_init(struct rb_node *n, struct rb_root *root)
 	RB_CLEAR_NODE(n);
 }
 
-static void throtl_rb_erase(struct rb_node *n, struct throtl_rb_root *root)
+static void throtl_rb_erase(struct rb_node *n,
+			    struct throtl_service_queue *parent_sq)
 {
-	if (root->left == n)
-		root->left = NULL;
-	rb_erase_init(n, &root->rb);
-	--root->count;
+	if (parent_sq->first_pending == n)
+		parent_sq->first_pending = NULL;
+	rb_erase_init(n, &parent_sq->pending_tree);
+	--parent_sq->nr_pending;
 }
 
-static void update_min_dispatch_time(struct throtl_rb_root *st)
+static void update_min_dispatch_time(struct throtl_service_queue *parent_sq)
 {
 	struct throtl_grp *tg;
 
-	tg = throtl_rb_first(st);
+	tg = throtl_rb_first(parent_sq);
 	if (!tg)
 		return;
 
-	st->min_disptime = tg->disptime;
+	parent_sq->first_pending_disptime = tg->disptime;
 }
 
-static void
-tg_service_tree_add(struct throtl_rb_root *st, struct throtl_grp *tg)
+static void tg_service_queue_add(struct throtl_grp *tg)
 {
-	struct rb_node **node = &st->rb.rb_node;
+	struct throtl_service_queue *parent_sq = tg->service_queue.parent_sq;
+	struct rb_node **node = &parent_sq->pending_tree.rb_node;
 	struct rb_node *parent = NULL;
 	struct throtl_grp *__tg;
 	unsigned long key = tg->disptime;
@@ -371,89 +603,135 @@ tg_service_tree_add(struct throtl_rb_root *st, struct throtl_grp *tg)
 	}
 
 	if (left)
-		st->left = &tg->rb_node;
+		parent_sq->first_pending = &tg->rb_node;
 
 	rb_link_node(&tg->rb_node, parent, node);
-	rb_insert_color(&tg->rb_node, &st->rb);
+	rb_insert_color(&tg->rb_node, &parent_sq->pending_tree);
 }
 
-static void __throtl_enqueue_tg(struct throtl_data *td, struct throtl_grp *tg)
+static void __throtl_enqueue_tg(struct throtl_grp *tg)
 {
-	struct throtl_rb_root *st = &td->tg_service_tree;
+	tg_service_queue_add(tg);
+	tg->flags |= THROTL_TG_PENDING;
+	tg->service_queue.parent_sq->nr_pending++;
+}
 
-	tg_service_tree_add(st, tg);
-	throtl_mark_tg_on_rr(tg);
-	st->count++;
+static void throtl_enqueue_tg(struct throtl_grp *tg)
+{
+	if (!(tg->flags & THROTL_TG_PENDING))
+		__throtl_enqueue_tg(tg);
 }
 
-static void throtl_enqueue_tg(struct throtl_data *td, struct throtl_grp *tg)
+static void __throtl_dequeue_tg(struct throtl_grp *tg)
 {
-	if (!throtl_tg_on_rr(tg))
-		__throtl_enqueue_tg(td, tg);
+	throtl_rb_erase(&tg->rb_node, tg->service_queue.parent_sq);
+	tg->flags &= ~THROTL_TG_PENDING;
 }
 
-static void __throtl_dequeue_tg(struct throtl_data *td, struct throtl_grp *tg)
+static void throtl_dequeue_tg(struct throtl_grp *tg)
 {
-	throtl_rb_erase(&tg->rb_node, &td->tg_service_tree);
-	throtl_clear_tg_on_rr(tg);
+	if (tg->flags & THROTL_TG_PENDING)
+		__throtl_dequeue_tg(tg);
 }
 
-static void throtl_dequeue_tg(struct throtl_data *td, struct throtl_grp *tg)
+/* Call with queue lock held */
+static void throtl_schedule_pending_timer(struct throtl_service_queue *sq,
+					  unsigned long expires)
 {
-	if (throtl_tg_on_rr(tg))
-		__throtl_dequeue_tg(td, tg);
+	mod_timer(&sq->pending_timer, expires);
+	throtl_log(sq, "schedule timer. delay=%lu jiffies=%lu",
+		   expires - jiffies, jiffies);
 }
 
-static void throtl_schedule_next_dispatch(struct throtl_data *td)
+/**
+ * throtl_schedule_next_dispatch - schedule the next dispatch cycle
+ * @sq: the service_queue to schedule dispatch for
+ * @force: force scheduling
+ *
+ * Arm @sq->pending_timer so that the next dispatch cycle starts on the
+ * dispatch time of the first pending child.  Returns %true if either timer
+ * is armed or there's no pending child left.  %false if the current
+ * dispatch window is still open and the caller should continue
+ * dispatching.
+ *
+ * If @force is %true, the dispatch timer is always scheduled and this
+ * function is guaranteed to return %true.  This is to be used when the
+ * caller can't dispatch itself and needs to invoke pending_timer
+ * unconditionally.  Note that forced scheduling is likely to induce a short
+ * delay before dispatch starts even if @sq->first_pending_disptime is not
+ * in the future and thus shouldn't be used in hot paths.
+ */
+static bool throtl_schedule_next_dispatch(struct throtl_service_queue *sq,
+					  bool force)
 {
-	struct throtl_rb_root *st = &td->tg_service_tree;
+	/* any pending children left? */
+	if (!sq->nr_pending)
+		return true;
 
-	/*
-	 * If there are more bios pending, schedule more work.
-	 */
-	if (!total_nr_queued(td))
-		return;
+	update_min_dispatch_time(sq);
 
-	BUG_ON(!st->count);
+	/* is the next dispatch time in the future? */
+	if (force || time_after(sq->first_pending_disptime, jiffies)) {
+		throtl_schedule_pending_timer(sq, sq->first_pending_disptime);
+		return true;
+	}
 
-	update_min_dispatch_time(st);
+	/* tell the caller to continue dispatching */
+	return false;
+}
 
-	if (time_before_eq(st->min_disptime, jiffies))
-		throtl_schedule_delayed_work(td, 0);
-	else
-		throtl_schedule_delayed_work(td, (st->min_disptime - jiffies));
+static inline void throtl_start_new_slice_with_credit(struct throtl_grp *tg,
+		bool rw, unsigned long start)
+{
+	tg->bytes_disp[rw] = 0;
+	tg->io_disp[rw] = 0;
+
+	/*
+	 * Previous slice has expired. We must have trimmed it after last
+	 * bio dispatch. That means since start of last slice, we never used
+	 * that bandwidth. Do try to make use of that bandwidth while giving
+	 * credit.
+	 */
+	if (time_after_eq(start, tg->slice_start[rw]))
+		tg->slice_start[rw] = start;
+
+	tg->slice_end[rw] = jiffies + throtl_slice;
+	throtl_log(&tg->service_queue,
+		   "[%c] new slice with credit start=%lu end=%lu jiffies=%lu",
+		   rw == READ ? 'R' : 'W', tg->slice_start[rw],
+		   tg->slice_end[rw], jiffies);
 }
 
-static inline void
-throtl_start_new_slice(struct throtl_data *td, struct throtl_grp *tg, bool rw)
+static inline void throtl_start_new_slice(struct throtl_grp *tg, bool rw)
 {
 	tg->bytes_disp[rw] = 0;
 	tg->io_disp[rw] = 0;
 	tg->slice_start[rw] = jiffies;
 	tg->slice_end[rw] = jiffies + throtl_slice;
-	throtl_log_tg(td, tg, "[%c] new slice start=%lu end=%lu jiffies=%lu",
-			rw == READ ? 'R' : 'W', tg->slice_start[rw],
-			tg->slice_end[rw], jiffies);
+	throtl_log(&tg->service_queue,
+		   "[%c] new slice start=%lu end=%lu jiffies=%lu",
+		   rw == READ ? 'R' : 'W', tg->slice_start[rw],
+		   tg->slice_end[rw], jiffies);
 }
 
-static inline void throtl_set_slice_end(struct throtl_data *td,
-		struct throtl_grp *tg, bool rw, unsigned long jiffy_end)
+static inline void throtl_set_slice_end(struct throtl_grp *tg, bool rw,
+					unsigned long jiffy_end)
 {
 	tg->slice_end[rw] = roundup(jiffy_end, throtl_slice);
 }
 
-static inline void throtl_extend_slice(struct throtl_data *td,
-		struct throtl_grp *tg, bool rw, unsigned long jiffy_end)
+static inline void throtl_extend_slice(struct throtl_grp *tg, bool rw,
+				       unsigned long jiffy_end)
 {
 	tg->slice_end[rw] = roundup(jiffy_end, throtl_slice);
-	throtl_log_tg(td, tg, "[%c] extend slice start=%lu end=%lu jiffies=%lu",
-			rw == READ ? 'R' : 'W', tg->slice_start[rw],
-			tg->slice_end[rw], jiffies);
+	throtl_log(&tg->service_queue,
+		   "[%c] extend slice start=%lu end=%lu jiffies=%lu",
+		   rw == READ ? 'R' : 'W', tg->slice_start[rw],
+		   tg->slice_end[rw], jiffies);
 }
 
 /* Determine if previously allocated or extended slice is complete or not */
-static bool
-throtl_slice_used(struct throtl_data *td, struct throtl_grp *tg, bool rw)
+static bool throtl_slice_used(struct throtl_grp *tg, bool rw)
 {
 	if (time_in_range(jiffies, tg->slice_start[rw], tg->slice_end[rw]))
 		return 0;
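
The boolean returned by throtl_schedule_next_dispatch() above is what lets
callers dispatch inline while the window is open.  A compilable sketch of
just that decision, with jiffies replaced by a plain counter (hypothetical
schedule_next() name, not the kernel function):

#include <stdbool.h>
#include <stdio.h>

/* true: timer armed or nothing pending; false: window still open,
 * caller should keep dispatching inline */
static bool schedule_next(unsigned int nr_pending, unsigned long disptime,
			  unsigned long now, bool force)
{
	if (!nr_pending)
		return true;		/* no pending children */
	if (force || disptime > now) {
		printf("arm timer for t=%lu\n", disptime);
		return true;
	}
	return false;			/* dispatch directly */
}

int main(void)
{
	printf("%d\n", schedule_next(0, 0, 100, false));   /* 1: idle */
	printf("%d\n", schedule_next(3, 150, 100, false)); /* 1: timer */
	printf("%d\n", schedule_next(3, 80, 100, false));  /* 0: keep going */
	printf("%d\n", schedule_next(3, 80, 100, true));   /* 1: forced */
	return 0;
}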
@@ -462,8 +740,7 @@ throtl_slice_used(struct throtl_data *td, struct throtl_grp *tg, bool rw)
 }
 
 /* Trim the used slices and adjust slice start accordingly */
-static inline void
-throtl_trim_slice(struct throtl_data *td, struct throtl_grp *tg, bool rw)
+static inline void throtl_trim_slice(struct throtl_grp *tg, bool rw)
 {
 	unsigned long nr_slices, time_elapsed, io_trim;
 	u64 bytes_trim, tmp;
@@ -475,7 +752,7 @@ throtl_trim_slice(struct throtl_data *td, struct throtl_grp *tg, bool rw)
 	 * renewed. Don't try to trim the slice if slice is used. A new
 	 * slice will start when appropriate.
 	 */
-	if (throtl_slice_used(td, tg, rw))
+	if (throtl_slice_used(tg, rw))
 		return;
 
 	/*
@@ -486,7 +763,7 @@ throtl_trim_slice(struct throtl_data *td, struct throtl_grp *tg, bool rw)
 	 * is bad because it does not allow new slice to start.
 	 */
 
-	throtl_set_slice_end(td, tg, rw, jiffies + throtl_slice);
+	throtl_set_slice_end(tg, rw, jiffies + throtl_slice);
 
 	time_elapsed = jiffies - tg->slice_start[rw];
 
@@ -515,14 +792,14 @@ throtl_trim_slice(struct throtl_data *td, struct throtl_grp *tg, bool rw)
 
 	tg->slice_start[rw] += nr_slices * throtl_slice;
 
-	throtl_log_tg(td, tg, "[%c] trim slice nr=%lu bytes=%llu io=%lu"
-			" start=%lu end=%lu jiffies=%lu",
-			rw == READ ? 'R' : 'W', nr_slices, bytes_trim, io_trim,
-			tg->slice_start[rw], tg->slice_end[rw], jiffies);
+	throtl_log(&tg->service_queue,
+		   "[%c] trim slice nr=%lu bytes=%llu io=%lu start=%lu end=%lu jiffies=%lu",
+		   rw == READ ? 'R' : 'W', nr_slices, bytes_trim, io_trim,
+		   tg->slice_start[rw], tg->slice_end[rw], jiffies);
 }
 
-static bool tg_with_in_iops_limit(struct throtl_data *td, struct throtl_grp *tg,
-		struct bio *bio, unsigned long *wait)
+static bool tg_with_in_iops_limit(struct throtl_grp *tg, struct bio *bio,
+				  unsigned long *wait)
 {
 	bool rw = bio_data_dir(bio);
 	unsigned int io_allowed;
@@ -571,8 +848,8 @@ static bool tg_with_in_iops_limit(struct throtl_data *td, struct throtl_grp *tg,
 	return 0;
 }
 
-static bool tg_with_in_bps_limit(struct throtl_data *td, struct throtl_grp *tg,
-		struct bio *bio, unsigned long *wait)
+static bool tg_with_in_bps_limit(struct throtl_grp *tg, struct bio *bio,
+				 unsigned long *wait)
 {
 	bool rw = bio_data_dir(bio);
 	u64 bytes_allowed, extra_bytes, tmp;
@@ -613,18 +890,12 @@ static bool tg_with_in_bps_limit(struct throtl_data *td, struct throtl_grp *tg,
 	return 0;
 }
 
-static bool tg_no_rule_group(struct throtl_grp *tg, bool rw) {
-	if (tg->bps[rw] == -1 && tg->iops[rw] == -1)
-		return 1;
-	return 0;
-}
-
 /*
  * Returns whether one can dispatch a bio or not. Also returns approx number
  * of jiffies to wait before this bio is with-in IO rate and can be dispatched
  */
-static bool tg_may_dispatch(struct throtl_data *td, struct throtl_grp *tg,
-				struct bio *bio, unsigned long *wait)
+static bool tg_may_dispatch(struct throtl_grp *tg, struct bio *bio,
+			    unsigned long *wait)
 {
 	bool rw = bio_data_dir(bio);
 	unsigned long bps_wait = 0, iops_wait = 0, max_wait = 0;
@@ -635,7 +906,8 @@ static bool tg_may_dispatch(struct throtl_data *td, struct throtl_grp *tg,
 	 * this function with a different bio if there are other bios
 	 * queued.
 	 */
-	BUG_ON(tg->nr_queued[rw] && bio != bio_list_peek(&tg->bio_lists[rw]));
+	BUG_ON(tg->service_queue.nr_queued[rw] &&
+	       bio != throtl_peek_queued(&tg->service_queue.queued[rw]));
 
 	/* If tg->bps = -1, then BW is unlimited */
 	if (tg->bps[rw] == -1 && tg->iops[rw] == -1) {
@@ -649,15 +921,15 @@ static bool tg_may_dispatch(struct throtl_data *td, struct throtl_grp *tg,
 	 * existing slice to make sure it is at least throtl_slice interval
 	 * long since now.
 	 */
-	if (throtl_slice_used(td, tg, rw))
-		throtl_start_new_slice(td, tg, rw);
+	if (throtl_slice_used(tg, rw))
+		throtl_start_new_slice(tg, rw);
 	else {
 		if (time_before(tg->slice_end[rw], jiffies + throtl_slice))
-			throtl_extend_slice(td, tg, rw, jiffies + throtl_slice);
+			throtl_extend_slice(tg, rw, jiffies + throtl_slice);
 	}
 
-	if (tg_with_in_bps_limit(td, tg, bio, &bps_wait)
-	    && tg_with_in_iops_limit(td, tg, bio, &iops_wait)) {
+	if (tg_with_in_bps_limit(tg, bio, &bps_wait) &&
+	    tg_with_in_iops_limit(tg, bio, &iops_wait)) {
 		if (wait)
 			*wait = 0;
 		return 1;
@@ -669,7 +941,7 @@ static bool tg_may_dispatch(struct throtl_data *td, struct throtl_grp *tg,
 		*wait = max_wait;
 
 	if (time_before(tg->slice_end[rw], jiffies + max_wait))
-		throtl_extend_slice(td, tg, rw, jiffies + max_wait);
+		throtl_extend_slice(tg, rw, jiffies + max_wait);
 
 	return 0;
 }
@@ -708,65 +980,136 @@ static void throtl_charge_bio(struct throtl_grp *tg, struct bio *bio)
 	tg->bytes_disp[rw] += bio->bi_size;
 	tg->io_disp[rw]++;
 
-	throtl_update_dispatch_stats(tg_to_blkg(tg), bio->bi_size, bio->bi_rw);
+	/*
+	 * REQ_THROTTLED is used to prevent the same bio to be throttled
+	 * more than once as a throttled bio will go through blk-throtl the
+	 * second time when it eventually gets issued.  Set it when a bio
+	 * is being charged to a tg.
+	 *
+	 * Dispatch stats aren't recursive and each @bio should only be
+	 * accounted by the @tg it was originally associated with.  Let's
+	 * update the stats when setting REQ_THROTTLED for the first time
+	 * which is guaranteed to be for the @bio's original tg.
+	 */
+	if (!(bio->bi_rw & REQ_THROTTLED)) {
+		bio->bi_rw |= REQ_THROTTLED;
+		throtl_update_dispatch_stats(tg_to_blkg(tg), bio->bi_size,
+					     bio->bi_rw);
+	}
 }
 
-static void throtl_add_bio_tg(struct throtl_data *td, struct throtl_grp *tg,
-			struct bio *bio)
+/**
+ * throtl_add_bio_tg - add a bio to the specified throtl_grp
+ * @bio: bio to add
+ * @qn: qnode to use
+ * @tg: the target throtl_grp
+ *
+ * Add @bio to @tg's service_queue using @qn.  If @qn is not specified,
+ * tg->qnode_on_self[] is used.
+ */
+static void throtl_add_bio_tg(struct bio *bio, struct throtl_qnode *qn,
+			      struct throtl_grp *tg)
 {
+	struct throtl_service_queue *sq = &tg->service_queue;
 	bool rw = bio_data_dir(bio);
 
-	bio_list_add(&tg->bio_lists[rw], bio);
-	/* Take a bio reference on tg */
-	blkg_get(tg_to_blkg(tg));
-	tg->nr_queued[rw]++;
-	td->nr_queued[rw]++;
-	throtl_enqueue_tg(td, tg);
+	if (!qn)
+		qn = &tg->qnode_on_self[rw];
+
+	/*
+	 * If @tg doesn't currently have any bios queued in the same
+	 * direction, queueing @bio can change when @tg should be
+	 * dispatched.  Mark that @tg was empty.  This is automatically
+	 * cleared on the next tg_update_disptime().
+	 */
+	if (!sq->nr_queued[rw])
+		tg->flags |= THROTL_TG_WAS_EMPTY;
+
+	throtl_qnode_add_bio(bio, qn, &sq->queued[rw]);
+
+	sq->nr_queued[rw]++;
+	throtl_enqueue_tg(tg);
 }
 
-static void tg_update_disptime(struct throtl_data *td, struct throtl_grp *tg)
+static void tg_update_disptime(struct throtl_grp *tg)
 {
+	struct throtl_service_queue *sq = &tg->service_queue;
 	unsigned long read_wait = -1, write_wait = -1, min_wait = -1, disptime;
 	struct bio *bio;
 
-	if ((bio = bio_list_peek(&tg->bio_lists[READ])))
-		tg_may_dispatch(td, tg, bio, &read_wait);
+	if ((bio = throtl_peek_queued(&sq->queued[READ])))
+		tg_may_dispatch(tg, bio, &read_wait);
 
-	if ((bio = bio_list_peek(&tg->bio_lists[WRITE])))
-		tg_may_dispatch(td, tg, bio, &write_wait);
+	if ((bio = throtl_peek_queued(&sq->queued[WRITE])))
+		tg_may_dispatch(tg, bio, &write_wait);
 
 	min_wait = min(read_wait, write_wait);
 	disptime = jiffies + min_wait;
 
 	/* Update dispatch time */
-	throtl_dequeue_tg(td, tg);
+	throtl_dequeue_tg(tg);
 	tg->disptime = disptime;
-	throtl_enqueue_tg(td, tg);
+	throtl_enqueue_tg(tg);
+
+	/* see throtl_add_bio_tg() */
+	tg->flags &= ~THROTL_TG_WAS_EMPTY;
 }
 
-static void tg_dispatch_one_bio(struct throtl_data *td, struct throtl_grp *tg,
-				bool rw, struct bio_list *bl)
+static void start_parent_slice_with_credit(struct throtl_grp *child_tg,
+					struct throtl_grp *parent_tg, bool rw)
 {
-	struct bio *bio;
+	if (throtl_slice_used(parent_tg, rw)) {
+		throtl_start_new_slice_with_credit(parent_tg, rw,
+				child_tg->slice_start[rw]);
+	}
+
+}
 
-	bio = bio_list_pop(&tg->bio_lists[rw]);
-	tg->nr_queued[rw]--;
-	/* Drop bio reference on blkg */
-	blkg_put(tg_to_blkg(tg));
+static void tg_dispatch_one_bio(struct throtl_grp *tg, bool rw)
+{
+	struct throtl_service_queue *sq = &tg->service_queue;
+	struct throtl_service_queue *parent_sq = sq->parent_sq;
+	struct throtl_grp *parent_tg = sq_to_tg(parent_sq);
+	struct throtl_grp *tg_to_put = NULL;
+	struct bio *bio;
 
-	BUG_ON(td->nr_queued[rw] <= 0);
-	td->nr_queued[rw]--;
+	/*
+	 * @bio is being transferred from @tg to @parent_sq.  Popping a bio
+	 * from @tg may put its reference and @parent_sq might end up
+	 * getting released prematurely.  Remember the tg to put and put it
+	 * after @bio is transferred to @parent_sq.
+	 */
+	bio = throtl_pop_queued(&sq->queued[rw], &tg_to_put);
+	sq->nr_queued[rw]--;
 
 	throtl_charge_bio(tg, bio);
-	bio_list_add(bl, bio);
-	bio->bi_rw |= REQ_THROTTLED;
 
-	throtl_trim_slice(td, tg, rw);
+	/*
+	 * If our parent is another tg, we just need to transfer @bio to
+	 * the parent using throtl_add_bio_tg().  If our parent is
+	 * @td->service_queue, @bio is ready to be issued.  Put it on its
+	 * bio_lists[] and decrease total number queued.  The caller is
+	 * responsible for issuing these bios.
+	 */
+	if (parent_tg) {
+		throtl_add_bio_tg(bio, &tg->qnode_on_parent[rw], parent_tg);
+		start_parent_slice_with_credit(tg, parent_tg, rw);
+	} else {
+		throtl_qnode_add_bio(bio, &tg->qnode_on_parent[rw],
+				     &parent_sq->queued[rw]);
+		BUG_ON(tg->td->nr_queued[rw] <= 0);
+		tg->td->nr_queued[rw]--;
+	}
+
+	throtl_trim_slice(tg, rw);
+
+	if (tg_to_put)
+		blkg_put(tg_to_blkg(tg_to_put));
 }
 
-static int throtl_dispatch_tg(struct throtl_data *td, struct throtl_grp *tg,
-				struct bio_list *bl)
+static int throtl_dispatch_tg(struct throtl_grp *tg)
 {
+	struct throtl_service_queue *sq = &tg->service_queue;
 	unsigned int nr_reads = 0, nr_writes = 0;
 	unsigned int max_nr_reads = throtl_grp_quantum*3/4;
 	unsigned int max_nr_writes = throtl_grp_quantum - max_nr_reads;
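
A toy trace of the REQ_THROTTLED rule in throtl_charge_bio() above: a
climbing bio is charged at every level, but dispatch stats go only to the
group that saw it first (plain C, not kernel code):

#include <stdbool.h>
#include <stdio.h>

struct bio { bool throttled; };	/* stands in for the REQ_THROTTLED bit */

static void charge_bio(struct bio *bio, const char *tg)
{
	printf("charge %s\n", tg);
	if (!bio->throttled) {
		bio->throttled = true;
		printf("  stats accounted to %s only\n", tg);
	}
}

int main(void)
{
	struct bio bio = { false };

	charge_bio(&bio, "leaf");	/* original tg: stats land here */
	charge_bio(&bio, "parent");	/* climbing up: charged, no stats */
	charge_bio(&bio, "root");
	/* blk_throtl_bio() clears the flag when the bio is finally issued */
	bio.throttled = false;
	return 0;
}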
@@ -774,20 +1117,20 @@ static int throtl_dispatch_tg(struct throtl_data *td, struct throtl_grp *tg,
 
 	/* Try to dispatch 75% READS and 25% WRITES */
 
-	while ((bio = bio_list_peek(&tg->bio_lists[READ]))
-		&& tg_may_dispatch(td, tg, bio, NULL)) {
+	while ((bio = throtl_peek_queued(&sq->queued[READ])) &&
+	       tg_may_dispatch(tg, bio, NULL)) {
 
-		tg_dispatch_one_bio(td, tg, bio_data_dir(bio), bl);
+		tg_dispatch_one_bio(tg, bio_data_dir(bio));
 		nr_reads++;
 
 		if (nr_reads >= max_nr_reads)
 			break;
 	}
 
-	while ((bio = bio_list_peek(&tg->bio_lists[WRITE]))
-		&& tg_may_dispatch(td, tg, bio, NULL)) {
+	while ((bio = throtl_peek_queued(&sq->queued[WRITE])) &&
+	       tg_may_dispatch(tg, bio, NULL)) {
 
-		tg_dispatch_one_bio(td, tg, bio_data_dir(bio), bl);
+		tg_dispatch_one_bio(tg, bio_data_dir(bio));
 		nr_writes++;
 
 		if (nr_writes >= max_nr_writes)
@@ -797,14 +1140,13 @@ static int throtl_dispatch_tg(struct throtl_data *td, struct throtl_grp *tg,
 	return nr_reads + nr_writes;
 }
 
-static int throtl_select_dispatch(struct throtl_data *td, struct bio_list *bl)
+static int throtl_select_dispatch(struct throtl_service_queue *parent_sq)
 {
 	unsigned int nr_disp = 0;
-	struct throtl_grp *tg;
-	struct throtl_rb_root *st = &td->tg_service_tree;
 
 	while (1) {
-		tg = throtl_rb_first(st);
+		struct throtl_grp *tg = throtl_rb_first(parent_sq);
+		struct throtl_service_queue *sq = &tg->service_queue;
 
 		if (!tg)
 			break;
@@ -812,14 +1154,12 @@ static int throtl_select_dispatch(struct throtl_data *td, struct bio_list *bl)
 		if (time_before(jiffies, tg->disptime))
 			break;
 
-		throtl_dequeue_tg(td, tg);
+		throtl_dequeue_tg(tg);
 
-		nr_disp += throtl_dispatch_tg(td, tg, bl);
+		nr_disp += throtl_dispatch_tg(tg);
 
-		if (tg->nr_queued[0] || tg->nr_queued[1]) {
-			tg_update_disptime(td, tg);
-			throtl_enqueue_tg(td, tg);
-		}
+		if (sq->nr_queued[0] || sq->nr_queued[1])
+			tg_update_disptime(tg);
 
 		if (nr_disp >= throtl_quantum)
 			break;
@@ -828,111 +1168,111 @@ static int throtl_select_dispatch(struct throtl_data *td, struct bio_list *bl)
 	return nr_disp;
 }
 
-static void throtl_process_limit_change(struct throtl_data *td)
+/**
+ * throtl_pending_timer_fn - timer function for service_queue->pending_timer
+ * @arg: the throtl_service_queue being serviced
+ *
+ * This timer is armed when a child throtl_grp with active bio's becomes
+ * pending and queued on the service_queue's pending_tree and expires when
+ * the first child throtl_grp should be dispatched.  This function
+ * dispatches bio's from the children throtl_grps to the parent
+ * service_queue.
+ *
+ * If the parent's parent is another throtl_grp, dispatching is propagated
+ * by either arming its pending_timer or repeating dispatch directly.  If
+ * the top-level service_tree is reached, throtl_data->dispatch_work is
+ * kicked so that the ready bio's are issued.
+ */
+static void throtl_pending_timer_fn(unsigned long arg)
 {
+	struct throtl_service_queue *sq = (void *)arg;
+	struct throtl_grp *tg = sq_to_tg(sq);
+	struct throtl_data *td = sq_to_td(sq);
 	struct request_queue *q = td->queue;
-	struct blkcg_gq *blkg, *n;
-
-	if (!td->limits_changed)
-		return;
-
-	xchg(&td->limits_changed, false);
-
-	throtl_log(td, "limits changed");
-
-	list_for_each_entry_safe(blkg, n, &q->blkg_list, q_node) {
-		struct throtl_grp *tg = blkg_to_tg(blkg);
+	struct throtl_service_queue *parent_sq;
+	bool dispatched;
+	int ret;
 
-		if (!tg->limits_changed)
-			continue;
+	spin_lock_irq(q->queue_lock);
+again:
+	parent_sq = sq->parent_sq;
+	dispatched = false;
+
+	while (true) {
+		throtl_log(sq, "dispatch nr_queued=%u read=%u write=%u",
+			   sq->nr_queued[READ] + sq->nr_queued[WRITE],
+			   sq->nr_queued[READ], sq->nr_queued[WRITE]);
+
+		ret = throtl_select_dispatch(sq);
+		if (ret) {
+			throtl_log(sq, "bios disp=%u", ret);
+			dispatched = true;
+		}
 
-		if (!xchg(&tg->limits_changed, false))
-			continue;
+		if (throtl_schedule_next_dispatch(sq, false))
+			break;
 
-		throtl_log_tg(td, tg, "limit change rbps=%llu wbps=%llu"
-			" riops=%u wiops=%u", tg->bps[READ], tg->bps[WRITE],
-			tg->iops[READ], tg->iops[WRITE]);
+		/* this dispatch window is still open, relax and repeat */
+		spin_unlock_irq(q->queue_lock);
+		cpu_relax();
+		spin_lock_irq(q->queue_lock);
+	}
 
-		/*
-		 * Restart the slices for both READ and WRITES. It
-		 * might happen that a group's limit are dropped
-		 * suddenly and we don't want to account recently
-		 * dispatched IO with new low rate
-		 */
-		throtl_start_new_slice(td, tg, 0);
-		throtl_start_new_slice(td, tg, 1);
+	if (!dispatched)
+		goto out_unlock;
 
-		if (throtl_tg_on_rr(tg))
-			tg_update_disptime(td, tg);
+	if (parent_sq) {
+		/* @parent_sq is another throtl_grp, propagate dispatch */
+		if (tg->flags & THROTL_TG_WAS_EMPTY) {
+			tg_update_disptime(tg);
+			if (!throtl_schedule_next_dispatch(parent_sq, false)) {
+				/* window is already open, repeat dispatching */
+				sq = parent_sq;
+				tg = sq_to_tg(sq);
+				goto again;
+			}
+		}
+	} else {
+		/* reached the top-level, queue issuing */
+		queue_work(kthrotld_workqueue, &td->dispatch_work);
 	}
+out_unlock:
+	spin_unlock_irq(q->queue_lock);
 }
 
-/* Dispatch throttled bios. Should be called without queue lock held. */
-static int throtl_dispatch(struct request_queue *q)
+/**
+ * blk_throtl_dispatch_work_fn - work function for throtl_data->dispatch_work
+ * @work: work item being executed
+ *
+ * This function is queued for execution when bio's reach the bio_lists[]
+ * of throtl_data->service_queue.  Those bio's are ready and issued by this
+ * function.
+ */
+void blk_throtl_dispatch_work_fn(struct work_struct *work)
 {
-	struct throtl_data *td = q->td;
-	unsigned int nr_disp = 0;
+	struct throtl_data *td = container_of(work, struct throtl_data,
+					      dispatch_work);
+	struct throtl_service_queue *td_sq = &td->service_queue;
+	struct request_queue *q = td->queue;
 	struct bio_list bio_list_on_stack;
 	struct bio *bio;
 	struct blk_plug plug;
-
-	spin_lock_irq(q->queue_lock);
-
-	throtl_process_limit_change(td);
-
-	if (!total_nr_queued(td))
-		goto out;
+	int rw;
 
 	bio_list_init(&bio_list_on_stack);
 
-	throtl_log(td, "dispatch nr_queued=%u read=%u write=%u",
-			total_nr_queued(td), td->nr_queued[READ],
-			td->nr_queued[WRITE]);
-
-	nr_disp = throtl_select_dispatch(td, &bio_list_on_stack);
-
-	if (nr_disp)
-		throtl_log(td, "bios disp=%u", nr_disp);
-
-	throtl_schedule_next_dispatch(td);
-out:
+	spin_lock_irq(q->queue_lock);
+	for (rw = READ; rw <= WRITE; rw++)
+		while ((bio = throtl_pop_queued(&td_sq->queued[rw], NULL)))
+			bio_list_add(&bio_list_on_stack, bio);
 	spin_unlock_irq(q->queue_lock);
 
-	/*
-	 * If we dispatched some requests, unplug the queue to make sure
-	 * immediate dispatch
-	 */
-	if (nr_disp) {
+	if (!bio_list_empty(&bio_list_on_stack)) {
 		blk_start_plug(&plug);
 		while((bio = bio_list_pop(&bio_list_on_stack)))
 			generic_make_request(bio);
 		blk_finish_plug(&plug);
 	}
-	return nr_disp;
-}
-
-void blk_throtl_work(struct work_struct *work)
-{
-	struct throtl_data *td = container_of(work, struct throtl_data,
-					throtl_work.work);
-	struct request_queue *q = td->queue;
-
-	throtl_dispatch(q);
-}
-
-/* Call with queue lock held */
-static void
-throtl_schedule_delayed_work(struct throtl_data *td, unsigned long delay)
-{
-
-	struct delayed_work *dwork = &td->throtl_work;
-
-	/* schedule work if limits changed even if no bio is queued */
-	if (total_nr_queued(td) || td->limits_changed) {
-		mod_delayed_work(kthrotld_workqueue, dwork, delay);
-		throtl_log(td, "schedule work. delay=%lu jiffies=%lu",
-				delay, jiffies);
-	}
 }
 
 static u64 tg_prfill_cpu_rwstat(struct seq_file *sf,
@@ -1007,7 +1347,9 @@ static int tg_set_conf(struct cgroup *cgrp, struct cftype *cft, const char *buf,
 	struct blkcg *blkcg = cgroup_to_blkcg(cgrp);
 	struct blkg_conf_ctx ctx;
 	struct throtl_grp *tg;
-	struct throtl_data *td;
+	struct throtl_service_queue *sq;
+	struct blkcg_gq *blkg;
+	struct cgroup *pos_cgrp;
 	int ret;
 
 	ret = blkg_conf_prep(blkcg, &blkcg_policy_throtl, buf, &ctx);
@@ -1015,7 +1357,7 @@ static int tg_set_conf(struct cgroup *cgrp, struct cftype *cft, const char *buf,
 		return ret;
 
 	tg = blkg_to_tg(ctx.blkg);
-	td = ctx.blkg->q->td;
+	sq = &tg->service_queue;
 
 	if (!ctx.v)
 		ctx.v = -1;
@@ -1025,10 +1367,37 @@ static int tg_set_conf(struct cgroup *cgrp, struct cftype *cft, const char *buf,
 	else
 		*(unsigned int *)((void *)tg + cft->private) = ctx.v;
 
-	/* XXX: we don't need the following deferred processing */
-	xchg(&tg->limits_changed, true);
-	xchg(&td->limits_changed, true);
-	throtl_schedule_delayed_work(td, 0);
+	throtl_log(&tg->service_queue,
+		   "limit change rbps=%llu wbps=%llu riops=%u wiops=%u",
+		   tg->bps[READ], tg->bps[WRITE],
+		   tg->iops[READ], tg->iops[WRITE]);
+
+	/*
+	 * Update has_rules[] flags for the updated tg's subtree.  A tg is
+	 * considered to have rules if either the tg itself or any of its
+	 * ancestors has rules.  This identifies groups without any
+	 * restrictions in the whole hierarchy and allows them to bypass
+	 * blk-throttle.
+	 */
+	tg_update_has_rules(tg);
+	blkg_for_each_descendant_pre(blkg, pos_cgrp, ctx.blkg)
+		tg_update_has_rules(blkg_to_tg(blkg));
+
+	/*
+	 * We're already holding queue_lock and know @tg is valid.  Let's
+	 * apply the new config directly.
+	 *
+	 * Restart the slices for both READ and WRITES. It might happen
+	 * that a group's limits are dropped suddenly and we don't want to
+	 * account recently dispatched IO with new low rate.
+	 */
+	throtl_start_new_slice(tg, 0);
+	throtl_start_new_slice(tg, 1);
+
+	if (tg->flags & THROTL_TG_PENDING) {
+		tg_update_disptime(tg);
+		throtl_schedule_next_dispatch(sq->parent_sq, true);
+	}
 
 	blkg_conf_finish(&ctx);
 	return 0;
@@ -1092,7 +1461,7 @@ static void throtl_shutdown_wq(struct request_queue *q)
 {
 	struct throtl_data *td = q->td;
 
-	cancel_delayed_work_sync(&td->throtl_work);
+	cancel_work_sync(&td->dispatch_work);
 }
 
 static struct blkcg_policy blkcg_policy_throtl = {
@@ -1100,6 +1469,7 @@ static struct blkcg_policy blkcg_policy_throtl = {
 	.cftypes		= throtl_files,
 
 	.pd_init_fn		= throtl_pd_init,
+	.pd_online_fn		= throtl_pd_online,
 	.pd_exit_fn		= throtl_pd_exit,
 	.pd_reset_stats_fn	= throtl_pd_reset_stats,
 };
@@ -1107,15 +1477,16 @@ static struct blkcg_policy blkcg_policy_throtl = {
 bool blk_throtl_bio(struct request_queue *q, struct bio *bio)
 {
 	struct throtl_data *td = q->td;
+	struct throtl_qnode *qn = NULL;
 	struct throtl_grp *tg;
-	bool rw = bio_data_dir(bio), update_disptime = true;
+	struct throtl_service_queue *sq;
+	bool rw = bio_data_dir(bio);
 	struct blkcg *blkcg;
 	bool throttled = false;
 
-	if (bio->bi_rw & REQ_THROTTLED) {
-		bio->bi_rw &= ~REQ_THROTTLED;
+	/* see throtl_charge_bio() */
+	if (bio->bi_rw & REQ_THROTTLED)
 		goto out;
-	}
 
 	/*
 	 * A throtl_grp pointer retrieved under rcu can be used to access
@@ -1126,7 +1497,7 @@ bool blk_throtl_bio(struct request_queue *q, struct bio *bio)
 	blkcg = bio_blkcg(bio);
 	tg = throtl_lookup_tg(td, blkcg);
 	if (tg) {
-		if (tg_no_rule_group(tg, rw)) {
+		if (!tg->has_rules[rw]) {
 			throtl_update_dispatch_stats(tg_to_blkg(tg),
 						     bio->bi_size, bio->bi_rw);
 			goto out_unlock_rcu;
@@ -1142,18 +1513,18 @@ bool blk_throtl_bio(struct request_queue *q, struct bio *bio)
 	if (unlikely(!tg))
 		goto out_unlock;
 
-	if (tg->nr_queued[rw]) {
-		/*
-		 * There is already another bio queued in same dir. No
-		 * need to update dispatch time.
-		 */
-		update_disptime = false;
-		goto queue_bio;
+	sq = &tg->service_queue;
 
-	}
+	while (true) {
+		/* throtl is FIFO - if bios are already queued, should queue */
+		if (sq->nr_queued[rw])
+			break;
+
+		/* if above limits, break to queue */
+		if (!tg_may_dispatch(tg, bio, NULL))
+			break;
 
-	/* Bio is with-in rate limit of group */
-	if (tg_may_dispatch(td, tg, bio, NULL)) {
+		/* within limits, let's charge and dispatch directly */
 		throtl_charge_bio(tg, bio);
 
 		/*
@@ -1167,25 +1538,41 @@ bool blk_throtl_bio(struct request_queue *q, struct bio *bio)
 		 *
 		 * So keep on trimming slice even if bio is not queued.
 		 */
-		throtl_trim_slice(td, tg, rw);
-		goto out_unlock;
+		throtl_trim_slice(tg, rw);
+
+		/*
+		 * @bio passed through this layer without being throttled.
+		 * Climb up the ladder.  If we're already at the top, it
+		 * can be executed directly.
+		 */
+		qn = &tg->qnode_on_parent[rw];
+		sq = sq->parent_sq;
+		tg = sq_to_tg(sq);
+		if (!tg)
+			goto out_unlock;
 	}
 
-queue_bio:
-	throtl_log_tg(td, tg, "[%c] bio. bdisp=%llu sz=%u bps=%llu"
-			" iodisp=%u iops=%u queued=%d/%d",
-			rw == READ ? 'R' : 'W',
-			tg->bytes_disp[rw], bio->bi_size, tg->bps[rw],
-			tg->io_disp[rw], tg->iops[rw],
-			tg->nr_queued[READ], tg->nr_queued[WRITE]);
+	/* out-of-limit, queue to @tg */
+	throtl_log(sq, "[%c] bio. bdisp=%llu sz=%u bps=%llu iodisp=%u iops=%u queued=%d/%d",
+		   rw == READ ? 'R' : 'W',
+		   tg->bytes_disp[rw], bio->bi_size, tg->bps[rw],
+		   tg->io_disp[rw], tg->iops[rw],
+		   sq->nr_queued[READ], sq->nr_queued[WRITE]);
 
 	bio_associate_current(bio);
-	throtl_add_bio_tg(q->td, tg, bio);
+	tg->td->nr_queued[rw]++;
+	throtl_add_bio_tg(bio, qn, tg);
 	throttled = true;
 
-	if (update_disptime) {
-		tg_update_disptime(td, tg);
-		throtl_schedule_next_dispatch(td);
+	/*
+	 * Update @tg's dispatch time and force schedule dispatch if @tg
+	 * was empty before @bio.  The forced scheduling isn't likely to
+	 * cause undue delay as @bio is likely to be dispatched directly if
+	 * its @tg's disptime is not in the future.
+	 */
+	if (tg->flags & THROTL_TG_WAS_EMPTY) {
+		tg_update_disptime(tg);
+		throtl_schedule_next_dispatch(tg->service_queue.parent_sq, true);
 	}
 
 out_unlock:
@@ -1193,9 +1580,38 @@ out_unlock:
 out_unlock_rcu:
 	rcu_read_unlock();
 out:
+	/*
+	 * As multiple blk-throtls may stack in the same issue path, we
+	 * don't want bios to leave with the flag set.  Clear the flag when
+	 * the bio is actually issued.
+	 */
+	if (!throttled)
+		bio->bi_rw &= ~REQ_THROTTLED;
 	return throttled;
 }
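
The loop above exits via sq_to_tg() returning NULL at the top of the hierarchy: throtl_data's own service_queue is the only one initialized with a NULL parent (see blk_throtl_init() below), so a sketch consistent with that convention is:

	static struct throtl_grp *sq_to_tg_sketch(struct throtl_service_queue *sq)
	{
		/* only per-group service queues have a parent; the top-level
		 * queue embedded in throtl_data does not map to any tg */
		if (sq && sq->parent_sq)
			return container_of(sq, struct throtl_grp, service_queue);
		return NULL;
	}
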
 
+/*
+ * Dispatch all bios from all child tg's queued on @parent_sq.  On
+ * return, @parent_sq is guaranteed not to have any active child tg's,
+ * and all bios from previously active tg's are on @parent_sq->bio_lists[].
+ */
+static void tg_drain_bios(struct throtl_service_queue *parent_sq)
+{
+	struct throtl_grp *tg;
+
+	while ((tg = throtl_rb_first(parent_sq))) {
+		struct throtl_service_queue *sq = &tg->service_queue;
+		struct bio *bio;
+
+		throtl_dequeue_tg(tg);
+
+		while ((bio = throtl_peek_queued(&sq->queued[READ])))
+			tg_dispatch_one_bio(tg, bio_data_dir(bio));
+		while ((bio = throtl_peek_queued(&sq->queued[WRITE])))
+			tg_dispatch_one_bio(tg, bio_data_dir(bio));
+	}
+}
+
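
tg_drain_bios() peeks at sq->queued[] rather than a flat bio_list because queued bios are now grouped into per-source qnodes, which is what preserves round-robin fairness when several children feed one parent. A rough sketch of the structure, with field names inferred from the qnode_on_parent[] usage above:

	struct throtl_qnode_sketch {
		struct list_head	node;	/* link on the parent sq->queued[rw] */
		struct bio_list		bios;	/* this child's bios, in FIFO order */
		struct throtl_grp	*tg;	/* owning group, pinned while queued */
	};
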
 /**
  * blk_throtl_drain - drain throttled bios
  * @q: request_queue to drain throttled bios for
@@ -1206,27 +1622,36 @@ void blk_throtl_drain(struct request_queue *q)
 	__releases(q->queue_lock) __acquires(q->queue_lock)
 {
 	struct throtl_data *td = q->td;
-	struct throtl_rb_root *st = &td->tg_service_tree;
-	struct throtl_grp *tg;
-	struct bio_list bl;
+	struct blkcg_gq *blkg;
+	struct cgroup *pos_cgrp;
 	struct bio *bio;
+	int rw;
 
 	queue_lockdep_assert_held(q);
+	rcu_read_lock();
+
+	/*
+	 * Drain each tg while doing a post-order walk on the blkg tree, so
+	 * that all bios are propagated to td->service_queue.  It'd be
+	 * better to walk the service_queue tree directly, but the blkg
+	 * walk is easier.
+	 */
+	blkg_for_each_descendant_post(blkg, pos_cgrp, td->queue->root_blkg)
+		tg_drain_bios(&blkg_to_tg(blkg)->service_queue);
 
-	bio_list_init(&bl);
+	tg_drain_bios(&td_root_tg(td)->service_queue);
 
-	while ((tg = throtl_rb_first(st))) {
-		throtl_dequeue_tg(td, tg);
+	/* finally, transfer bios from top-level tg's into the td */
+	tg_drain_bios(&td->service_queue);
 
-		while ((bio = bio_list_peek(&tg->bio_lists[READ])))
-			tg_dispatch_one_bio(td, tg, bio_data_dir(bio), &bl);
-		while ((bio = bio_list_peek(&tg->bio_lists[WRITE])))
-			tg_dispatch_one_bio(td, tg, bio_data_dir(bio), &bl);
-	}
+	rcu_read_unlock();
 	spin_unlock_irq(q->queue_lock);
 
-	while ((bio = bio_list_pop(&bl)))
-		generic_make_request(bio);
+	/* all bios should now be on td->service_queue, issue them */
+	for (rw = READ; rw <= WRITE; rw++)
+		while ((bio = throtl_pop_queued(&td->service_queue.queued[rw],
+						NULL)))
+			generic_make_request(bio);
 
 	spin_lock_irq(q->queue_lock);
 }
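
The NULL second argument to throtl_pop_queued() deserves a note. Assuming semantics along these lines, passing NULL tells the helper to drop an emptied qnode's group reference on the spot, while callers that cannot release references in their current context pass a pointer and take ownership:

	/* sketch of the assumed contract: pop the first queued bio; when a
	 * qnode empties, its pinned throtl_grp is either handed back via
	 * @tg_to_put or released immediately if @tg_to_put is NULL */
	static struct bio *throtl_pop_queued(struct list_head *queued,
					     struct throtl_grp **tg_to_put);
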
@@ -1240,9 +1665,8 @@ int blk_throtl_init(struct request_queue *q)
 	if (!td)
 		return -ENOMEM;
 
-	td->tg_service_tree = THROTL_RB_ROOT;
-	td->limits_changed = false;
-	INIT_DELAYED_WORK(&td->throtl_work, blk_throtl_work);
+	INIT_WORK(&td->dispatch_work, blk_throtl_dispatch_work_fn);
+	throtl_service_queue_init(&td->service_queue, NULL);
 
 	q->td = td;
 	td->queue = q;
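
Passing NULL as the parent here is what marks td->service_queue as the root of the ladder that blk_throtl_bio() climbs. A minimal sketch of what the init helper presumably sets up, given the fields this diff touches (queued[], parent_sq, and the per-queue pending timer):

	static void service_queue_init_sketch(struct throtl_service_queue *sq,
					      struct throtl_service_queue *parent_sq)
	{
		INIT_LIST_HEAD(&sq->queued[READ]);
		INIT_LIST_HEAD(&sq->queued[WRITE]);
		sq->parent_sq = parent_sq;	/* NULL only at the top level */
		/* each service queue owns the timer driving its dispatches */
		setup_timer(&sq->pending_timer, throtl_pending_timer_fn,
			    (unsigned long)sq);
	}
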
diff --git a/block/cfq-iosched.c b/block/cfq-iosched.c
index d5cd3131c57a..d5bbdcfd0dab 100644
--- a/block/cfq-iosched.c
+++ b/block/cfq-iosched.c
@@ -4347,18 +4347,28 @@ static void cfq_exit_queue(struct elevator_queue *e)
 	kfree(cfqd);
 }
 
-static int cfq_init_queue(struct request_queue *q)
+static int cfq_init_queue(struct request_queue *q, struct elevator_type *e)
 {
 	struct cfq_data *cfqd;
 	struct blkcg_gq *blkg __maybe_unused;
 	int i, ret;
+	struct elevator_queue *eq;
+
+	eq = elevator_alloc(q, e);
+	if (!eq)
+		return -ENOMEM;
 
 	cfqd = kmalloc_node(sizeof(*cfqd), GFP_KERNEL | __GFP_ZERO, q->node);
-	if (!cfqd)
+	if (!cfqd) {
+		kobject_put(&eq->kobj);
 		return -ENOMEM;
+	}
+	eq->elevator_data = cfqd;
 
 	cfqd->queue = q;
-	q->elevator->elevator_data = cfqd;
+	spin_lock_irq(q->queue_lock);
+	q->elevator = eq;
+	spin_unlock_irq(q->queue_lock);
 
 	/* Init root service tree */
 	cfqd->grp_service_tree = CFQ_RB_ROOT;
@@ -4433,6 +4443,7 @@ static int cfq_init_queue(struct request_queue *q)
 
 out_free:
 	kfree(cfqd);
+	kobject_put(&eq->kobj);
 	return ret;
 }
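
cfq, deadline, and noop all convert to the same shape (the deadline and noop hunks follow below): allocate the elevator_queue first, fully populate elevator_data, and only then publish it in q->elevator under queue_lock. The skeleton every elevator_init_fn now follows is roughly (struct sched_data is a hypothetical stand-in for the per-scheduler data):

	static int sched_init_queue_pattern(struct request_queue *q,
					    struct elevator_type *e)
	{
		struct elevator_queue *eq;
		struct sched_data *sd;		/* hypothetical scheduler data */

		eq = elevator_alloc(q, e);
		if (!eq)
			return -ENOMEM;

		sd = kmalloc_node(sizeof(*sd), GFP_KERNEL | __GFP_ZERO, q->node);
		if (!sd) {
			kobject_put(&eq->kobj);	/* releases eq */
			return -ENOMEM;
		}
		eq->elevator_data = sd;

		/* ... scheduler-specific setup of sd goes here ... */

		/* publish only after everything is initialized */
		spin_lock_irq(q->queue_lock);
		q->elevator = eq;
		spin_unlock_irq(q->queue_lock);
		return 0;
	}
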
 
diff --git a/block/deadline-iosched.c b/block/deadline-iosched.c
index ba19a3afab79..20614a332362 100644
--- a/block/deadline-iosched.c
+++ b/block/deadline-iosched.c
@@ -337,13 +337,21 @@ static void deadline_exit_queue(struct elevator_queue *e)
 /*
  * initialize elevator private data (deadline_data).
  */
-static int deadline_init_queue(struct request_queue *q)
+static int deadline_init_queue(struct request_queue *q, struct elevator_type *e)
 {
 	struct deadline_data *dd;
+	struct elevator_queue *eq;
+
+	eq = elevator_alloc(q, e);
+	if (!eq)
+		return -ENOMEM;
 
 	dd = kmalloc_node(sizeof(*dd), GFP_KERNEL | __GFP_ZERO, q->node);
-	if (!dd)
+	if (!dd) {
+		kobject_put(&eq->kobj);
 		return -ENOMEM;
+	}
+	eq->elevator_data = dd;
 
 	INIT_LIST_HEAD(&dd->fifo_list[READ]);
 	INIT_LIST_HEAD(&dd->fifo_list[WRITE]);
@@ -355,7 +363,9 @@ static int deadline_init_queue(struct request_queue *q)
 	dd->front_merges = 1;
 	dd->fifo_batch = fifo_batch;
 
-	q->elevator->elevator_data = dd;
+	spin_lock_irq(q->queue_lock);
+	q->elevator = eq;
+	spin_unlock_irq(q->queue_lock);
 	return 0;
 }
 
diff --git a/block/elevator.c b/block/elevator.c
index eba5b04c29b1..668394d18588 100644
--- a/block/elevator.c
+++ b/block/elevator.c
@@ -150,7 +150,7 @@ void __init load_default_elevator_module(void)
 
 static struct kobj_type elv_ktype;
 
-static struct elevator_queue *elevator_alloc(struct request_queue *q,
+struct elevator_queue *elevator_alloc(struct request_queue *q,
 				  struct elevator_type *e)
 {
 	struct elevator_queue *eq;
@@ -170,6 +170,7 @@ err:
 	elevator_put(e);
 	return NULL;
 }
+EXPORT_SYMBOL(elevator_alloc);
 
 static void elevator_release(struct kobject *kobj)
 {
@@ -221,16 +222,7 @@ int elevator_init(struct request_queue *q, char *name)
 		}
 	}
 
-	q->elevator = elevator_alloc(q, e);
-	if (!q->elevator)
-		return -ENOMEM;
-
-	err = e->ops.elevator_init_fn(q);
-	if (err) {
-		kobject_put(&q->elevator->kobj);
-		return err;
-	}
-
+	err = e->ops.elevator_init_fn(q, e);
-	return 0;
+	return err;
 }
 EXPORT_SYMBOL(elevator_init);
@@ -935,16 +927,9 @@ static int elevator_switch(struct request_queue *q, struct elevator_type *new_e)
 	spin_unlock_irq(q->queue_lock);
 
 	/* allocate, init and register new elevator */
-	err = -ENOMEM;
-	q->elevator = elevator_alloc(q, new_e);
-	if (!q->elevator)
-		goto fail_init;
-
-	err = new_e->ops.elevator_init_fn(q);
-	if (err) {
-		kobject_put(&q->elevator->kobj);
+	err = new_e->ops.elevator_init_fn(q, new_e);
+	if (err)
 		goto fail_init;
-	}
 
 	if (registered) {
 		err = elv_register_queue(q);
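
For reference, the race this closes: the removed lines published q->elevator before elevator_init_fn had populated elevator_data, leaving a window in which a concurrent task could observe a half-initialized elevator. The old, racy ordering was effectively:

	/* old ordering (removed above): q->elevator becomes globally
	 * visible while elevator_data is still NULL */
	q->elevator = elevator_alloc(q, new_e);
	err = new_e->ops.elevator_init_fn(q);	/* data only filled in here */
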
diff --git a/block/noop-iosched.c b/block/noop-iosched.c
index 5d1bf70e33d5..3de89d4690f3 100644
--- a/block/noop-iosched.c
+++ b/block/noop-iosched.c
@@ -59,16 +59,27 @@ noop_latter_request(struct request_queue *q, struct request *rq)
 	return list_entry(rq->queuelist.next, struct request, queuelist);
 }
 
-static int noop_init_queue(struct request_queue *q)
+static int noop_init_queue(struct request_queue *q, struct elevator_type *e)
 {
 	struct noop_data *nd;
+	struct elevator_queue *eq;
+
+	eq = elevator_alloc(q, e);
+	if (!eq)
+		return -ENOMEM;
 
 	nd = kmalloc_node(sizeof(*nd), GFP_KERNEL, q->node);
-	if (!nd)
+	if (!nd) {
+		kobject_put(&eq->kobj);
 		return -ENOMEM;
+	}
+	eq->elevator_data = nd;
 
 	INIT_LIST_HEAD(&nd->queue);
-	q->elevator->elevator_data = nd;
+
+	spin_lock_irq(q->queue_lock);
+	q->elevator = eq;
+	spin_unlock_irq(q->queue_lock);
 	return 0;
 }