summary refs log tree commit diff
path: root/drivers/block
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/block')
-rw-r--r--drivers/block/rbd.c689
1 files changed, 365 insertions, 324 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index b2c98c1bc037..623c84145b79 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -42,6 +42,7 @@
 #include <linux/blkdev.h>
 #include <linux/slab.h>
 #include <linux/idr.h>
+#include <linux/workqueue.h>
 
 #include "rbd_types.h"
 
@@ -332,7 +333,10 @@ struct rbd_device {
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
+	struct list_head	rq_queue;	/* incoming rq queue */
 	spinlock_t		lock;		/* queue, flags, open_count */
+	struct workqueue_struct	*rq_wq;
+	struct work_struct	rq_work;
 
 	struct rbd_image_header	header;
 	unsigned long		flags;		/* possibly lock protected */
@@ -514,7 +518,8 @@ static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
 
 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
-static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_header_info(struct rbd_device *rbd_dev);
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
 					u64 snap_id);
 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
@@ -971,12 +976,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 	header->snap_names = snap_names;
 	header->snap_sizes = snap_sizes;
 
-	/* Make sure mapping size is consistent with header info */
-
-	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
-		if (rbd_dev->mapping.size != header->image_size)
-			rbd_dev->mapping.size = header->image_size;
-
 	return 0;
 out_2big:
 	ret = -EIO;
@@ -1139,6 +1138,13 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
 	rbd_dev->mapping.features = 0;
 }
 
+static void rbd_segment_name_free(const char *name)
+{
+	/* The explicit cast here is needed to drop the const qualifier */
+
+	kmem_cache_free(rbd_segment_name_cache, (void *)name);
+}
+
 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
 	char *name;
@@ -1158,20 +1164,13 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
 		pr_err("error formatting segment name for #%llu (%d)\n",
 			segment, ret);
-		kfree(name);
+		rbd_segment_name_free(name);
 		name = NULL;
 	}
 
 	return name;
 }
 
-static void rbd_segment_name_free(const char *name)
-{
-	/* The explicit cast here is needed to drop the const qualifier */
-
-	kmem_cache_free(rbd_segment_name_cache, (void *)name);
-}
-
 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
 {
 	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
@@ -1371,7 +1370,7 @@ static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
 		struct rbd_device *rbd_dev;
 
 		rbd_dev = obj_request->img_request->rbd_dev;
-		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
+		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
 			obj_request);
 	}
 }
@@ -1389,7 +1388,7 @@ static void obj_request_done_set(struct rbd_obj_request *obj_request)
 
 		if (obj_request_img_data_test(obj_request))
 			rbd_dev = obj_request->img_request->rbd_dev;
-		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
+		rbd_warn(rbd_dev, "obj_request %p already marked done",
 			obj_request);
 	}
 }
@@ -1527,11 +1526,37 @@ static bool obj_request_type_valid(enum obj_request_type type)
 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
 				struct rbd_obj_request *obj_request)
 {
-	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
-
+	dout("%s %p\n", __func__, obj_request);
 	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
 }
 
+static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
+{
+	dout("%s %p\n", __func__, obj_request);
+	ceph_osdc_cancel_request(obj_request->osd_req);
+}
+
+/*
+ * Wait for an object request to complete.  If interrupted, cancel the
+ * underlying osd request.
+ */
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+{
+	int ret;
+
+	dout("%s %p\n", __func__, obj_request);
+
+	ret = wait_for_completion_interruptible(&obj_request->completion);
+	if (ret < 0) {
+		dout("%s %p interrupted\n", __func__, obj_request);
+		rbd_obj_request_end(obj_request);
+		return ret;
+	}
+
+	dout("%s %p done\n", __func__, obj_request);
+	return 0;
+}
+
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
 
@@ -1558,15 +1583,6 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
 		rbd_img_request_put(img_request);
 }
 
-/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
-
-static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
-{
-	dout("%s: obj %p\n", __func__, obj_request);
-
-	return wait_for_completion_interruptible(&obj_request->completion);
-}
-
 /*
  * The default/initial value for all image request flags is 0.  Each
  * is conditionally set to 1 at image request initialization time
@@ -1763,7 +1779,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
 		rbd_osd_trivial_callback(obj_request);
 		break;
 	default:
-		rbd_warn(NULL, "%s: unsupported op %hu\n",
+		rbd_warn(NULL, "%s: unsupported op %hu",
 			obj_request->object_name, (unsigned short) opcode);
 		break;
 	}
@@ -1998,7 +2014,7 @@ static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
 	if (!counter)
 		rbd_dev_unparent(rbd_dev);
 	else
-		rbd_warn(rbd_dev, "parent reference underflow\n");
+		rbd_warn(rbd_dev, "parent reference underflow");
 }
 
 /*
@@ -2028,7 +2044,7 @@ static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
 	/* Image was flattened, but parent is not yet torn down */
 
 	if (counter < 0)
