summary refs log tree commit diff
path: root/drivers/md
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-01-22 20:17:48 -0800
committerLinus Torvalds <torvalds@linux-foundation.org>2014-01-22 20:17:48 -0800
commitfe41c2c018b8af9b370a40845f547e22894ff68a (patch)
tree3573a10912e24ffcd48177785043e0de17b8e9d0 /drivers/md
parent194e57fd1835564735fd0ba5e3870230861cacd2 (diff)
parent5066a4df1f427faac8372d20494483bb09a4a1cd (diff)
downloadlinux-fe41c2c018b8af9b370a40845f547e22894ff68a.tar.gz
Merge tag 'dm-3.14-changes' of git://git.kernel.org/pub/scm/linux/kernel/git/device-mapper/linux-dm
Pull device-mapper changes from Mike Snitzer:
 "A lot of attention was paid to improving the thin-provisioning
  target's handling of metadata operation failures and running out of
  space.  A new 'error_if_no_space' feature was added to allow users to
  error IOs rather than queue them when either the data or metadata
  space is exhausted.

  Additional fixes/features include:
   - a few fixes to properly support thin metadata device resizing
   - a solution for reliably waiting for a DM device's embedded kobject
     to be released before destroying the device
   - old dm-snapshot is updated to use the dm-bufio interface to take
     advantage of readahead capabilities that improve snapshot
     activation
   - new dm-cache target tunables to control how quickly data is
     promoted to the cache (fast) device
   - improved write efficiency of cluster mirror target by combining
     userspace flush and mark requests"

* tag 'dm-3.14-changes' of git://git.kernel.org/pub/scm/linux/kernel/git/device-mapper/linux-dm: (35 commits)
  dm log userspace: allow mark requests to piggyback on flush requests
  dm space map metadata: fix bug in resizing of thin metadata
  dm cache: add policy name to status output
  dm thin: fix pool feature parsing
  dm sysfs: fix a module unload race
  dm snapshot: use dm-bufio prefetch
  dm snapshot: use dm-bufio
  dm snapshot: prepare for switch to using dm-bufio
  dm snapshot: use GFP_KERNEL when initializing exceptions
  dm cache: add block sizes and total cache blocks to status output
  dm btree: add dm_btree_find_lowest_key
  dm space map metadata: fix extending the space map
  dm space map common: make sure new space is used during extend
  dm: wait until embedded kobject is released before destroying a device
  dm: remove pointless kobject comparison in dm_get_from_kobject
  dm snapshot: call destroy_work_on_stack() to pair with INIT_WORK_ONSTACK()
  dm cache policy mq: introduce three promotion threshold tunables
  dm cache policy mq: use list_del_init instead of list_del + INIT_LIST_HEAD
  dm thin: fix set_pool_mode exposed pool operation races
  dm thin: eliminate the no_free_space flag
  ...
Diffstat (limited to 'drivers/md')
-rw-r--r--drivers/md/Kconfig11
-rw-r--r--drivers/md/Makefile1
-rw-r--r--drivers/md/dm-bufio.c36
-rw-r--r--drivers/md/dm-bufio.h12
-rw-r--r--drivers/md/dm-builtin.c48
-rw-r--r--drivers/md/dm-cache-policy-mq.c70
-rw-r--r--drivers/md/dm-cache-policy.c4
-rw-r--r--drivers/md/dm-cache-policy.h6
-rw-r--r--drivers/md/dm-cache-target.c20
-rw-r--r--drivers/md/dm-delay.c35
-rw-r--r--drivers/md/dm-log-userspace-base.c206
-rw-r--r--drivers/md/dm-snap-persistent.c87
-rw-r--r--drivers/md/dm-snap.c10
-rw-r--r--drivers/md/dm-sysfs.c5
-rw-r--r--drivers/md/dm-table.c22
-rw-r--r--drivers/md/dm-thin-metadata.c20
-rw-r--r--drivers/md/dm-thin-metadata.h4
-rw-r--r--drivers/md/dm-thin.c284
-rw-r--r--drivers/md/dm.c15
-rw-r--r--drivers/md/dm.h17
-rw-r--r--drivers/md/persistent-data/dm-block-manager.c2
-rw-r--r--drivers/md/persistent-data/dm-btree.c33
-rw-r--r--drivers/md/persistent-data/dm-btree.h8
-rw-r--r--drivers/md/persistent-data/dm-space-map-common.c6
-rw-r--r--drivers/md/persistent-data/dm-space-map-metadata.c32
25 files changed, 700 insertions, 294 deletions
diff --git a/drivers/md/Kconfig b/drivers/md/Kconfig
index f2ccbc3b9fe4..9a06fe883766 100644
--- a/drivers/md/Kconfig
+++ b/drivers/md/Kconfig
@@ -176,8 +176,12 @@ config MD_FAULTY
 
 source "drivers/md/bcache/Kconfig"
 
+config BLK_DEV_DM_BUILTIN
+	boolean
+
 config BLK_DEV_DM
 	tristate "Device mapper support"
+	select BLK_DEV_DM_BUILTIN
 	---help---
 	  Device-mapper is a low level volume manager.  It works by allowing
 	  people to specify mappings for ranges of logical sectors.  Various
@@ -238,6 +242,7 @@ config DM_CRYPT
 config DM_SNAPSHOT
        tristate "Snapshot target"
        depends on BLK_DEV_DM
+       select DM_BUFIO
        ---help---
          Allow volume managers to take writable snapshots of a device.
 
@@ -250,12 +255,12 @@ config DM_THIN_PROVISIONING
          Provides thin provisioning and snapshots that share a data store.
 
 config DM_DEBUG_BLOCK_STACK_TRACING
-	boolean "Keep stack trace of thin provisioning block lock holders"
-	depends on STACKTRACE_SUPPORT && DM_THIN_PROVISIONING
+	boolean "Keep stack trace of persistent data block lock holders"
+	depends on STACKTRACE_SUPPORT && DM_PERSISTENT_DATA
 	select STACKTRACE
 	---help---
 	  Enable this for messages that may help debug problems with the
-	  block manager locking used by thin provisioning.
+	  block manager locking used by thin provisioning and caching.
 
 	  If unsure, say N.
 
diff --git a/drivers/md/Makefile b/drivers/md/Makefile
index 2acc43fe0229..f26d83292579 100644
--- a/drivers/md/Makefile
+++ b/drivers/md/Makefile
@@ -32,6 +32,7 @@ obj-$(CONFIG_MD_FAULTY)		+= faulty.o
 obj-$(CONFIG_BCACHE)		+= bcache/
 obj-$(CONFIG_BLK_DEV_MD)	+= md-mod.o
 obj-$(CONFIG_BLK_DEV_DM)	+= dm-mod.o
+obj-$(CONFIG_BLK_DEV_DM_BUILTIN) += dm-builtin.o
 obj-$(CONFIG_DM_BUFIO)		+= dm-bufio.o
 obj-$(CONFIG_DM_BIO_PRISON)	+= dm-bio-prison.o
 obj-$(CONFIG_DM_CRYPT)		+= dm-crypt.o