-		rbd_warn(rbd_dev, "parent reference overflow\n");
+		rbd_warn(rbd_dev, "parent reference overflow");
 
 	return false;
 }
@@ -2045,7 +2061,7 @@ static struct rbd_img_request *rbd_img_request_create(
 {
 	struct rbd_img_request *img_request;
 
-	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
+	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
 	if (!img_request)
 		return NULL;
 
@@ -2161,11 +2177,11 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
 	if (result) {
 		struct rbd_device *rbd_dev = img_request->rbd_dev;
 
-		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
+		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
 			img_request_write_test(img_request) ? "write" : "read",
 			obj_request->length, obj_request->img_offset,
 			obj_request->offset);
-		rbd_warn(rbd_dev, "  result %d xferred %x\n",
+		rbd_warn(rbd_dev, "  result %d xferred %x",
 			result, xferred);
 		if (!img_request->result)
 			img_request->result = result;
@@ -2946,154 +2962,135 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
 		rbd_dev->header_name, (unsigned long long)notify_id,
 		(unsigned int)opcode);
+
+	/*
+	 * Until adequate refresh error handling is in place, there is
+	 * not much we can do here, except warn.
+	 *
+	 * See http://tracker.ceph.com/issues/5040
+	 */
 	ret = rbd_dev_refresh(rbd_dev);
 	if (ret)
-		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
+		rbd_warn(rbd_dev, "refresh failed: %d", ret);
 
-	rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+	if (ret)
+		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
 /*
- * Initiate a watch request, synchronously.
+ * Send a (un)watch request and wait for the ack.  Return a request
+ * with a ref held on success or error.
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+static struct rbd_obj_request *rbd_obj_watch_request_helper(
+						struct rbd_device *rbd_dev,
+						bool watch)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	int ret;
 
-	rbd_assert(!rbd_dev->watch_event);
-	rbd_assert(!rbd_dev->watch_request);
-
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-				     &rbd_dev->watch_event);
-	if (ret < 0)
-		return ret;
-
-	rbd_assert(rbd_dev->watch_event);
-
 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
 					     OBJ_REQUEST_NODATA);
-	if (!obj_request) {
-		ret = -ENOMEM;
-		goto out_cancel;
-	}
+	if (!obj_request)
+		return ERR_PTR(-ENOMEM);
 
 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
 						  obj_request);
 	if (!obj_request->osd_req) {
 		ret = -ENOMEM;
-		goto out_put;
+		goto out;
 	}
 
-	ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, 1);
+			      rbd_dev->watch_event->cookie, 0, watch);
 	rbd_osd_req_format_write(obj_request);
 
+	if (watch)
+		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+
 	ret = rbd_obj_request_submit(osdc, obj_request);
 	if (ret)
-		goto out_linger;
+		goto out;
 
 	ret = rbd_obj_request_wait(obj_request);
 	if (ret)
-		goto out_linger;
+		goto out;
 
 	ret = obj_request->result;
-	if (ret)
-		goto out_linger;
-
-	/*
-	 * A watch request is set to linger, so the underlying osd
-	 * request won't go away until we unregister it.  We retain
-	 * a pointer to the object request during that time (in
-	 * rbd_dev->watch_request), so we'll keep a reference to
-	 * it.  We'll drop that reference (below) after we've
-	 * unregistered it.
-	 */
-	rbd_dev->watch_request = obj_request;
+	if (ret) {
+		if (watch)
+			rbd_obj_request_end(obj_request);
+		goto out;
+	}
 
-	return 0;
+	return obj_request;
 
-out_linger:
-	ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
-out_put:
+out:
 	rbd_obj_request_put(obj_request);
-out_cancel:
-	ceph_osdc_cancel_event(rbd_dev->watch_event);
-	rbd_dev->watch_event = NULL;
-
-	return ret;
+	return ERR_PTR(ret);
 }
 
 /*
- * Tear down a watch request, synchronously.
+ * Initiate a watch request, synchronously.
  */
-static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	int ret;
 
-	rbd_assert(rbd_dev->watch_event);
-	rbd_assert(rbd_dev->watch_request);
+	rbd_assert(!rbd_dev->watch_event);
+	rbd_assert(!rbd_dev->watch_request);
 
-	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
-					     OBJ_REQUEST_NODATA);
-	if (!obj_request) {
-		ret = -ENOMEM;
-		goto out_cancel;
-	}
+	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+				     &rbd_dev->watch_event);
+	if (ret < 0)
+		return ret;
 
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
-						  obj_request);
-	if (!obj_request->osd_req) {
-		ret = -ENOMEM;
-		goto out_put;
+	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
+	if (IS_ERR(obj_request)) {
+		ceph_osdc_cancel_event(rbd_dev->watch_event);
+		rbd_dev->watch_event = NULL;
+		return PTR_ERR(obj_request);
 	}
 
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, 0);
-	rbd_osd_req_format_write(obj_request);
-
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out_put;
+	/*
+	 * A watch request is set to linger, so the underlying osd
+	 * request won't go away until we unregister it.  We retain
+	 * a pointer to the object request during that time (in
+	 * rbd_dev->watch_request), so we'll keep a reference to it.
+	 * We'll drop that reference after we've unregistered it in
+	 * rbd_dev_header_unwatch_sync().
+	 */
+	rbd_dev->watch_request = obj_request;
 
-	ret = rbd_obj_request_wait(obj_request);
-	if (ret)
-		goto out_put;
+	return 0;
+}
 
-	ret = obj_request->result;
-	if (ret)
-		goto out_put;
+/*
+ * Tear down a watch request, synchronously.
+ */
+static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+{
+	struct rbd_obj_request *obj_request;
 
-	/* We have successfully torn down the watch request */
+	rbd_assert(rbd_dev->watch_event);
+	rbd_assert(rbd_dev->watch_request);
 
-	ceph_osdc_unregister_linger_request(osdc,
-					    rbd_dev->watch_request->osd_req);
+	rbd_obj_request_end(rbd_dev->watch_request);
 	rbd_obj_request_put(rbd_dev->watch_request);
 	rbd_dev->watch_request = NULL;
 
-out_put:
-	rbd_obj_request_put(obj_request);
-out_cancel:
+	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
+	if (!IS_ERR(obj_request))
+		rbd_obj_request_put(obj_request);
+	else
+		rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
+			 PTR_ERR(obj_request));
+
 	ceph_osdc_cancel_event(rbd_dev->watch_event);
 	rbd_dev->watch_event = NULL;
-
-	return ret;
-}
-
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
-{
-	int ret;
-
-	ret = __rbd_dev_header_unwatch_sync(rbd_dev);
-	if (ret) {
-		rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
-			 ret);
-	}
 }
 
 /*
@@ -3183,102 +3180,129 @@ out:
 	return ret;
 }
 
-static void rbd_request_fn(struct request_queue *q)
-		__releases(q->queue_lock) __acquires(q->queue_lock)
+static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
 {
-	struct rbd_device *rbd_dev = q->queuedata;
-	struct request *rq;
+	struct rbd_img_request *img_request;
+	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
+	u64 length = blk_rq_bytes(rq);
+	bool wr = rq_data_dir(rq) == WRITE;
 	int result;
 
-	while ((rq = blk_fetch_request(q))) {
-		bool write_request = rq_data_dir(rq) == WRITE;
-		struct rbd_img_request *img_request;
-		u64 offset;
-		u64 length;
+	/* Ignore/skip any zero-length requests */
 
-		/* Ignore any non-FS requests that filter through. */
+	if (!length) {
+		dout("%s: zero-length request\n", __func__);
+		result = 0;
+		goto err_rq;
+	}
 
-		if (rq->cmd_type != REQ_TYPE_FS) {
-			dout("%s: non-fs request type %d\n", __func__,
-				(int) rq->cmd_type);
-			__blk_end_request_all(rq, 0);
-			continue;
+	/* Disallow writes to a read-only device */
+
+	if (wr) {
+		if (rbd_dev->mapping.read_only) {
+			result = -EROFS;
+			goto err_rq;
 		}
+		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
+	}
 
-		/* Ignore/skip any zero-length requests */
+	/*
+	 * Quit early if the mapped snapshot no longer exists.  It's
+	 * still possible the snapshot will have disappeared by the
+	 * time our request arrives at the osd, but there's no sense in
+	 * sending it if we already know.
+	 */
+	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+		dout("request for non-existent snapshot");
+		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+		result = -ENXIO;
+		goto err_rq;
+	}
 