diff --git a/drivers/md/dm-bufio.c b/drivers/md/dm-bufio.c
index 54bdd923316f..9ed42125514b 100644
--- a/drivers/md/dm-bufio.c
+++ b/drivers/md/dm-bufio.c
@@ -104,6 +104,8 @@ struct dm_bufio_client {
 	struct list_head reserved_buffers;
 	unsigned need_reserved_buffers;
 
+	unsigned minimum_buffers;
+
 	struct hlist_head *cache_hash;
 	wait_queue_head_t free_buffer_wait;
 
@@ -861,8 +863,8 @@ static void __get_memory_limit(struct dm_bufio_client *c,
 	buffers = dm_bufio_cache_size_per_client >>
 		  (c->sectors_per_block_bits + SECTOR_SHIFT);
 
-	if (buffers < DM_BUFIO_MIN_BUFFERS)
-		buffers = DM_BUFIO_MIN_BUFFERS;
+	if (buffers < c->minimum_buffers)
+		buffers = c->minimum_buffers;
 
 	*limit_buffers = buffers;
 	*threshold_buffers = buffers * DM_BUFIO_WRITEBACK_PERCENT / 100;
@@ -1350,6 +1352,34 @@ retry:
 }
 EXPORT_SYMBOL_GPL(dm_bufio_release_move);
 
+/*
+ * Free the given buffer.
+ *
+ * This is just a hint, if the buffer is in use or dirty, this function
+ * does nothing.
+ */
+void dm_bufio_forget(struct dm_bufio_client *c, sector_t block)
+{
+	struct dm_buffer *b;
+
+	dm_bufio_lock(c);
+
+	b = __find(c, block);
+	if (b && likely(!b->hold_count) && likely(!b->state)) {
+		__unlink_buffer(b);
+		__free_buffer_wake(b);
+	}
+
+	dm_bufio_unlock(c);
+}
+EXPORT_SYMBOL(dm_bufio_forget);
+
+void dm_bufio_set_minimum_buffers(struct dm_bufio_client *c, unsigned n)
+{
+	c->minimum_buffers = n;
+}
+EXPORT_SYMBOL(dm_bufio_set_minimum_buffers);
+
 unsigned dm_bufio_get_block_size(struct dm_bufio_client *c)
 {
 	return c->block_size;
@@ -1546,6 +1576,8 @@ struct dm_bufio_client *dm_bufio_client_create(struct block_device *bdev, unsign
 	INIT_LIST_HEAD(&c->reserved_buffers);
 	c->need_reserved_buffers = reserved_buffers;
 
+	c->minimum_buffers = DM_BUFIO_MIN_BUFFERS;
+
 	init_waitqueue_head(&c->free_buffer_wait);
 	c->async_write_error = 0;
 
diff --git a/drivers/md/dm-bufio.h b/drivers/md/dm-bufio.h
index b142946a9e32..c096779a7292 100644
--- a/drivers/md/dm-bufio.h
+++ b/drivers/md/dm-bufio.h
@@ -108,6 +108,18 @@ int dm_bufio_issue_flush(struct dm_bufio_client *c);
  */
 void dm_bufio_release_move(struct dm_buffer *b, sector_t new_block);
 
+/*
+ * Free the given buffer.
+ * This is just a hint, if the buffer is in use or dirty, this function
+ * does nothing.
+ */
+void dm_bufio_forget(struct dm_bufio_client *c, sector_t block);
+
+/*
+ * Set the minimum number of buffers before cleanup happens.
+ */
+void dm_bufio_set_minimum_buffers(struct dm_bufio_client *c, unsigned n);
+
 unsigned dm_bufio_get_block_size(struct dm_bufio_client *c);
 sector_t dm_bufio_get_device_size(struct dm_bufio_client *c);
 sector_t dm_bufio_get_block_number(struct dm_buffer *b);
diff --git a/drivers/md/dm-builtin.c b/drivers/md/dm-builtin.c
new file mode 100644
index 000000000000..6c9049c51b2b
--- /dev/null
+++ b/drivers/md/dm-builtin.c
@@ -0,0 +1,48 @@
+#include "dm.h"
+
+/*
+ * The kobject release method must not be placed in the module itself,
+ * otherwise we are subject to module unload races.
+ *
+ * The release method is called when the last reference to the kobject is
+ * dropped. It may be called by any other kernel code that drops the last
+ * reference.
+ *
+ * The release method suffers from module unload race. We may prevent the
+ * module from being unloaded at the start of the release method (using
+ * increased module reference count or synchronizing against the release
+ * method), however there is no way to prevent the module from being
+ * unloaded at the end of the release method.
+ *
+ * If this code were placed in the dm module, the following race may
+ * happen:
+ *  1. Some other process takes a reference to dm kobject
+ *  2. The user issues ioctl function to unload the dm device
+ *  3. dm_sysfs_exit calls kobject_put, however the object is not released
+ *     because of the other reference taken at step 1
+ *  4. dm_sysfs_exit waits on the completion
+ *  5. The other process that took the reference in step 1 drops it,
+ *     dm_kobject_release is called from this process
+ *  6. dm_kobject_release calls complete()
+ *  7. a reschedule happens before dm_kobject_release returns
+ *  8. dm_sysfs_exit continues, the dm device is unloaded, module reference
+ *     count is decremented
+ *  9. The user unloads the dm module
+ * 10. The other process that was rescheduled in step 7 continues to run,
+ *     it is now executing code in unloaded module, so it crashes
+ *
+ * Note that if the process that takes the foreign reference to dm kobject
+ * has a low priority and the system is sufficiently loaded with
+ * higher-priority processes that prevent the low-priority process from
+ * being scheduled long enough, this bug may really happen.
+ *
+ * In order to fix this module unload race, we place the release method
+ * into a helper code that is compiled directly into the kernel.
+ */
+
+void dm_kobject_release(struct kobject *kobj)
+{
+	complete(dm_get_completion_from_kobject(kobj));
+}
+
+EXPORT_SYMBOL(dm_kobject_release);
diff --git a/drivers/md/dm-cache-policy-mq.c b/drivers/md/dm-cache-policy-mq.c
index 64780ad73bb0..930e8c3d73e9 100644
--- a/drivers/md/dm-cache-policy-mq.c
+++ b/drivers/md/dm-cache-policy-mq.c
@@ -287,9 +287,8 @@ static struct entry *alloc_entry(struct entry_pool *ep)
 static struct entry *alloc_particular_entry(struct entry_pool *ep, dm_cblock_t cblock)
 {
 	struct entry *e = ep->entries + from_cblock(cblock);
-	list_del(&e->list);
 
-	INIT_LIST_HEAD(&e->list);
+	list_del_init(&e->list);
 	INIT_HLIST_NODE(&e->hlist);
 	ep->nr_allocated++;
 
@@ -391,6 +390,10 @@ struct mq_policy {
 	 */
 	unsigned promote_threshold;
 
+	unsigned discard_promote_adjustment;
+	unsigned read_promote_adjustment;
+	unsigned write_promote_adjustment;
+
 	/*
 	 * The hash table allows us to quickly find an entry by origin
 	 * block.  Both pre_cache and cache entries are in here.
@@ -400,6 +403,10 @@ struct mq_policy {
 	struct hlist_head *table;
 };
 
+#define DEFAULT_DISCARD_PROMOTE_ADJUSTMENT 1
+#define DEFAULT_READ_PROMOTE_ADJUSTMENT 4
+#define DEFAULT_WRITE_PROMOTE_ADJUSTMENT 8
+
 /*----------------------------------------------------------------*/
 
 /*
@@ -642,25 +649,21 @@ static int demote_cblock(struct mq_policy *mq, dm_oblock_t *oblock)
  * We bias towards reads, since they can be demoted at no cost if they
  * haven't been dirtied.
  */
-#define DISCARDED_PROMOTE_THRESHOLD 1
-#define READ_PROMOTE_THRESHOLD 4
-#define WRITE_PROMOTE_THRESHOLD 8
-
 static unsigned adjusted_promote_threshold(struct mq_policy *mq,
 					   bool discarded_oblock, int data_dir)
 {
 	if (data_dir == READ)
-		return mq->promote_threshold + READ_PROMOTE_THRESHOLD;
+		return mq->promote_threshold + mq->read_promote_adjustment;
 
 	if (discarded_oblock && (any_free_cblocks(mq) || any_clean_cblocks(mq))) {
 		/*
 		 * We don't need to do any copying at all, so give this a
 		 * very low threshold.
 		 */
-		return DISCARDED_PROMOTE_THRESHOLD;
+		return mq->discard_promote_adjustment;
 	}
 
-	return mq->promote_threshold + WRITE_PROMOTE_THRESHOLD;
+	return mq->promote_threshold + mq->write_promote_adjustment;
 }
 
 static bool should_promote(struct mq_policy *mq, struct entry *e,
@@ -809,7 +812,7 @@ static int no_entry_found(struct mq_policy *mq, dm_oblock_t oblock,
 			  bool can_migrate, bool discarded_oblock,
 			  int data_dir, struct policy_result *result)
 {
-	if (adjusted_promote_threshold(mq, discarded_oblock, data_dir) == 1) {
+	if (adjusted_promote_threshold(mq, discarded_oblock, data_dir) <= 1) {
 		if (can_migrate)
 			insert_in_cache(mq, oblock, result);
 		else
@@ -1135,20 +1138,28 @@ static int mq_set_config_value(struct dm_cache_policy *p,
 			       const char *key, const char *value)
 {
 	struct mq_policy *mq = to_mq_policy(p);
-	enum io_pattern pattern;
 	unsigned long tmp;
 
-	if (!strcasecmp(key, "random_threshold"))
-		pattern = PATTERN_RANDOM;
-	else if (!strcasecmp(key, "sequential_threshold"))
-		pattern = PATTERN_SEQUENTIAL;
-	else
-		return -EINVAL;
-
 	if (kstrtoul(value, 10, &tmp))
 		return -EINVAL;
 
-	mq->tracker.thresholds[pattern] = tmp;
+	if (!strcasecmp(key, "random_threshold")) {
+		mq->tracker.thresholds[PATTERN_RANDOM] = tmp;
+
+	} else if (!strcasecmp(key, "sequential_threshold")) {
+		mq->tracker.thresholds[PATTERN_SEQUENTIAL] = tmp;
+
+	} else if (!strcasecmp(key, "discard_promote_adjustment"))
+		mq->discard_promote_adjustment = tmp;
+
+	else if (!strcasecmp(key, "read_promote_adjustment"))
+		mq->read_promote_adjustment = tmp;
+
+	else if (!strcasecmp(key, "write_promote_adjustment"))
+		mq->write_promote_adjustment = tmp;
+
+	else
+		return -EINVAL;
 
 	return 0;
 }
@@ -1158,9 +1169,16 @@ static int mq_emit_config_values(struct dm_cache_policy *p, char *result, unsign
 	ssize_t sz = 0;
 	struct mq_policy *mq = to_mq_policy(p);
 
-	DMEMIT("4 random_threshold %u sequential_threshold %u",
+	DMEMIT("10 random_threshold %u "
+	       "sequential_threshold %u "
+	       "discard_promote_adjustment %u "
+	       "read_promote_adjustment %u "
+	       "write_promote_adjustment %u",
 	       mq->tracker.thresholds[PATTERN_RANDOM],
-	       mq->tracker.thresholds[PATTERN_SEQUENTIAL]);
+	       mq->tracker.thresholds[PATTERN_SEQUENTIAL],
+	       mq->discard_promote_adjustment,
+	       mq->read_promote_adjustment,
+	       mq->write_promote_adjustment);
 
 	return 0;
 }
@@ -1213,6 +1231,9 @@ static struct dm_cache_policy *mq_create(dm_cblock_t cache_size,
 	mq->hit_count = 0;
 	mq->generation = 0;
 	mq->promote_threshold = 0;
+	mq->discard_promote_adjustment = DEFAULT_DISCARD_PROMOTE_ADJUSTMENT;
+	mq->read_promote_adjustment = DEFAULT_READ_PROMOTE_ADJUSTMENT;
+	mq->write_promote_adjustment = DEFAULT_WRITE_PROMOTE_ADJUSTMENT;
 	mutex_init(&mq->lock);
 	spin_lock_init(&mq->tick_lock);
 
@@ -1244,7 +1265,7 @@ bad_pre_cache_init:
 
 static struct dm_cache_policy_type mq_policy_type = {
 	.name = "mq",
-	.version = {1, 1, 0},
+	.version = {1, 2, 0},
 	.hint_size = 4,
 	.owner = THIS_MODULE,
 	.create = mq_create
@@ -1252,10 +1273,11 @@ static struct dm_cache_policy_type mq_policy_type = {
 
 static struct dm_cache_policy_type default_policy_type = {
 	.name = "default",
-	.version = {1, 1, 0},
+	.version = {1, 2, 0},
 	.hint_size = 4,
 	.owner = THIS_MODULE,
-	.create = mq_create
+	.create = mq_create,
+	.real = &mq_policy_type
 };
 
 static int __init mq_init(void)
diff --git a/drivers/md/dm-cache-policy.c b/drivers/md/dm-cache-policy.c
index d80057968407..c1a3cee99b44 100644
--- a/drivers/md/dm-cache-policy.c
+++ b/drivers/md/dm-cache-policy.c
@@ -146,6 +146,10 @@ const char *dm_cache_policy_get_name(struct dm_cache_policy *p)
 {
 	struct dm_cache_policy_type *t = p->private;
 
+	/* if t->real is set then an alias was used (e.g. "default") */
+	if (t->real)
+		return t->real->name;
+
 	return t->name;
 }
 EXPORT_SYMBOL_GPL(dm_cache_policy_get_name);
diff --git a/drivers/md/dm-cache-policy.h b/drivers/md/dm-cache-policy.h
index 052c00a84a5c..f50fe360c546 100644
--- a/drivers/md/dm-cache-policy.h
+++ b/drivers/md/dm-cache-policy.h
@@ -223,6 +223,12 @@ struct dm_cache_policy_type {
 	unsigned version[CACHE_POLICY_VERSION_SIZE];
 
 	/*
+	 * For use by an alias dm_cache_policy_type to point to the
+	 * real dm_cache_policy_type.
+	 */
+	struct dm_cache_policy_type *real;
+
+	/*
 	 * Policies may store a hint for each each cache block.
 	 * Currently the size of this hint must be 0 or 4 bytes but we
 	 * expect to relax this in future.
diff --git a/drivers/md/dm-cache-target.c b/drivers/md/dm-cache-target.c
index 1b1469ebe5cb..09334c275c79 100644
--- a/drivers/md/dm-cache-target.c
+++ b/drivers/md/dm-cache-target.c
@@ -2826,12 +2826,13 @@ static void cache_resume(struct dm_target *ti)
 /*
  * Status format:
  *
- * <#used metadata blocks>/<#total metadata blocks>
+ * <metadata block size> <#used metadata blocks>/<#total metadata blocks>
+ * <cache block size> <#used cache blocks>/<#total cache blocks>
  * <#read hits> <#read misses> <#write hits> <#write misses>
- * <#demotions> <#promotions> <#blocks in cache> <#dirty>
+ * <#demotions> <#promotions> <#dirty>
  * <#features> <features>*
  * <#core args> <core args>
- * <#policy args> <policy args>*
+ * <policy name> <#policy args> <policy args>*
  */
 static void cache_status(struct dm_target *ti, status_type_t type,
 			 unsigned status_flags, char *result, unsigned maxlen)
@@ -2869,17 +2870,20 @@ static void cache_status(struct dm_target *ti, status_type_t type,
 
 		residency = policy_residency(cache->policy);
 
-		DMEMIT("%llu/%llu %u %u %u %u %u %u %llu %u ",
+		DMEMIT("%u %llu/%llu %u %llu/%llu %u %u %u %u %u %u %llu ",
+		       (unsigned)(DM_CACHE_METADATA_BLOCK_SIZE >> SECTOR_SHIFT),
 		       (unsigned long long)(nr_blocks_metadata - nr_free_blocks_metadata),
 		       (unsigned long long)nr_blocks_metadata,
+		       cache->sectors_per_block,
+		       (unsigned long long) from_cblock(residency),
+		       (unsigned long long) from_cblock(cache->cache_size),
 		       (unsigned) atomic_read(&cache->stats.read_hit),
 		       (unsigned) atomic_read(&cache->stats.read_miss),
 		       (unsigned) atomic_read(&cache->stats.write_hit),
 		       (unsigned) atomic_read(&cache->stats.write_miss),
 		       (unsigned) atomic_read(&cache->stats.demotion),
 		       (unsigned) atomic_read(&cache->stats.promotion),
-		       (unsigned long long) from_cblock(residency),
-		       cache->nr_dirty);
+		       (unsigned long long) from_cblock(cache->nr_dirty));
 
 		if (writethrough_mode(&cache->features))
 			DMEMIT("1 writethrough ");
@@ -2896,6 +2900,8 @@ static void cache_status(struct dm_target *ti, status_type_t type,
 		}
 
 		DMEMIT("2 migration_threshold %llu ", (unsigned long long) cache->migration_threshold);
+
+		DMEMIT("%s ", dm_cache_policy_get_name(cache->policy));
 		if (sz < maxlen) {
 			r = policy_emit_config_values(cache->policy, result + sz, maxlen - sz);
 			if (r)
@@ -3129,7 +3135,7 @@ static void cache_io_hints(struct dm_target *ti, struct queue_limits *limits)
 
 static struct target_type cache_target = {
 	.name = "cache",
-	.version = {1, 2, 0},
+	.version = {1, 3, 0},
 	.module = THIS_MODULE,
 	.ctr = cache_ctr,
 	.dtr = cache_dtr,
diff --git a/drivers/md/dm-delay.c b/drivers/md/dm-delay.c
index 2f91d6d4a2cc..a8a511c053a5 100644
--- a/drivers/md/dm-delay.c
+++ b/drivers/md/dm-delay.c
@@ -24,7 +24,6 @@ struct delay_c {
 	struct work_struct flush_expired_bios;
 	struct list_head delayed_bios;
 	atomic_t may_delay;
-	mempool_t *delayed_pool;
 
 	struct dm_dev *dev_read;
 	sector_t start_read;
@@ -40,14 +39,11 @@ struct delay_c {
 struct dm_delay_info {
 	struct delay_c *context;
 	struct list_head list;
-	struct bio *bio;
 	unsigned long expires;
 };
 
 static DEFINE_MUTEX(delayed_bios_lock);
 
-static struct kmem_cache *delayed_cache;
-
 static void handle_delayed_timer(unsigned long data)
 {
 	struct delay_c *dc = (struct delay_c *)data;
@@ -87,13 +83,14 @@ static struct bio *flush_delayed_bios(struct delay_c *dc, int flush_all)
 	mutex_lock(&delayed_bios_lock);
 	list_for_each_entry_safe(delayed, next, &dc->delayed_bios, list) {
 		if (flush_all || time_after_eq(jiffies, delayed->expires)) {
+			struct bio *bio = dm_bio_from_per_bio_data(delayed,
+						sizeof(struct dm_delay_info));
 			list_del(&delayed->list);
-			bio_list_add(&flush_bios, delayed->bio);
-			if ((bio_data_dir(delayed->bio) == WRITE))
+			bio_list_add(&flush_bios, bio);
+			if ((bio_data_dir(bio) == WRITE))
 				delayed->context->writes--;
 			else
 				delayed->context->reads--;
-			mempool_free(delayed, dc->delayed_pool);
 			continue;
 		}
 
@@ -185,12 +182,6 @@ static int delay_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 	}
 
 out:
-	dc->delayed_pool = mempool_create_slab_pool(128, delayed_cache);
-	if (!dc->delayed_pool) {
-		DMERR("Couldn't create delayed bio pool.");
-		goto bad_dev_write;
-	}
-
 	dc->kdelayd_wq = alloc_workqueue("kdelayd", WQ_MEM_RECLAIM, 0);
 	if (!dc->kdelayd_wq) {
 		DMERR("Couldn't start kdelayd");
@@ -206,12 +197,11 @@ out:
 
 	ti->num_flush_bios = 1;
 	ti->num_discard_bios = 1;
+	ti->per_bio_data_size = sizeof(struct dm_delay_info);
 	ti->private = dc;
 	return 0;
 
 bad_queue:
-	mempool_destroy(dc->delayed_pool);
-bad_dev_write:
 	if (dc->dev_write)
 		dm_put_device(ti, dc->dev_write);
 bad_dev_read:
@@ -232,7 +222,6 @@ static void delay_dtr(struct dm_target *ti)
 	if (dc->dev_write)
 		dm_put_device(ti, dc->dev_write);
 
-	mempool_destroy(dc->delayed_pool);
 	kfree(dc);
 }
 
@@ -244,10 +233,9 @@ static int delay_bio(struct delay_c *dc, int delay, struct bio *bio)
 	if (!delay || !atomic_read(&dc->may_delay))
 		return 1;
 
-	delayed = mempool_alloc(dc->delayed_pool, GFP_NOIO);
+	delayed = dm_per_bio_data(bio, sizeof(struct dm_delay_info));
 
 	delayed->context = dc;
-	delayed->bio = bio;
 	delayed->expires = expires = jiffies + (delay * HZ / 1000);
 
 	mutex_lock(&delayed_bios_lock);
@@ -356,13 +344,7 @@ static struct target_type delay_target = {
 
 static int __init dm_delay_init(void)
 {
-	int r = -ENOMEM;
-
-	delayed_cache = KMEM_CACHE(dm_delay_info, 0);
-	if (!delayed_cache) {
-		DMERR("Couldn't create delayed bio cache.");
-		goto bad_memcache;
-	}
+	int r;
 
 	r = dm_register_target(&delay_target);
 	if (r < 0) {
@@ -373,15 +355,12 @@ static int __init dm_delay_init(void)
 	return 0;
 
 bad_register:
-	kmem_cache_destroy(delayed_cache);
-bad_memcache:
 	return r;
 }
 
 static void __exit dm_delay_exit(void)
 {
 	dm_unregister_target(&delay_target);
-	kmem_cache_destroy(delayed_cache);
 }
 
 /* Module hooks */
diff --git a/drivers/md/dm-log-userspace-base.c b/drivers/md/dm-log-userspace-base.c
index 9429159d9ee3..b953db6cc229 100644
--- a/drivers/md/dm-log-userspace-base.c
+++ b/drivers/md/dm-log-userspace-base.c
@@ -10,10 +10,11 @@
 #include <linux/device-mapper.h>
 #include <linux/dm-log-userspace.h>
 #include <linux/module.h>
+#include <linux/workqueue.h>
 
 #include "dm-log-userspace-transfer.h"
 
-#define DM_LOG_USERSPACE_VSN "1.1.0"
+#define DM_LOG_USERSPACE_VSN "1.3.0"
 
 struct flush_entry {
 	int type;
@@ -58,6 +59,18 @@ struct log_c {
 	spinlock_t flush_lock;
 	struct list_head mark_list;
 	struct list_head clear_list;
+
+	/*
+	 * Workqueue for flush of clear region requests.
+	 */
+	struct workqueue_struct *dmlog_wq;
+	struct delayed_work flush_log_work;
+	atomic_t sched_flush;
+
+	/*
+	 * Combine userspace flush and mark requests for efficiency.
+	 */
+	uint32_t integrated_flush;
 };
 
 static mempool_t *flush_entry_pool;
@@ -122,6 +135,9 @@ static int build_constructor_string(struct dm_target *ti,
 
 	*ctr_str = NULL;
 
+	/*
+	 * Determine overall size of the string.
+	 */
 	for (i = 0, str_size = 0; i < argc; i++)
 		str_size += strlen(argv[i]) + 1; /* +1 for space between args */
 
@@ -141,18 +157,39 @@ static int build_constructor_string(struct dm_target *ti,
 	return str_size;
 }
 
+static void do_flush(struct work_struct *work)
+{
+	int r;
+	struct log_c *lc = container_of(work, struct log_c, flush_log_work.work);
+
+	atomic_set(&lc->sched_flush, 0);
+
+	r = userspace_do_request(lc, lc->uuid, DM_ULOG_FLUSH, NULL, 0, NULL, NULL);
+
+	if (r)
+		dm_table_event(lc->ti->table);
+}
+
 /*
  * userspace_ctr
  *
  * argv contains:
- *	<UUID> <other args>
- * Where 'other args' is the userspace implementation specific log
- * arguments.  An example might be:
- *	<UUID> clustered-disk <arg count> <log dev> <region_size> [[no]sync]
+ *	<UUID> [integrated_flush] <other args>
+ * Where 'other args' are the userspace implementation-specific log
+ * arguments.
+ *
+ * Example:
+ *	<UUID> [integrated_flush] clustered-disk <arg count> <log dev>
+ *	<region_size> [[no]sync]
+ *
+ * This module strips off the <UUID> and uses it for identification
+ * purposes when communicating with userspace about a log.
  *
- * So, this module will strip off the <UUID> for identification purposes
- * when communicating with userspace about a log; but will pass on everything
- * else.
+ * If integrated_flush is defined, the kernel combines flush
+ * and mark requests.
+ *
+ * The rest of the line, beginning with 'clustered-disk', is passed
+ * to the userspace ctr function.
  */
 static int userspace_ctr(struct dm_dirty_log *log, struct dm_target *ti,
 			 unsigned argc, char **argv)
@@ -188,12 +225,22 @@ static int userspace_ctr(struct dm_dirty_log *log, struct dm_target *ti,
 		return -EINVAL;
 	}
 
+	lc->usr_argc = argc;
+
 	strncpy(lc->uuid, argv[0], DM_UUID_LEN);
+	argc--;
+	argv++;
 	spin_lock_init(&lc->flush_lock);
 	INIT_LIST_HEAD(&lc->mark_list);
 	INIT_LIST_HEAD(&lc->clear_list);
 
-	str_size = build_constructor_string(ti, argc - 1, argv + 1, &ctr_str);
+	if (!strcasecmp(argv[0], "integrated_flush")) {
+		lc->integrated_flush = 1;
+		argc--;
+		argv++;
+	}
+
+	str_size = build_constructor_string(ti, argc, argv, &ctr_str);
 	if (str_size < 0) {
 		kfree(lc);
 		return str_size;
@@ -246,6 +293,19 @@ static int userspace_ctr(struct dm_dirty_log *log, struct dm_target *ti,
 			DMERR("Failed to register %s with device-mapper",
 			      devices_rdata);
 	}
+
+	if (lc->integrated_flush) {
+		lc->dmlog_wq = alloc_workqueue("dmlogd", WQ_MEM_RECLAIM, 0);
+		if (!lc->dmlog_wq) {
+			DMERR("couldn't start dmlogd");
+			r = -ENOMEM;
+			goto out;
+		}
+
+		INIT_DELAYED_WORK(&lc->flush_log_work, do_flush);
+		atomic_set(&lc->sched_flush, 0);
+	}
+
 out:
 	kfree(devices_rdata);
 	if (r) {
@@ -253,7 +313,6 @@ out:
 		kfree(ctr_str);
 	} else {
 		lc->usr_argv_str = ctr_str;
-		lc->usr_argc = argc;
 		log->context = lc;
 	}
 
@@ -264,9 +323,16 @@ static void userspace_dtr(struct dm_dirty_log *log)
 {
 	struct log_c *lc = log->context;
 
+	if (lc->integrated_flush) {
+		/* flush workqueue */
+		if (atomic_read(&lc->sched_flush))
+			flush_delayed_work(&lc->flush_log_work);
+
+		destroy_workqueue(lc->dmlog_wq);
+	}
+
 	(void) dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_DTR,
-				 NULL, 0,
-				 NULL, NULL);
+				    NULL, 0, NULL, NULL);
 
 	if (lc->log_dev)
 		dm_put_device(lc->ti, lc->log_dev);
@@ -283,8 +349,7 @@ static int userspace_presuspend(struct dm_dirty_log *log)
 	struct log_c *lc = log->context;
 
 	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_PRESUSPEND,
-				 NULL, 0,
-				 NULL, NULL);
+				 NULL, 0, NULL, NULL);
 
 	return r;
 }
@@ -294,9 +359,14 @@ static int userspace_postsuspend(struct dm_dirty_log *log)
 	int r;
 	struct log_c *lc = log->context;
 
+	/*
+	 * Run planned flush earlier.
+	 */
+	if (lc->integrated_flush && atomic_read(&lc->sched_flush))
+		flush_delayed_work(&lc->flush_log_work);
+
 	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_POSTSUSPEND,
-				 NULL, 0,
-				 NULL, NULL);
+				 NULL, 0, NULL, NULL);
 
 	return r;
 }
@@ -308,8 +378,7 @@ static int userspace_resume(struct dm_dirty_log *log)
 
 	lc->in_sync_hint = 0;
 	r = dm_consult_userspace(lc->uuid, lc->luid, DM_ULOG_RESUME,
-				 NULL, 0,
-				 NULL, NULL);
+				 NULL, 0, NULL, NULL);
 
 	return r;
 }
@@ -405,7 +474,8 @@ static int flush_one_by_one(struct log_c *lc, struct list_head *flush_list)
 	return r;
 }
 
-static int flush_by_group(struct log_c *lc, struct list_head *flush_list)
+static int flush_by_group(struct log_c *lc, struct list_head *flush_list,
+			  int flush_with_payload)
 {
 	int r = 0;
 	int count;
@@ -431,15 +501,29 @@ static int flush_by_group(struct log_c *lc, struct list_head *flush_list)
 				break;
 		}
 
-		r = userspace_do_request(lc, lc->uuid, type,
-					 (char *)(group),
-					 count * sizeof(uint64_t),
-					 NULL, NULL);
-		if (r) {
-			/* Group send failed.  Attempt one-by-one. */
-			list_splice_init(&tmp_list, flush_list);
-			r = flush_one_by_one(lc, flush_list);
-			break;
+		if (flush_with_payload) {
+			r = userspace_do_request(lc, lc->uuid, DM_ULOG_FLUSH,
+						 (char *)(group),
+						 count * sizeof(uint64_t),
+						 NULL, NULL);
+			/*
+			 * Integrated flush failed.
+			 */
+			if (r)
+				break;
+		} else {
+			r = userspace_do_request(lc, lc->uuid, type,
+						 (char *)(group),
+						 count * sizeof(uint64_t),
+						 NULL, NULL);
+			if (r) {
+				/*
+				 * Group send failed.  Attempt one-by-one.
+				 */
+				list_splice_init(&tmp_list, flush_list);
+				r = flush_one_by_one(lc, flush_list);
+				break;
+			}
 		}
 	}
 
@@ -476,6 +560,8 @@ static int userspace_flush(struct dm_dirty_log *log)
 	struct log_c *lc = log->context;
 	LIST_HEAD(mark_list);
 	LIST_HEAD(clear_list);
+	int mark_list_is_empty;
+	int clear_list_is_empty;
 	struct flush_entry *fe, *tmp_fe;
 
 	spin_lock_irqsave(&lc->flush_lock, flags);
@@ -483,23 +569,51 @@ static int userspace_flush(struct dm_dirty_log *log)
 	list_splice_init(&lc->clear_list, &clear_list);
 	spin_unlock_irqrestore(&lc->flush_lock, flags);
 
-	if (list_empty(&mark_list) && list_empty(&clear_list))
+	mark_list_is_empty = list_empty(&mark_list);
+	clear_list_is_empty = list_empty(&clear_list);
+
+	if (mark_list_is_empty && clear_list_is_empty)
 		return 0;
 
-	r = flush_by_group(lc, &mark_list);
+	r = flush_by_group(lc, &clear_list, 0);
 	if (r)
-		goto fail;
+		goto out;
 
-	r = flush_by_group(lc, &clear_list);
+	if (!lc->integrated_flush) {
+		r = flush_by_group(lc, &mark_list, 0);
+		if (r)
+			goto out;
+		r = userspace_do_request(lc, lc->uuid, DM_ULOG_FLUSH,
+					 NULL, 0, NULL, NULL);
+		goto out;
+	}
+
+	/*
+	 * Send integrated flush request with mark_list as payload.
+	 */
+	r = flush_by_group(lc, &mark_list, 1);
 	if (r)
-		goto fail;
+		goto out;
 
-	r = userspace_do_request(lc, lc->uuid, DM_ULOG_FLUSH,
-				 NULL, 0, NULL, NULL);
+	if (mark_list_is_empty && !atomic_read(&lc->sched_flush)) {
+		/*
+		 * When there are only clear region requests,
+		 * we schedule a flush in the future.
+		 */
+		queue_delayed_work(lc->dmlog_wq, &lc->flush_log_work, 3 * HZ);
+		atomic_set(&lc->sched_flush, 1);
+	} else {
+		/*
+		 * Cancel pending flush because we
+		 * have already flushed in mark_region.
+		 */
+		cancel_delayed_work(&lc->flush_log_work);
+		atomic_set(&lc->sched_flush, 0);
+	}
 
-fail:
+out:
 	/*
-	 * We can safely remove these entries, even if failure.
+	 * We can safely remove these entries, even after failure.
 	 * Calling code will receive an error and will know that
 	 * the log facility has failed.
 	 */
@@ -603,8 +717,7 @@ static int userspace_get_resync_work(struct dm_dirty_log *log, region_t *region)
 
 	rdata_size = sizeof(pkg);
 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_GET_RESYNC_WORK,
-				 NULL, 0,
-				 (char *)&pkg, &rdata_size);
+				 NULL, 0, (char *)&pkg, &rdata_size);
 
 	*region = pkg.r;
 	return (r) ? r : (int)pkg.i;
@@ -630,8 +743,7 @@ static void userspace_set_region_sync(struct dm_dirty_log *log,
 	pkg.i = (int64_t)in_sync;
 
 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_SET_REGION_SYNC,
-				 (char *)&pkg, sizeof(pkg),
-				 NULL, NULL);
+				 (char *)&pkg, sizeof(pkg), NULL, NULL);
 
 	/*
 	 * It would be nice to be able to report failures.
@@ -657,8 +769,7 @@ static region_t userspace_get_sync_count(struct dm_dirty_log *log)
 
 	rdata_size = sizeof(sync_count);
 	r = userspace_do_request(lc, lc->uuid, DM_ULOG_GET_SYNC_COUNT,
-				 NULL, 0,
-				 (char *)&sync_count, &rdata_size);
+				 NULL, 0, (char *)&sync_count, &rdata_size);
 
 	if (r)
 		return 0;
@@ -685,8 +796,7 @@ static int userspace_status(struct dm_dirty_log *log, status_type_t status_type,
 	switch (status_type) {
 	case STATUSTYPE_INFO:
 		r = userspace_do_request(lc, lc->uuid, DM_ULOG_STATUS_INFO,
-					 NULL, 0,
-					 result, &sz);
+					 NULL, 0, result, &sz);
 
 		if (r) {
 			sz = 0;
@@ -699,8 +809,10 @@ static int userspace_status(struct dm_dirty_log *log, status_type_t status_type,
 		BUG_ON(!table_args); /* There will always be a ' ' */
 		table_args++;
 
-		DMEMIT("%s %u %s %s ", log->type->name, lc->usr_argc,
-		       lc->uuid, table_args);
+		DMEMIT("%s %u %s ", log->type->name, lc->usr_argc, lc->uuid);
+		if (lc->integrated_flush)
+			DMEMIT("integrated_flush ");
+		DMEMIT("%s ", table_args);
 		break;
 	}
 	return (r) ? 0 : (int)sz;
diff --git a/drivers/md/dm-snap-persistent.c b/drivers/md/dm-snap-persistent.c
index 2d2b1b7588d7..afc3d017de4c 100644
--- a/drivers/md/dm-snap-persistent.c
+++ b/drivers/md/dm-snap-persistent.c
@@ -13,10 +13,13 @@
 #include <linux/export.h>
 #include <linux/slab.h>
 #include <linux/dm-io.h>
+#include "dm-bufio.h"
 
 #define DM_MSG_PREFIX "persistent snapshot"
 #define DM_CHUNK_SIZE_DEFAULT_SECTORS 32	/* 16KB */
 
+#define DM_PREFETCH_CHUNKS		12
+
 /*-----------------------------------------------------------------
  * Persistent snapshots, by persistent we mean that the snapshot
  * will survive a reboot.
@@ -257,6 +260,7 @@ static int chunk_io(struct pstore *ps, void *area, chunk_t chunk, int rw,
 	INIT_WORK_ONSTACK(&req.work, do_metadata);
 	queue_work(ps->metadata_wq, &req.work);
 	flush_workqueue(ps->metadata_wq);
+	destroy_work_on_stack(&req.work);
 
 	return req.result;
 }
@@ -401,17 +405,18 @@ static int write_header(struct pstore *ps)
 /*
  * Access functions for the disk exceptions, these do the endian conversions.
  */
-static struct disk_exception *get_exception(struct pstore *ps, uint32_t index)
+static struct disk_exception *get_exception(struct pstore *ps, void *ps_area,
+					    uint32_t index)
 {
 	BUG_ON(index >= ps->exceptions_per_area);
 
-	return ((struct disk_exception *) ps->area) + index;
+	return ((struct disk_exception *) ps_area) + index;
 }
 
-static void read_exception(struct pstore *ps,
+static void read_exception(struct pstore *ps, void *ps_area,
 			   uint32_t index, struct core_exception *result)
 {
-	struct disk_exception *de = get_exception(ps, index);
+	struct disk_exception *de = get_exception(ps, ps_area, index);
 
 	/* copy it */
 	result->old_chunk = le64_to_cpu(de->old_chunk);
@@ -421,7 +426,7 @@ static void read_exception(struct pstore *ps,
 static void write_exception(struct pstore *ps,
 			    uint32_t index, struct core_exception *e)
 {
-	struct disk_exception *de = get_exception(ps, index);
+	struct disk_exception *de = get_exception(ps, ps->area, index);
 
 	/* copy it */
 	de->old_chunk = cpu_to_le64(e->old_chunk);
@@ -430,7 +435,7 @@ static void write_exception(struct pstore *ps,
 
 static void clear_exception(struct pstore *ps, uint32_t index)
 {
-	struct disk_exception *de = get_exception(ps, index);
+	struct disk_exception *de = get_exception(ps, ps->area, index);
 
 	/* clear it */
 	de->old_chunk = 0;
@@ -442,7 +447,7 @@ static void clear_exception(struct pstore *ps, uint32_t index)
  * 'full' is filled in to indicate if the area has been
  * filled.
  */
-static int insert_exceptions(struct pstore *ps,
+static int insert_exceptions(struct pstore *ps, void *ps_area,
 			     int (*callback)(void *callback_context,
 					     chunk_t old, chunk_t new),
 			     void *callback_context,
@@ -456,7 +461,7 @@ static int insert_exceptions(struct pstore *ps,
 	*full = 1;
 
 	for (i = 0; i < ps->exceptions_per_area; i++) {
-		read_exception(ps, i, &e);
+		read_exception(ps, ps_area, i, &e);
 
 		/*
 		 * If the new_chunk is pointing at the start of
@@ -493,26 +498,72 @@ static int read_exceptions(struct pstore *ps,
 			   void *callback_context)
 {
 	int r, full = 1;
+	struct dm_bufio_client *client;
+	chunk_t prefetch_area = 0;
+
+	client = dm_bufio_client_create(dm_snap_cow(ps->store->snap)->bdev,
+					ps->store->chunk_size << SECTOR_SHIFT,
+					1, 0, NULL, NULL);
+
+	if (IS_ERR(client))
+		return PTR_ERR(client);
+
+	/*
+	 * Setup for one current buffer + desired readahead buffers.
+	 */
+	dm_bufio_set_minimum_buffers(client, 1 + DM_PREFETCH_CHUNKS);
 
 	/*
 	 * Keeping reading chunks and inserting exceptions until
 	 * we find a partially full area.
 	 */
 	for (ps->current_area = 0; full; ps->current_area++) {
-		r = area_io(ps, READ);
-		if (r)
-			return r;
+		struct dm_buffer *bp;
+		void *area;
+		chunk_t chunk;
+
+		if (unlikely(prefetch_area < ps->current_area))
+			prefetch_area = ps->current_area;
+
+		if (DM_PREFETCH_CHUNKS) do {
+			chunk_t pf_chunk = area_location(ps, prefetch_area);
+			if (unlikely(pf_chunk >= dm_bufio_get_device_size(client)))
+				break;
+			dm_bufio_prefetch(client, pf_chunk, 1);
+			prefetch_area++;
+			if (unlikely(!prefetch_area))
+				break;
+		} while (prefetch_area <= ps->current_area + DM_PREFETCH_CHUNKS);
+
+		chunk = area_location(ps, ps->current_area);
+
+		area = dm_bufio_read(client, chunk, &bp);
+		if (unlikely(IS_ERR(area))) {
+			r = PTR_ERR(area);
+			goto ret_destroy_bufio;
+		}
 
-		r = insert_exceptions(ps, callback, callback_context, &full);
-		if (r)
-			return r;
+		r = insert_exceptions(ps, area, callback, callback_context,
+				      &full);
+
+		dm_bufio_release(bp);
+
+		dm_bufio_forget(client, chunk);
+
+		if (unlikely(r))
+			goto ret_destroy_bufio;
 	}
 
 	ps->current_area--;
 
 	skip_metadata(ps);
 
-	return 0;
+	r = 0;
+
+ret_destroy_bufio:
+	dm_bufio_client_destroy(client);
+
+	return r;
 }
 
 static struct pstore *get_info(struct dm_exception_store *store)
@@ -733,7 +784,7 @@ static int persistent_prepare_merge(struct dm_exception_store *store,
 		ps->current_committed = ps->exceptions_per_area;
 	}
 
-	read_exception(ps, ps->current_committed - 1, &ce);
+	read_exception(ps, ps->area, ps->current_committed - 1, &ce);
 	*last_old_chunk = ce.old_chunk;
 	*last_new_chunk = ce.new_chunk;
 
@@ -743,8 +794,8 @@ static int persistent_prepare_merge(struct dm_exception_store *store,
 	 */
 	for (nr_consecutive = 1; nr_consecutive < ps->current_committed;
 	     nr_consecutive++) {
-		read_exception(ps, ps->current_committed - 1 - nr_consecutive,
-			       &ce);
+		read_exception(ps, ps->area,
+			       ps->current_committed - 1 - nr_consecutive, &ce);
 		if (ce.old_chunk != *last_old_chunk - nr_consecutive ||
 		    ce.new_chunk != *last_new_chunk - nr_consecutive)
 			break;
diff --git a/drivers/md/dm-snap.c b/drivers/md/dm-snap.c
index 944690bafd93..717718558bd9 100644
--- a/drivers/md/dm-snap.c
+++ b/drivers/md/dm-snap.c
@@ -610,12 +610,12 @@ static struct dm_exception *dm_lookup_exception(struct dm_exception_table *et,
 	return NULL;
 }
 
-static struct dm_exception *alloc_completed_exception(void)
+static struct dm_exception *alloc_completed_exception(gfp_t gfp)
 {
 	struct dm_exception *e;
 
-	e = kmem_cache_alloc(exception_cache, GFP_NOIO);
-	if (!e)
+	e = kmem_cache_alloc(exception_cache, gfp);
+	if (!e && gfp == GFP_NOIO)
 		e = kmem_cache_alloc(exception_cache, GFP_ATOMIC);
 
 	return e;
@@ -697,7 +697,7 @@ static int dm_add_exception(void *context, chunk_t old, chunk_t new)
 	struct dm_snapshot *s = context;
 	struct dm_exception *e;
 
-	e = alloc_completed_exception();
+	e = alloc_completed_exception(GFP_KERNEL);
 	if (!e)
 		return -ENOMEM;
 
@@ -1405,7 +1405,7 @@ static void pending_complete(struct dm_snap_pending_exception *pe, int success)
 		goto out;
 	}
 
-	e = alloc_completed_exception();
+	e = alloc_completed_exception(GFP_NOIO);
 	if (!e) {
 		down_write(&s->lock);
 		__invalidate_snapshot(s, -ENOMEM);
diff --git a/drivers/md/dm-sysfs.c b/drivers/md/dm-sysfs.c
index 84d2b91e4efb..c62c5ab6aed5 100644
--- a/drivers/md/dm-sysfs.c
+++ b/drivers/md/dm-sysfs.c
@@ -86,6 +86,7 @@ static const struct sysfs_ops dm_sysfs_ops = {
 static struct kobj_type dm_ktype = {
 	.sysfs_ops	= &dm_sysfs_ops,
 	.default_attrs	= dm_attrs,
+	.release	= dm_kobject_release,
 };
 
 /*
@@ -104,5 +105,7 @@ int dm_sysfs_init(struct mapped_device *md)
  */
 void dm_sysfs_exit(struct mapped_device *md)
 {
-	kobject_put(dm_kobject(md));
+	struct kobject *kobj = dm_kobject(md);
+	kobject_put(kobj);
+	wait_for_completion(dm_get_completion_from_kobject(kobj));
 }
diff --git a/drivers/md/dm-table.c b/drivers/md/dm-table.c
index 3ba6a3859ce3..6a7f2b83a126 100644
--- a/drivers/md/dm-table.c
+++ b/drivers/md/dm-table.c
@@ -155,7 +155,6 @@ static int alloc_targets(struct dm_table *t, unsigned int num)
 {
 	sector_t *n_highs;
 	struct dm_target *n_targets;
-	int n = t->num_targets;
 
 	/*
 	 * Allocate both the target array and offset array at once.
@@ -169,12 +168,7 @@ static int alloc_targets(struct dm_table *t, unsigned int num)
 
 	n_targets = (struct dm_target *) (n_highs + num);
 
-	if (n) {
-		memcpy(n_highs, t->highs, sizeof(*n_highs) * n);
-		memcpy(n_targets, t->targets, sizeof(*n_targets) * n);
-	}
-
-	memset(n_highs + n, -1, sizeof(*n_highs) * (num - n));
+	memset(n_highs, -1, sizeof(*n_highs) * num);
 	vfree(t->highs);
 
 	t->num_allocated = num;
@@ -261,17 +255,6 @@ void dm_table_destroy(struct dm_table *t)
 }
 
 /*
- * Checks to see if we need to extend highs or targets.
- */
-static inline int check_space(struct dm_table *t)
-{
-	if (t->num_targets >= t->num_allocated)
-		return alloc_targets(t, t->num_allocated * 2);
-
-	return 0;
-}
-
-/*
  * See if we've already got a device in the list.
  */
 static struct dm_dev_internal *find_device(struct list_head *l, dev_t dev)
@@ -731,8 +714,7 @@ int dm_table_add_target(struct dm_table *t, const char *type,
 		return -EINVAL;
 	}
 
-	if ((r = check_space(t)))
-		return r;
+	BUG_ON(t->num_targets >= t->num_allocated);
 
 	tgt = t->targets + t->num_targets;
 	memset(tgt, 0, sizeof(*tgt));
diff --git a/drivers/md/dm-thin-metadata.c b/drivers/md/dm-thin-metadata.c
index 8a30ad54bd46..7da347665552 100644
--- a/drivers/md/dm-thin-metadata.c
+++ b/drivers/md/dm-thin-metadata.c
@@ -1349,6 +1349,12 @@ dm_thin_id dm_thin_dev_id(struct dm_thin_device *td)
 	return td->id;
 }
 
+/*
+ * Check whether @time (of block creation) is older than @td's last snapshot.
+ * If so then the associated block is shared with the last snapshot device.
+ * Any block on a device created *after* the device last got snapshotted is
+ * necessarily not shared.
+ */
 static bool __snapshotted_since(struct dm_thin_device *td, uint32_t time)
 {
 	return td->snapshotted_time > time;
@@ -1458,6 +1464,20 @@ int dm_thin_remove_block(struct dm_thin_device *td, dm_block_t block)
 	return r;
 }
 
+int dm_pool_block_is_used(struct dm_pool_metadata *pmd, dm_block_t b, bool *result)
+{
+	int r;
+	uint32_t ref_count;
+
+	down_read(&pmd->root_lock);
+	r = dm_sm_get_count(pmd->data_sm, b, &ref_count);
+	if (!r)
+		*result = (ref_count != 0);
+	up_read(&pmd->root_lock);
+
+	return r;
+}
+
 bool dm_thin_changed_this_transaction(struct dm_thin_device *td)
 {
 	int r;
diff --git a/drivers/md/dm-thin-metadata.h b/drivers/md/dm-thin-metadata.h
index 7bcc0e1d6238..9a368567632f 100644
--- a/drivers/md/dm-thin-metadata.h
+++ b/drivers/md/dm-thin-metadata.h
@@ -131,7 +131,7 @@ dm_thin_id dm_thin_dev_id(struct dm_thin_device *td);
 
 struct dm_thin_lookup_result {
 	dm_block_t block;
-	unsigned shared:1;
+	bool shared:1;
 };
 
 /*
@@ -181,6 +181,8 @@ int dm_pool_get_data_block_size(struct dm_pool_metadata *pmd, sector_t *result);
 
 int dm_pool_get_data_dev_size(struct dm_pool_metadata *pmd, dm_block_t *result);
 
+int dm_pool_block_is_used(struct dm_pool_metadata *pmd, dm_block_t b, bool *result);
+
 /*
  * Returns -ENOSPC if the new size is too small and already allocated
  * blocks would be lost.
diff --git a/drivers/md/dm-thin.c b/drivers/md/dm-thin.c
index ee29037ffc2e..726228b33a01 100644
--- a/drivers/md/dm-thin.c
+++ b/drivers/md/dm-thin.c
@@ -144,6 +144,7 @@ struct pool_features {
 	bool zero_new_blocks:1;
 	bool discard_enabled:1;
 	bool discard_passdown:1;
+	bool error_if_no_space:1;
 };
 
 struct thin_c;
@@ -163,8 +164,7 @@ struct pool {
 	int sectors_per_block_shift;
 
 	struct pool_features pf;
-	unsigned low_water_triggered:1;	/* A dm event has been sent */
-	unsigned no_free_space:1;	/* A -ENOSPC warning has been issued */
+	bool low_water_triggered:1;	/* A dm event has been sent */
 
 	struct dm_bio_prison *prison;
 	struct dm_kcopyd_client *copier;
@@ -198,7 +198,8 @@ struct pool {
 };
 
 static enum pool_mode get_pool_mode(struct pool *pool);
-static void set_pool_mode(struct pool *pool, enum pool_mode mode);
+static void out_of_data_space(struct pool *pool);
+static void metadata_operation_failed(struct pool *pool, const char *op, int r);
 
 /*
  * Target context for a pool.
@@ -509,15 +510,16 @@ static void remap_and_issue(struct thin_c *tc, struct bio *bio,
 struct dm_thin_new_mapping {
 	struct list_head list;
 
-	unsigned quiesced:1;
-	unsigned prepared:1;
-	unsigned pass_discard:1;
+	bool quiesced:1;
+	bool prepared:1;
+	bool pass_discard:1;
+	bool definitely_not_shared:1;
 
+	int err;
 	struct thin_c *tc;
 	dm_block_t virt_block;
 	dm_block_t data_block;
 	struct dm_bio_prison_cell *cell, *cell2;
-	int err;
 
 	/*
 	 * If the bio covers the whole area of a block then we can avoid
@@ -534,7 +536,7 @@ static void __maybe_add_mapping(struct dm_thin_new_mapping *m)
 	struct pool *pool = m->tc->pool;
 
 	if (m->quiesced && m->prepared) {
-		list_add(&m->list, &pool->prepared_mappings);
+		list_add_tail(&m->list, &pool->prepared_mappings);
 		wake_worker(pool);
 	}
 }
@@ -548,7 +550,7 @@ static void copy_complete(int read_err, unsigned long write_err, void *context)
 	m->err = read_err || write_err ? -EIO : 0;
 
 	spin_lock_irqsave(&pool->lock, flags);
-	m->prepared = 1;
+	m->prepared = true;
 	__maybe_add_mapping(m);
 	spin_unlock_irqrestore(&pool->lock, flags);
 }
@@ -563,7 +565,7 @@ static void overwrite_endio(struct bio *bio, int err)
 	m->err = err;
 
 	spin_lock_irqsave(&pool->lock, flags);
-	m->prepared = 1;
+	m->prepared = true;
 	__maybe_add_mapping(m);
 	spin_unlock_irqrestore(&pool->lock, flags);
 }
@@ -640,9 +642,7 @@ static void process_prepared_mapping(struct dm_thin_new_mapping *m)
 	 */
 	r = dm_thin_insert_block(tc->td, m->virt_block, m->data_block);
 	if (r) {
-		DMERR_LIMIT("%s: dm_thin_insert_block() failed: error = %d",
-			    dm_device_name(pool->pool_md), r);
-		set_pool_mode(pool, PM_READ_ONLY);
+		metadata_operation_failed(pool, "dm_thin_insert_block", r);
 		cell_error(pool, m->cell);
 		goto out;
 	}
@@ -683,7 +683,15 @@ static void process_prepared_discard_passdown(struct dm_thin_new_mapping *m)
 	cell_defer_no_holder(tc, m->cell2);
 
 	if (m->pass_discard)
-		remap_and_issue(tc, m->bio, m->data_block);
+		if (m->definitely_not_shared)
+			remap_and_issue(tc, m->bio, m->data_block);
+		else {
+			bool used = false;
+			if (dm_pool_block_is_used(tc->pool->pmd, m->data_block, &used) || used)
+				bio_endio(m->bio, 0);
+			else
+				remap_and_issue(tc, m->bio, m->data_block);
+		}
 	else
 		bio_endio(m->bio, 0);
 
@@ -751,13 +759,17 @@ static int ensure_next_mapping(struct pool *pool)
 
 static struct dm_thin_new_mapping *get_next_mapping(struct pool *pool)
 {
-	struct dm_thin_new_mapping *r = pool->next_mapping;
+	struct dm_thin_new_mapping *m = pool->next_mapping;
 
 	BUG_ON(!pool->next_mapping);
 
+	memset(m, 0, sizeof(struct dm_thin_new_mapping));
+	INIT_LIST_HEAD(&m->list);
+	m->bio = NULL;
+
 	pool->next_mapping = NULL;
 
-	return r;
+	return m;
 }
 
 static void schedule_copy(struct thin_c *tc, dm_block_t virt_block,
@@ -769,18 +781,13 @@ static void schedule_copy(struct thin_c *tc, dm_block_t virt_block,
 	struct pool *pool = tc->pool;
 	struct dm_thin_new_mapping *m = get_next_mapping(pool);
 
-	INIT_LIST_HEAD(&m->list);
-	m->quiesced = 0;
-	m->prepared = 0;
 	m->tc = tc;
 	m->virt_block = virt_block;
 	m->data_block = data_dest;
 	m->cell = cell;
-	m->err = 0;
-	m->bio = NULL;
 
 	if (!dm_deferred_set_add_work(pool->shared_read_ds, &m->list))
-		m->quiesced = 1;
+		m->quiesced = true;
 
 	/*
 	 * IO to pool_dev remaps to the pool target's data_dev.
@@ -840,15 +847,12 @@ static void schedule_zero(struct thin_c *tc, dm_block_t virt_block,
 	struct pool *pool = tc->pool;
 	struct dm_thin_new_mapping *m = get_next_mapping(pool);
 
-	INIT_LIST_HEAD(&m->list);
-	m->quiesced = 1;
-	m->prepared = 0;
+	m->quiesced = true;
+	m->prepared = false;
 	m->tc = tc;
 	m->virt_block = virt_block;
 	m->data_block = data_block;
 	m->cell = cell;
-	m->err = 0;
-	m->bio = NULL;
 
 	/*
 	 * If the whole block of data is being overwritten or we are not
@@ -895,41 +899,42 @@ static int commit(struct pool *pool)
 		return -EINVAL;
 
 	r = dm_pool_commit_metadata(pool->pmd);
-	if (r) {
-		DMERR_LIMIT("%s: dm_pool_commit_metadata failed: error = %d",
-			    dm_device_name(pool->pool_md), r);
-		set_pool_mode(pool, PM_READ_ONLY);
-	}
+	if (r)
+		metadata_operation_failed(pool, "dm_pool_commit_metadata", r);
 
 	return r;
 }
 
-static int alloc_data_block(struct thin_c *tc, dm_block_t *result)
+static void check_low_water_mark(struct pool *pool, dm_block_t free_blocks)
 {
-	int r;
-	dm_block_t free_blocks;
 	unsigned long flags;
-	struct pool *pool = tc->pool;
-
-	/*
-	 * Once no_free_space is set we must not allow allocation to succeed.
-	 * Otherwise it is difficult to explain, debug, test and support.
-	 */
-	if (pool->no_free_space)
-		return -ENOSPC;
-
-	r = dm_pool_get_free_block_count(pool->pmd, &free_blocks);
-	if (r)
-		return r;
 
 	if (free_blocks <= pool->low_water_blocks && !pool->low_water_triggered) {
 		DMWARN("%s: reached low water mark for data device: sending event.",
 		       dm_device_name(pool->pool_md));
 		spin_lock_irqsave(&pool->lock, flags);
-		pool->low_water_triggered = 1;
+		pool->low_water_triggered = true;
 		spin_unlock_irqrestore(&pool->lock, flags);
 		dm_table_event(pool->ti->table);
 	}
+}
+
+static int alloc_data_block(struct thin_c *tc, dm_block_t *result)
+{
+	int r;
+	dm_block_t free_blocks;
+	struct pool *pool = tc->pool;
+
+	if (get_pool_mode(pool) != PM_WRITE)
+		return -EINVAL;
+
+	r = dm_pool_get_free_block_count(pool->pmd, &free_blocks);
+	if (r) {
+		metadata_operation_failed(pool, "dm_pool_get_free_block_count", r);
+		return r;
+	}
+
+	check_low_water_mark(pool, free_blocks);
 
 	if (!free_blocks) {
 		/*
@@ -941,35 +946,20 @@ static int alloc_data_block(struct thin_c *tc, dm_block_t *result)
 			return r;
 
 		r = dm_pool_get_free_block_count(pool->pmd, &free_blocks);
-		if (r)
+		if (r) {
+			metadata_operation_failed(pool, "dm_pool_get_free_block_count", r);
 			return r;
+		}
 
-		/*
-		 * If we still have no space we set a flag to avoid
-		 * doing all this checking and return -ENOSPC.  This
-		 * flag serves as a latch that disallows allocations from
-		 * this pool until the admin takes action (e.g. resize or
-		 * table reload).
-		 */
 		if (!free_blocks) {
-			DMWARN("%s: no free data space available.",
-			       dm_device_name(pool->pool_md));
-			spin_lock_irqsave(&pool->lock, flags);
-			pool->no_free_space = 1;
-			spin_unlock_irqrestore(&pool->lock, flags);
+			out_of_data_space(pool);
 			return -ENOSPC;
 		}
 	}
 
 	r = dm_pool_alloc_data_block(pool->pmd, result);
 	if (r) {
-		if (r == -ENOSPC &&
-		    !dm_pool_get_free_metadata_block_count(pool->pmd, &free_blocks) &&
-		    !free_blocks) {
-			DMWARN("%s: no free metadata space available.",
-			       dm_device_name(pool->pool_md));
-			set_pool_mode(pool, PM_READ_ONLY);
-		}
+		metadata_operation_failed(pool, "dm_pool_alloc_data_block", r);
 		return r;
 	}
 
@@ -992,7 +982,21 @@ static void retry_on_resume(struct bio *bio)
 	spin_unlock_irqrestore(&pool->lock, flags);
 }
 
-static void no_space(struct pool *pool, struct dm_bio_prison_cell *cell)
+static void handle_unserviceable_bio(struct pool *pool, struct bio *bio)
+{
+	/*
+	 * When pool is read-only, no cell locking is needed because
+	 * nothing is changing.
+	 */
+	WARN_ON_ONCE(get_pool_mode(pool) != PM_READ_ONLY);
+
+	if (pool->pf.error_if_no_space)
+		bio_io_error(bio);
+	else
+		retry_on_resume(bio);
+}
+
+static void retry_bios_on_resume(struct pool *pool, struct dm_bio_prison_cell *cell)
 {
 	struct bio *bio;
 	struct bio_list bios;
@@ -1001,7 +1005,7 @@ static void no_space(struct pool *pool, struct dm_bio_prison_cell *cell)
 	cell_release(pool, cell, &bios);
 
 	while ((bio = bio_list_pop(&bios)))
-		retry_on_resume(bio);
+		handle_unserviceable_bio(pool, bio);
 }
 
 static void process_discard(struct thin_c *tc, struct bio *bio)
@@ -1040,17 +1044,17 @@ static void process_discard(struct thin_c *tc, struct bio *bio)
 			 */
 			m = get_next_mapping(pool);
 			m->tc = tc;
-			m->pass_discard = (!lookup_result.shared) && pool->pf.discard_passdown;
+			m->pass_discard = pool->pf.discard_passdown;
+			m->definitely_not_shared = !lookup_result.shared;
 			m->virt_block = block;
 			m->data_block = lookup_result.block;
 			m->cell = cell;
 			m->cell2 = cell2;
-			m->err = 0;
 			m->bio = bio;
 
 			if (!dm_deferred_set_add_work(pool->all_io_ds, &m->list)) {
 				spin_lock_irqsave(&pool->lock, flags);
-				list_add(&m->list, &pool->prepared_discards);
+				list_add_tail(&m->list, &pool->prepared_discards);
 				spin_unlock_irqrestore(&pool->lock, flags);
 				wake_worker(pool);
 			}
@@ -1105,13 +1109,12 @@ static void break_sharing(struct thin_c *tc, struct bio *bio, dm_block_t block,
 		break;
 
 	case -ENOSPC:
-		no_space(pool, cell);
+		retry_bios_on_resume(pool, cell);
 		break;
 
 	default:
 		DMERR_LIMIT("%s: alloc_data_block() failed: error = %d",
 			    __func__, r);
-		set_pool_mode(pool, PM_READ_ONLY);
 		cell_error(pool, cell);
 		break;
 	}
@@ -1184,13 +1187,12 @@ static void provision_block(struct thin_c *tc, struct bio *bio, dm_block_t block
 		break;
 
 	case -ENOSPC:
-		no_space(pool, cell);
+		retry_bios_on_resume(pool, cell);
 		break;
 
 	default:
 		DMERR_LIMIT("%s: alloc_data_block() failed: error = %d",
 			    __func__, r);
-		set_pool_mode(pool, PM_READ_ONLY);
 		cell_error(pool, cell);
 		break;
 	}
@@ -1257,7 +1259,7 @@ static void process_bio_read_only(struct thin_c *tc, struct bio *bio)
 	switch (r) {
 	case 0:
 		if (lookup_result.shared && (rw == WRITE) && bio->bi_size)
-			bio_io_error(bio);
+			handle_unserviceable_bio(tc->pool, bio);
 		else {
 			inc_all_io_entry(tc->pool, bio);
 			remap_and_issue(tc, bio, lookup_result.block);
@@ -1266,7 +1268,7 @@ static void process_bio_read_only(struct thin_c *tc, struct bio *bio)
 
 	case -ENODATA:
 		if (rw != READ) {
-			bio_io_error(bio);
+			handle_unserviceable_bio(tc->pool, bio);
 			break;
 		}
 
@@ -1390,16 +1392,16 @@ static enum pool_mode get_pool_mode(struct pool *pool)
 	return pool->pf.mode;
 }
 
-static void set_pool_mode(struct pool *pool, enum pool_mode mode)
+static void set_pool_mode(struct pool *pool, enum pool_mode new_mode)
 {
 	int r;
+	enum pool_mode old_mode = pool->pf.mode;
 
-	pool->pf.mode = mode;
-
-	switch (mode) {
+	switch (new_mode) {
 	case PM_FAIL:
-		DMERR("%s: switching pool to failure mode",
-		      dm_device_name(pool->pool_md));
+		if (old_mode != new_mode)
+			DMERR("%s: switching pool to failure mode",
+			      dm_device_name(pool->pool_md));
 		dm_pool_metadata_read_only(pool->pmd);
 		pool->process_bio = process_bio_fail;
 		pool->process_discard = process_bio_fail;
@@ -1408,13 +1410,15 @@ static void set_pool_mode(struct pool *pool, enum pool_mode mode)
 		break;
 
 	case PM_READ_ONLY:
-		DMERR("%s: switching pool to read-only mode",
-		      dm_device_name(pool->pool_md));
+		if (old_mode != new_mode)
+			DMERR("%s: switching pool to read-only mode",
+			      dm_device_name(pool->pool_md));
 		r = dm_pool_abort_metadata(pool->pmd);
 		if (r) {
 			DMERR("%s: aborting transaction failed",
 			      dm_device_name(pool->pool_md));
-			set_pool_mode(pool, PM_FAIL);
+			new_mode = PM_FAIL;
+			set_pool_mode(pool, new_mode);
 		} else {
 			dm_pool_metadata_read_only(pool->pmd);
 			pool->process_bio = process_bio_read_only;
@@ -1425,6 +1429,9 @@ static void set_pool_mode(struct pool *pool, enum pool_mode mode)
 		break;
 
 	case PM_WRITE:
+		if (old_mode != new_mode)
+			DMINFO("%s: switching pool to write mode",
+			       dm_device_name(pool->pool_md));
 		dm_pool_metadata_read_write(pool->pmd);
 		pool->process_bio = process_bio;
 		pool->process_discard = process_discard;
@@ -1432,6 +1439,35 @@ static void set_pool_mode(struct pool *pool, enum pool_mode mode)
 		pool->process_prepared_discard = process_prepared_discard;
 		break;
 	}
+
+	pool->pf.mode = new_mode;
+}
+
+/*
+ * Rather than calling set_pool_mode directly, use these which describe the
+ * reason for mode degradation.
+ */
+static void out_of_data_space(struct pool *pool)
+{
+	DMERR_LIMIT("%s: no free data space available.",
+		    dm_device_name(pool->pool_md));
+	set_pool_mode(pool, PM_READ_ONLY);
+}
+
+static void metadata_operation_failed(struct pool *pool, const char *op, int r)
+{
+	dm_block_t free_blocks;
+
+	DMERR_LIMIT("%s: metadata operation '%s' failed: error = %d",
+		    dm_device_name(pool->pool_md), op, r);
+
+	if (r == -ENOSPC &&
+	    !dm_pool_get_free_metadata_block_count(pool->pmd, &free_blocks) &&
+	    !free_blocks)
+		DMERR_LIMIT("%s: no free metadata space available.",
+			    dm_device_name(pool->pool_md));
+
+	set_pool_mode(pool, PM_READ_ONLY);
 }
 
 /*----------------------------------------------------------------*/
@@ -1538,9 +1574,9 @@ static int thin_bio_map(struct dm_target *ti, struct bio *bio)
 		if (get_pool_mode(tc->pool) == PM_READ_ONLY) {
 			/*
 			 * This block isn't provisioned, and we have no way
-			 * of doing so.  Just error it.
+			 * of doing so.
 			 */
-			bio_io_error(bio);
+			handle_unserviceable_bio(tc->pool, bio);
 			return DM_MAPIO_SUBMITTED;
 		}
 		/* fall through */
@@ -1648,6 +1684,17 @@ static int bind_control_target(struct pool *pool, struct dm_target *ti)
 	enum pool_mode new_mode = pt->adjusted_pf.mode;
 
 	/*
+	 * Don't change the pool's mode until set_pool_mode() below.
+	 * Otherwise the pool's process_* function pointers may
+	 * not match the desired pool mode.
+	 */
+	pt->adjusted_pf.mode = old_mode;
+
+	pool->ti = ti;
+	pool->pf = pt->adjusted_pf;
+	pool->low_water_blocks = pt->low_water_blocks;
+
+	/*
 	 * If we were in PM_FAIL mode, rollback of metadata failed.  We're
 	 * not going to recover without a thin_repair.  So we never let the
 	 * pool move out of the old mode.  On the other hand a PM_READ_ONLY
@@ -1657,10 +1704,6 @@ static int bind_control_target(struct pool *pool, struct dm_target *ti)
 	if (old_mode == PM_FAIL)
 		new_mode = old_mode;
 
-	pool->ti = ti;
-	pool->low_water_blocks = pt->low_water_blocks;
-	pool->pf = pt->adjusted_pf;
-
 	set_pool_mode(pool, new_mode);
 
 	return 0;
@@ -1682,6 +1725,7 @@ static void pool_features_init(struct pool_features *pf)
 	pf->zero_new_blocks = true;
 	pf->discard_enabled = true;
 	pf->discard_passdown = true;
+	pf->error_if_no_space = false;
 }
 
 static void __pool_destroy(struct pool *pool)
@@ -1772,8 +1816,7 @@ static struct pool *pool_create(struct mapped_device *pool_md,
 	bio_list_init(&pool->deferred_flush_bios);
 	INIT_LIST_HEAD(&pool->prepared_mappings);
 	INIT_LIST_HEAD(&pool->prepared_discards);
-	pool->low_water_triggered = 0;
-	pool->no_free_space = 0;
+	pool->low_water_triggered = false;
 	bio_list_init(&pool->retry_on_resume_list);
 
 	pool->shared_read_ds = dm_deferred_set_create();
@@ -1898,7 +1941,7 @@ static int parse_pool_features(struct dm_arg_set *as, struct pool_features *pf,
 	const char *arg_name;
 
 	static struct dm_arg _args[] = {
-		{0, 3, "Invalid number of pool feature arguments"},
+		{0, 4, "Invalid number of pool feature arguments"},
 	};
 
 	/*
@@ -1927,6 +1970,9 @@ static int parse_pool_features(struct dm_arg_set *as, struct pool_features *pf,
 		else if (!strcasecmp(arg_name, "read_only"))
 			pf->mode = PM_READ_ONLY;
 
+		else if (!strcasecmp(arg_name, "error_if_no_space"))
+			pf->error_if_no_space = true;
+
 		else {
 			ti->error = "Unrecognised pool feature requested";
 			r = -EINVAL;
@@ -1997,6 +2043,8 @@ static dm_block_t calc_metadata_threshold(struct pool_c *pt)
  *	     skip_block_zeroing: skips the zeroing of newly-provisioned blocks.
  *	     ignore_discard: disable discard
  *	     no_discard_passdown: don't pass discards down to the data device
+ *	     read_only: Don't allow any changes to be made to the pool metadata.
+ *	     error_if_no_space: error IOs, instead of queueing, if no space.
  */
 static int pool_ctr(struct dm_target *ti, unsigned argc, char **argv)
 {
@@ -2192,11 +2240,13 @@ static int maybe_resize_data_dev(struct dm_target *ti, bool *need_commit)
 		return -EINVAL;
 
 	} else if (data_size > sb_data_size) {
+		if (sb_data_size)
+			DMINFO("%s: growing the data device from %llu to %llu blocks",
+			       dm_device_name(pool->pool_md),
+			       sb_data_size, (unsigned long long)data_size);
 		r = dm_pool_resize_data_dev(pool->pmd, data_size);
 		if (r) {
-			DMERR("%s: failed to resize data device",
-			      dm_device_name(pool->pool_md));
-			set_pool_mode(pool, PM_READ_ONLY);
+			metadata_operation_failed(pool, "dm_pool_resize_data_dev", r);
 			return r;
 		}
 
@@ -2231,10 +2281,12 @@ static int maybe_resize_metadata_dev(struct dm_target *ti, bool *need_commit)
 		return -EINVAL;
 
 	} else if (metadata_dev_size > sb_metadata_dev_size) {
+		DMINFO("%s: growing the metadata device from %llu to %llu blocks",
+		       dm_device_name(pool->pool_md),
+		       sb_metadata_dev_size, metadata_dev_size);
 		r = dm_pool_resize_metadata_dev(pool->pmd, metadata_dev_size);
 		if (r) {
-			DMERR("%s: failed to resize metadata device",
-			      dm_device_name(pool->pool_md));
+			metadata_operation_failed(pool, "dm_pool_resize_metadata_dev", r);
 			return r;
 		}
 
@@ -2290,8 +2342,7 @@ static void pool_resume(struct dm_target *ti)
 	unsigned long flags;
 
 	spin_lock_irqsave(&pool->lock, flags);
-	pool->low_water_triggered = 0;
-	pool->no_free_space = 0;
+	pool->low_water_triggered = false;
 	__requeue_bios(pool);
 	spin_unlock_irqrestore(&pool->lock, flags);
 
@@ -2510,7 +2561,8 @@ static void emit_flags(struct pool_features *pf, char *result,
 		       unsigned sz, unsigned maxlen)
 {
 	unsigned count = !pf->zero_new_blocks + !pf->discard_enabled +
-		!pf->discard_passdown + (pf->mode == PM_READ_ONLY);
+		!pf->discard_passdown + (pf->mode == PM_READ_ONLY) +
+		pf->error_if_no_space;
 	DMEMIT("%u ", count);
 
 	if (!pf->zero_new_blocks)
@@ -2524,6 +2576,9 @@ static void emit_flags(struct pool_features *pf, char *result,
 
 	if (pf->mode == PM_READ_ONLY)
 		DMEMIT("read_only ");
+
+	if (pf->error_if_no_space)
+		DMEMIT("error_if_no_space ");
 }
 
 /*
@@ -2618,11 +2673,16 @@ static void pool_status(struct dm_target *ti, status_type_t type,
 			DMEMIT("rw ");
 
 		if (!pool->pf.discard_enabled)
-			DMEMIT("ignore_discard");
+			DMEMIT("ignore_discard ");
 		else if (pool->pf.discard_passdown)
-			DMEMIT("discard_passdown");
+			DMEMIT("discard_passdown ");
+		else
+			DMEMIT("no_discard_passdown ");
+
+		if (pool->pf.error_if_no_space)
+			DMEMIT("error_if_no_space ");
 		else
-			DMEMIT("no_discard_passdown");
+			DMEMIT("queue_if_no_space ");
 
 		break;
 
@@ -2721,7 +2781,7 @@ static struct target_type pool_target = {
 	.name = "thin-pool",
 	.features = DM_TARGET_SINGLETON | DM_TARGET_ALWAYS_WRITEABLE |
 		    DM_TARGET_IMMUTABLE,
-	.version = {1, 9, 0},
+	.version = {1, 10, 0},
 	.module = THIS_MODULE,
 	.ctr = pool_ctr,
 	.dtr = pool_dtr,
@@ -2899,7 +2959,7 @@ static int thin_endio(struct dm_target *ti, struct bio *bio, int err)
 		spin_lock_irqsave(&pool->lock, flags);
 		list_for_each_entry_safe(m, tmp, &work, list) {
 			list_del(&m->list);
-			m->quiesced = 1;
+			m->quiesced = true;
 			__maybe_add_mapping(m);
 		}
 		spin_unlock_irqrestore(&pool->lock, flags);
@@ -2911,7 +2971,7 @@ static int thin_endio(struct dm_target *ti, struct bio *bio, int err)
 		if (!list_empty(&work)) {
 			spin_lock_irqsave(&pool->lock, flags);
 			list_for_each_entry_safe(m, tmp, &work, list)
-				list_add(&m->list, &pool->prepared_discards);
+				list_add_tail(&m->list, &pool->prepared_discards);
 			spin_unlock_irqrestore(&pool->lock, flags);
 			wake_worker(pool);
 		}
@@ -3008,7 +3068,7 @@ static int thin_iterate_devices(struct dm_target *ti,
 
 static struct target_type thin_target = {
 	.name = "thin",
-	.version = {1, 9, 0},
+	.version = {1, 10, 0},
 	.module	= THIS_MODULE,
 	.ctr = thin_ctr,
 	.dtr = thin_dtr,
diff --git a/drivers/md/dm.c b/drivers/md/dm.c
index 0704c523a76b..b49c76284241 100644
--- a/drivers/md/dm.c
+++ b/drivers/md/dm.c
@@ -200,8 +200,8 @@ struct mapped_device {
 	/* forced geometry settings */
 	struct hd_geometry geometry;
 
-	/* sysfs handle */
-	struct kobject kobj;
+	/* kobject and completion */
+	struct dm_kobject_holder kobj_holder;
 
 	/* zero-length flush that will be cloned and submitted to targets */
 	struct bio flush_bio;
@@ -2041,6 +2041,7 @@ static struct mapped_device *alloc_dev(int minor)
 	init_waitqueue_head(&md->wait);
 	INIT_WORK(&md->work, dm_wq_work);
 	init_waitqueue_head(&md->eventq);
+	init_completion(&md->kobj_holder.completion);
 
 	md->disk->major = _major;
 	md->disk->first_minor = minor;
@@ -2902,20 +2903,14 @@ struct gendisk *dm_disk(struct mapped_device *md)
 
 struct kobject *dm_kobject(struct mapped_device *md)
 {
-	return &md->kobj;
+	return &md->kobj_holder.kobj;
 }
 
-/*
- * struct mapped_device should not be exported outside of dm.c
- * so use this check to verify that kobj is part of md structure
- */
 struct mapped_device *dm_get_from_kobject(struct kobject *kobj)
 {
 	struct mapped_device *md;
 
-	md = container_of(kobj, struct mapped_device, kobj);
-	if (&md->kobj != kobj)
-		return NULL;
+	md = container_of(kobj, struct mapped_device, kobj_holder.kobj);
 
 	if (test_bit(DMF_FREEING, &md->flags) ||
 	    dm_deleting_md(md))
diff --git a/drivers/md/dm.h b/drivers/md/dm.h
index c57ba550f69e..c4569f02f50f 100644
--- a/drivers/md/dm.h
+++ b/drivers/md/dm.h
@@ -15,6 +15,8 @@
 #include <linux/list.h>
 #include <linux/blkdev.h>
 #include <linux/hdreg.h>
+#include <linux/completion.h>
+#include <linux/kobject.h>
 
 #include "dm-stats.h"
 
@@ -148,12 +150,27 @@ void dm_interface_exit(void);
 /*
  * sysfs interface
  */
+struct dm_kobject_holder {
+	struct kobject kobj;
+	struct completion completion;
+};
+
+static inline struct completion *dm_get_completion_from_kobject(struct kobject *kobj)
+{
+	return &container_of(kobj, struct dm_kobject_holder, kobj)->completion;
+}
+
 int dm_sysfs_init(struct mapped_device *md);
 void dm_sysfs_exit(struct mapped_device *md);
 struct kobject *dm_kobject(struct mapped_device *md);
 struct mapped_device *dm_get_from_kobject(struct kobject *kobj);
 
 /*
+ * The kobject helper
+ */
+void dm_kobject_release(struct kobject *kobj);
+
+/*
  * Targets for linear and striped mappings
  */
 int dm_linear_init(void);
diff --git a/drivers/md/persistent-data/dm-block-manager.c b/drivers/md/persistent-data/dm-block-manager.c
index 064a3c271baa..455f79279a16 100644
--- a/drivers/md/persistent-data/dm-block-manager.c
+++ b/drivers/md/persistent-data/dm-block-manager.c
@@ -104,7 +104,7 @@ static int __check_holder(struct block_lock *lock)
 
 	for (i = 0; i < MAX_HOLDERS; i++) {
 		if (lock->holders[i] == current) {
-			DMERR("recursive lock detected in pool metadata");
+			DMERR("recursive lock detected in metadata");
 #ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
 			DMERR("previously held here:");
 			print_stack_trace(lock->traces + i, 4);
diff --git a/drivers/md/persistent-data/dm-btree.c b/drivers/md/persistent-data/dm-btree.c
index 468e371ee9b2..416060c25709 100644
--- a/drivers/md/persistent-data/dm-btree.c
+++ b/drivers/md/persistent-data/dm-btree.c
@@ -770,8 +770,8 @@ EXPORT_SYMBOL_GPL(dm_btree_insert_notify);
 
 /*----------------------------------------------------------------*/
 
-static int find_highest_key(struct ro_spine *s, dm_block_t block,
-			    uint64_t *result_key, dm_block_t *next_block)
+static int find_key(struct ro_spine *s, dm_block_t block, bool find_highest,
+		    uint64_t *result_key, dm_block_t *next_block)
 {
 	int i, r;
 	uint32_t flags;
@@ -788,7 +788,11 @@ static int find_highest_key(struct ro_spine *s, dm_block_t block,
 		else
 			i--;
 
-		*result_key = le64_to_cpu(ro_node(s)->keys[i]);
+		if (find_highest)
+			*result_key = le64_to_cpu(ro_node(s)->keys[i]);
+		else
+			*result_key = le64_to_cpu(ro_node(s)->keys[0]);
+
 		if (next_block || flags & INTERNAL_NODE)
 			block = value64(ro_node(s), i);
 
@@ -799,16 +803,16 @@ static int find_highest_key(struct ro_spine *s, dm_block_t block,
 	return 0;
 }
 
-int dm_btree_find_highest_key(struct dm_btree_info *info, dm_block_t root,
-			      uint64_t *result_keys)
+static int dm_btree_find_key(struct dm_btree_info *info, dm_block_t root,
+			     bool find_highest, uint64_t *result_keys)
 {
 	int r = 0, count = 0, level;
 	struct ro_spine spine;
 
 	init_ro_spine(&spine, info);
 	for (level = 0; level < info->levels; level++) {
-		r = find_highest_key(&spine, root, result_keys + level,
-				     level == info->levels - 1 ? NULL : &root);
+		r = find_key(&spine, root, find_highest, result_keys + level,
+			     level == info->levels - 1 ? NULL : &root);
 		if (r == -ENODATA) {
 			r = 0;
 			break;
@@ -822,8 +826,23 @@ int dm_btree_find_highest_key(struct dm_btree_info *info, dm_block_t root,
 
 	return r ? r : count;
 }
+
+int dm_btree_find_highest_key(struct dm_btree_info *info, dm_block_t root,
+			      uint64_t *result_keys)
+{
+	return dm_btree_find_key(info, root, true, result_keys);
+}
 EXPORT_SYMBOL_GPL(dm_btree_find_highest_key);
 
+int dm_btree_find_lowest_key(struct dm_btree_info *info, dm_block_t root,
+			     uint64_t *result_keys)
+{
+	return dm_btree_find_key(info, root, false, result_keys);
+}
+EXPORT_SYMBOL_GPL(dm_btree_find_lowest_key);
+
+/*----------------------------------------------------------------*/
+
 /*
  * FIXME: We shouldn't use a recursive algorithm when we have limited stack
  * space.  Also this only works for single level trees.
diff --git a/drivers/md/persistent-data/dm-btree.h b/drivers/md/persistent-data/dm-btree.h
index 8672d159e0b5..dacfc34180b4 100644
--- a/drivers/md/persistent-data/dm-btree.h
+++ b/drivers/md/persistent-data/dm-btree.h
@@ -137,6 +137,14 @@ int dm_btree_remove(struct dm_btree_info *info, dm_block_t root,
 /*
  * Returns < 0 on failure.  Otherwise the number of key entries that have
  * been filled out.  Remember trees can have zero entries, and as such have
+ * no lowest key.
+ */
+int dm_btree_find_lowest_key(struct dm_btree_info *info, dm_block_t root,
+			     uint64_t *result_keys);
+
+/*
+ * Returns < 0 on failure.  Otherwise the number of key entries that have
+ * been filled out.  Remember trees can have zero entries, and as such have
  * no highest key.
  */
 int dm_btree_find_highest_key(struct dm_btree_info *info, dm_block_t root,
diff --git a/drivers/md/persistent-data/dm-space-map-common.c b/drivers/md/persistent-data/dm-space-map-common.c
index 466a60bbd716..aacbe70c2c2e 100644
--- a/drivers/md/persistent-data/dm-space-map-common.c
+++ b/drivers/md/persistent-data/dm-space-map-common.c
@@ -245,6 +245,10 @@ int sm_ll_extend(struct ll_disk *ll, dm_block_t extra_blocks)
 		return -EINVAL;
 	}
 
+	/*
+	 * We need to set this before the dm_tm_new_block() call below.
+	 */
+	ll->nr_blocks = nr_blocks;
 	for (i = old_blocks; i < blocks; i++) {
 		struct dm_block *b;
 		struct disk_index_entry idx;
@@ -252,6 +256,7 @@ int sm_ll_extend(struct ll_disk *ll, dm_block_t extra_blocks)
 		r = dm_tm_new_block(ll->tm, &dm_sm_bitmap_validator, &b);
 		if (r < 0)
 			return r;
+
 		idx.blocknr = cpu_to_le64(dm_block_location(b));
 
 		r = dm_tm_unlock(ll->tm, b);
@@ -266,7 +271,6 @@ int sm_ll_extend(struct ll_disk *ll, dm_block_t extra_blocks)
 			return r;
 	}
 
-	ll->nr_blocks = nr_blocks;
 	return 0;
 }
 
diff --git a/drivers/md/persistent-data/dm-space-map-metadata.c b/drivers/md/persistent-data/dm-space-map-metadata.c
index 58fc1eef7499..536782e3bcb7 100644
--- a/drivers/md/persistent-data/dm-space-map-metadata.c
+++ b/drivers/md/persistent-data/dm-space-map-metadata.c
@@ -385,13 +385,13 @@ static int sm_metadata_new_block(struct dm_space_map *sm, dm_block_t *b)
 
 	int r = sm_metadata_new_block_(sm, b);
 	if (r) {
-		DMERR("unable to allocate new metadata block");
+		DMERR_LIMIT("unable to allocate new metadata block");
 		return r;
 	}
 
 	r = sm_metadata_get_nr_free(sm, &count);
 	if (r) {
-		DMERR("couldn't get free block count");
+		DMERR_LIMIT("couldn't get free block count");
 		return r;
 	}
 
@@ -608,20 +608,38 @@ static int sm_metadata_extend(struct dm_space_map *sm, dm_block_t extra_blocks)
 	 * Flick into a mode where all blocks get allocated in the new area.
 	 */
 	smm->begin = old_len;
-	memcpy(&smm->sm, &bootstrap_ops, sizeof(smm->sm));
+	memcpy(sm, &bootstrap_ops, sizeof(*sm));
 
 	/*
 	 * Extend.
 	 */
 	r = sm_ll_extend(&smm->ll, extra_blocks);
+	if (r)
+		goto out;
 
 	/*
-	 * Switch back to normal behaviour.
+	 * We repeatedly increment then commit until the commit doesn't
+	 * allocate any new blocks.
 	 */
-	memcpy(&smm->sm, &ops, sizeof(smm->sm));
-	for (i = old_len; !r && i < smm->begin; i++)
-		r = sm_ll_inc(&smm->ll, i, &ev);
+	do {
+		for (i = old_len; !r && i < smm->begin; i++) {
+			r = sm_ll_inc(&smm->ll, i, &ev);
+			if (r)
+				goto out;
+		}
+		old_len = smm->begin;
+
+		r = sm_ll_commit(&smm->ll);
+		if (r)
+			goto out;
+
+	} while (old_len != smm->begin);
 
+out:
+	/*
+	 * Switch back to normal behaviour.
+	 */
+	memcpy(sm, &ops, sizeof(*sm));
 	return r;
 }