-		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-		length = (u64) blk_rq_bytes(rq);
+	if (offset && length > U64_MAX - offset + 1) {
+		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
+			 length);
+		result = -EINVAL;
+		goto err_rq;	/* Shouldn't happen */
+	}
 
-		if (!length) {
-			dout("%s: zero-length request\n", __func__);
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
+	if (offset + length > rbd_dev->mapping.size) {
+		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
+			 length, rbd_dev->mapping.size);
+		result = -EIO;
+		goto err_rq;
+	}
 
-		spin_unlock_irq(q->queue_lock);
+	img_request = rbd_img_request_create(rbd_dev, offset, length, wr);
+	if (!img_request) {
+		result = -ENOMEM;
+		goto err_rq;
+	}
+	img_request->rq = rq;
 
-		/* Disallow writes to a read-only device */
+	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, rq->bio);
+	if (result)
+		goto err_img_request;
 
-		if (write_request) {
-			result = -EROFS;
-			if (rbd_dev->mapping.read_only)
-				goto end_request;
-			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-		}
+	result = rbd_img_request_submit(img_request);
+	if (result)
+		goto err_img_request;
 
-		/*
-		 * Quit early if the mapped snapshot no longer
-		 * exists.  It's still possible the snapshot will
-		 * have disappeared by the time our request arrives
-		 * at the osd, but there's no sense in sending it if
-		 * we already know.
-		 */
-		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
-			dout("request for non-existent snapshot");
-			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-			result = -ENXIO;
-			goto end_request;
-		}
+	return;
 
-		result = -EINVAL;
-		if (offset && length > U64_MAX - offset + 1) {
-			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
-				offset, length);
-			goto end_request;	/* Shouldn't happen */
-		}
+err_img_request:
+	rbd_img_request_put(img_request);
+err_rq:
+	if (result)
+		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
+			 wr ? "write" : "read", length, offset, result);
+	blk_end_request_all(rq, result);
+}
 
-		result = -EIO;
-		if (offset + length > rbd_dev->mapping.size) {
-			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
-				offset, length, rbd_dev->mapping.size);
-			goto end_request;
-		}
+static void rbd_request_workfn(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev =
+	    container_of(work, struct rbd_device, rq_work);
+	struct request *rq, *next;
+	LIST_HEAD(requests);
 
-		result = -ENOMEM;
-		img_request = rbd_img_request_create(rbd_dev, offset, length,
-							write_request);
-		if (!img_request)
-			goto end_request;
+	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
+	list_splice_init(&rbd_dev->rq_queue, &requests);
+	spin_unlock_irq(&rbd_dev->lock);
 
-		img_request->rq = rq;
+	list_for_each_entry_safe(rq, next, &requests, queuelist) {
+		list_del_init(&rq->queuelist);
+		rbd_handle_request(rbd_dev, rq);
+	}
+}
 
-		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
-						rq->bio);
-		if (!result)
-			result = rbd_img_request_submit(img_request);
-		if (result)
-			rbd_img_request_put(img_request);
-end_request:
-		spin_lock_irq(q->queue_lock);
-		if (result < 0) {
-			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
-				write_request ? "write" : "read",
-				length, offset, result);
-
-			__blk_end_request_all(rq, result);
+/*
+ * Called with q->queue_lock held and interrupts disabled, possibly on
+ * the way to schedule().  Do not sleep here!
+ */
+static void rbd_request_fn(struct request_queue *q)
+{
+	struct rbd_device *rbd_dev = q->queuedata;
+	struct request *rq;
+	int queued = 0;
+
+	rbd_assert(rbd_dev);
+
+	while ((rq = blk_fetch_request(q))) {
+		/* Ignore any non-FS requests that filter through. */
+		if (rq->cmd_type != REQ_TYPE_FS) {
+			dout("%s: non-fs request type %d\n", __func__,
+				(int) rq->cmd_type);
+			__blk_end_request_all(rq, 0);
+			continue;
 		}
+
+		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
+		queued++;
 	}
+
+	if (queued)
+		queue_work(rbd_dev->rq_wq, &rbd_dev->rq_work);
 }
 
 /*
@@ -3517,24 +3541,37 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
 	u64 mapping_size;
 	int ret;
 
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 	down_write(&rbd_dev->header_rwsem);
 	mapping_size = rbd_dev->mapping.size;
-	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_header_info(rbd_dev);
-	else
-		ret = rbd_dev_v2_header_info(rbd_dev);
 
-	/* If it's a mapped snapshot, validate its EXISTS flag */
+	ret = rbd_dev_header_info(rbd_dev);
+	if (ret)
+		return ret;
+
+	/*
+	 * If there is a parent, see if it has disappeared due to the
+	 * mapped image getting flattened.
+	 */
+	if (rbd_dev->parent) {
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret)
+			return ret;
+	}
+
+	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
+		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
+			rbd_dev->mapping.size = rbd_dev->header.image_size;
+	} else {
+		/* validate mapped snapshot's EXISTS flag */
+		rbd_exists_validate(rbd_dev);
+	}
 
-	rbd_exists_validate(rbd_dev);
 	up_write(&rbd_dev->header_rwsem);
 
-	if (mapping_size != rbd_dev->mapping.size) {
+	if (mapping_size != rbd_dev->mapping.size)
 		rbd_dev_update_size(rbd_dev);
-	}
 
-	return ret;
+	return 0;
 }
 
 static int rbd_init_disk(struct rbd_device *rbd_dev)
@@ -3696,46 +3733,36 @@ static ssize_t rbd_snap_show(struct device *dev,
 }
 
 /*
- * For an rbd v2 image, shows the pool id, image id, and snapshot id
- * for the parent image.  If there is no parent, simply shows
- * "(no parent image)".
+ * For a v2 image, shows the chain of parent images, separated by empty
+ * lines.  For v1 images or if there is no parent, shows "(no parent
+ * image)".
  */
 static ssize_t rbd_parent_show(struct device *dev,
-			     struct device_attribute *attr,
-			     char *buf)
+			       struct device_attribute *attr,
+			       char *buf)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-	struct rbd_spec *spec = rbd_dev->parent_spec;
-	int count;
-	char *bufp = buf;
+	ssize_t count = 0;
 
-	if (!spec)
+	if (!rbd_dev->parent)
 		return sprintf(buf, "(no parent image)\n");
 
-	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
-			(unsigned long long) spec->pool_id, spec->pool_name);
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
-			spec->image_name ? spec->image_name : "(unknown)");
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
-			(unsigned long long) spec->snap_id, spec->snap_name);
-	if (count < 0)
-		return count;
-	bufp += count;
-
-	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
-	if (count < 0)
-		return count;
-	bufp += count;
+	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
+		struct rbd_spec *spec = rbd_dev->parent_spec;
+
+		count += sprintf(&buf[count], "%s"
+			    "pool_id %llu\npool_name %s\n"
+			    "image_id %s\nimage_name %s\n"
+			    "snap_id %llu\nsnap_name %s\n"
+			    "overlap %llu\n",
+			    !count ? "" : "\n", /* first? */
+			    spec->pool_id, spec->pool_name,
+			    spec->image_id, spec->image_name ?: "(unknown)",
+			    spec->snap_id, spec->snap_name,
+			    rbd_dev->parent_overlap);
+	}
 
-	return (ssize_t) (bufp - buf);
+	return count;
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -3748,9 +3775,9 @@ static ssize_t rbd_image_refresh(struct device *dev,
 
 	ret = rbd_dev_refresh(rbd_dev);
 	if (ret)
-		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
+		return ret;
 
-	return ret < 0 ? ret : size;
+	return size;
 }
 
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
@@ -3822,6 +3849,9 @@ static struct rbd_spec *rbd_spec_alloc(void)
 	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
 	if (!spec)
 		return NULL;
+
+	spec->pool_id = CEPH_NOPOOL;
+	spec->snap_id = CEPH_NOSNAP;
 	kref_init(&spec->kref);
 
 	return spec;
@@ -3848,6 +3878,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->rq_queue);
+	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
 	rbd_dev->flags = 0;
 	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
@@ -4021,7 +4053,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 		goto out_err;
 	}
 
-	snapid = cpu_to_le64(CEPH_NOSNAP);
+	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
 	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_parent",
 				&snapid, sizeof (snapid),
@@ -4059,7 +4091,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 
 	ret = -EIO;
 	if (pool_id > (u64)U32_MAX) {
-		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
+		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
 			(unsigned long long)pool_id, U32_MAX);
 		goto out_err;
 	}
@@ -4083,6 +4115,8 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 		parent_spec->snap_id = snap_id;
 		rbd_dev->parent_spec = parent_spec;
 		parent_spec = NULL;	/* rbd_dev now owns this */
+	} else {
+		kfree(image_id);
 	}
 
 	/*
@@ -4110,8 +4144,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 			 * overlap is zero we just pretend there was
 			 * no parent image.
 			 */
-			rbd_warn(rbd_dev, "ignoring parent of "
-						"clone with overlap 0\n");
+			rbd_warn(rbd_dev, "ignoring parent with overlap 0");
 		}
 	}
 out:
@@ -4279,18 +4312,38 @@ static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
 }
 
 /*
- * When an rbd image has a parent image, it is identified by the
- * pool, image, and snapshot ids (not names).  This function fills
- * in the names for those ids.  (It's OK if we can't figure out the
- * name for an image id, but the pool and snapshot ids should always
- * exist and have names.)  All names in an rbd spec are dynamically
- * allocated.
+ * An image being mapped will have everything but the snap id.
+ */
+static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
+{
+	struct rbd_spec *spec = rbd_dev->spec;
+
+	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
+	rbd_assert(spec->image_id && spec->image_name);
+	rbd_assert(spec->snap_name);
+
+	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
+		u64 snap_id;
+
+		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
+		if (snap_id == CEPH_NOSNAP)
+			return -ENOENT;
+
+		spec->snap_id = snap_id;
+	} else {
+		spec->snap_id = CEPH_NOSNAP;
+	}
+
+	return 0;
+}
+
+/*
+ * A parent image will have all ids but none of the names.
  *
- * When an image being mapped (not a parent) is probed, we have the
- * pool name and pool id, image name and image id, and the snapshot
- * name.  The only thing we're missing is the snapshot id.
+ * All names in an rbd spec are dynamically allocated.  It's OK if we
+ * can't figure out the name for an image id.
  */
-static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
+static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_spec *spec = rbd_dev->spec;
@@ -4299,24 +4352,9 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	const char *snap_name;
 	int ret;
 
-	/*
-	 * An image being mapped will have the pool name (etc.), but
-	 * we need to look up the snapshot id.
-	 */
-	if (spec->pool_name) {
-		if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
-			u64 snap_id;
-
-			snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
-			if (snap_id == CEPH_NOSNAP)
-				return -ENOENT;
-			spec->snap_id = snap_id;
-		} else {
-			spec->snap_id = CEPH_NOSNAP;
-		}
-
-		return 0;
-	}
+	rbd_assert(spec->pool_id != CEPH_NOPOOL);
+	rbd_assert(spec->image_id);
+	rbd_assert(spec->snap_id != CEPH_NOSNAP);
 
 	/* Get the pool name; we have to make our own copy of this */
 
@@ -4335,7 +4373,7 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	if (!image_name)
 		rbd_warn(rbd_dev, "unable to get image name");
 
-	/* Look up the snapshot name, and make a copy */
+	/* Fetch the snapshot name */
 
 	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
 	if (IS_ERR(snap_name)) {
@@ -4348,10 +4386,10 @@ static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
 	spec->snap_name = snap_name;
 
 	return 0;
+
 out_err:
 	kfree(image_name);
 	kfree(pool_name);
-
 	return ret;
 }
 
@@ -4483,43 +4521,22 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 			return ret;
 	}
 
-	/*
-	 * If the image supports layering, get the parent info.  We
-	 * need to probe the first time regardless.  Thereafter we
-	 * only need to if there's a parent, to see if it has
-	 * disappeared due to the mapped image getting flattened.
-	 */
-	if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
-			(first_time || rbd_dev->parent_spec)) {
-		bool warn;
-
-		ret = rbd_dev_v2_parent_info(rbd_dev);
-		if (ret)
-			return ret;
-
-		/*
-		 * Print a warning if this is the initial probe and
-		 * the image has a parent.  Don't print it if the
-		 * image now being probed is itself a parent.  We
-		 * can tell at this point because we won't know its
-		 * pool name yet (just its pool id).
-		 */
-		warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
-		if (first_time && warn)
-			rbd_warn(rbd_dev, "WARNING: kernel layering "
-					"is EXPERIMENTAL!");
-	}
-
-	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
-		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
-			rbd_dev->mapping.size = rbd_dev->header.image_size;
-
 	ret = rbd_dev_v2_snap_context(rbd_dev);
 	dout("rbd_dev_v2_snap_context returned %d\n", ret);
 
 	return ret;
 }
 
+static int rbd_dev_header_info(struct rbd_device *rbd_dev)
+{
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+
+	if (rbd_dev->image_format == 1)
+		return rbd_dev_v1_header_info(rbd_dev);
+
+	return rbd_dev_v2_header_info(rbd_dev);
+}
+
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
 	struct device *dev;
@@ -5066,12 +5083,17 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 	ret = rbd_dev_mapping_set(rbd_dev);
 	if (ret)
 		goto err_out_disk;
+
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
+	rbd_dev->rq_wq = alloc_workqueue(rbd_dev->disk->disk_name, 0, 0);
+	if (!rbd_dev->rq_wq)
+		goto err_out_mapping;
+
 	ret = rbd_bus_add_dev(rbd_dev);
 	if (ret)
-		goto err_out_mapping;
+		goto err_out_workqueue;
 
 	/* Everything's ready.  Announce the disk to the world. */
 
@@ -5083,6 +5105,9 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 
 	return ret;
 
+err_out_workqueue:
+	destroy_workqueue(rbd_dev->rq_wq);
+	rbd_dev->rq_wq = NULL;
 err_out_mapping:
 	rbd_dev_mapping_clear(rbd_dev);
 err_out_disk:
@@ -5155,8 +5180,6 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 	ret = rbd_dev_image_id(rbd_dev);
 	if (ret)
 		return ret;
-	rbd_assert(rbd_dev->spec->image_id);
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 
 	ret = rbd_dev_header_name(rbd_dev);
 	if (ret)
@@ -5168,25 +5191,45 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
 			goto out_header_name;
 	}
 
-	if (rbd_dev->image_format == 1)
-		ret = rbd_dev_v1_header_info(rbd_dev);
-	else
-		ret = rbd_dev_v2_header_info(rbd_dev);
+	ret = rbd_dev_header_info(rbd_dev);
 	if (ret)
 		goto err_out_watch;
 
-	ret = rbd_dev_spec_update(rbd_dev);
+	/*
+	 * If this image is the one being mapped, we have pool name and
+	 * id, image name and id, and snap name - need to fill snap id.
+	 * Otherwise this is a parent image, identified by pool, image
+	 * and snap ids - need to fill in names for those ids.
+	 */
+	if (mapping)
+		ret = rbd_spec_fill_snap_id(rbd_dev);
+	else
+		ret = rbd_spec_fill_names(rbd_dev);
 	if (ret)
 		goto err_out_probe;
 
+	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret)
+			goto err_out_probe;
+
+		/*
+		 * Need to warn users if this image is the one being
+		 * mapped and has a parent.
+		 */
+		if (mapping && rbd_dev->parent_spec)
+			rbd_warn(rbd_dev,
+				 "WARNING: kernel layering is EXPERIMENTAL!");
+	}
+
 	ret = rbd_dev_probe_parent(rbd_dev);
 	if (ret)
 		goto err_out_probe;
 
 	dout("discovered format %u image, header name is %s\n",
 		rbd_dev->image_format, rbd_dev->header_name);
-
 	return 0;
+
 err_out_probe:
 	rbd_dev_unprobe(rbd_dev);
 err_out_watch:
@@ -5199,9 +5242,6 @@ err_out_format:
 	rbd_dev->image_format = 0;
 	kfree(rbd_dev->spec->image_id);
 	rbd_dev->spec->image_id = NULL;
-
-	dout("probe failed, returning %d\n", ret);
-
 	return ret;
 }
 
@@ -5243,7 +5283,7 @@ static ssize_t do_rbd_add(struct bus_type *bus,
 	/* The ceph file layout needs to fit pool id in 32 bits */
 
 	if (spec->pool_id > (u64)U32_MAX) {
-		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
+		rbd_warn(NULL, "pool id too large (%llu > %u)",
 				(unsigned long long)spec->pool_id, U32_MAX);
 		rc = -EIO;
 		goto err_out_client;
@@ -5314,6 +5354,7 @@ static void rbd_dev_device_release(struct device *dev)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
+	destroy_workqueue(rbd_dev->rq_wq);
 	rbd_free_disk(rbd_dev);
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	rbd_dev_mapping_clear(rbd_dev);