summary refs log tree commit diff
path: root/drivers/block
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-02-19 19:20:56 -0600
committerAlex Elder <elder@inktank.com>2013-02-19 19:21:08 -0600
commit4c7a08c83a7842e88838dde16684d6bafffdfaf0 (patch)
treec5fe0057b2ff9f98a64ceb6fa076e75da8225cdd /drivers/block
parent19f949f52599ba7c3f67a5897ac6be14bfcb1200 (diff)
parent903bb32e890237ca43ab847e561e5377cfe0fdb3 (diff)
downloadlinux-4c7a08c83a7842e88838dde16684d6bafffdfaf0.tar.gz
Merge branch 'testing' of github.com:ceph/ceph-client into into linux-3.8-ceph
Diffstat (limited to 'drivers/block')
-rw-r--r--drivers/block/rbd.c1773
1 files changed, 1087 insertions, 686 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 89576a0b3f2e..b0eea3eaee93 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -52,9 +52,12 @@
 #define	SECTOR_SHIFT	9
 #define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
 
-/* It might be useful to have this defined elsewhere too */
+/* It might be useful to have these defined elsewhere */
 
-#define	U64_MAX	((u64) (~0ULL))
+#define	U8_MAX	((u8)	(~0U))
+#define	U16_MAX	((u16)	(~0U))
+#define	U32_MAX	((u32)	(~0U))
+#define	U64_MAX	((u64)	(~0ULL))
 
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
@@ -66,7 +69,6 @@
 			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
 
 #define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
-#define RBD_MAX_OPT_LEN		1024
 
 #define RBD_SNAP_HEAD_NAME	"-"
 
@@ -93,8 +95,6 @@
 #define DEV_NAME_LEN		32
 #define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
 
-#define RBD_READ_ONLY_DEFAULT		false
-
 /*
  * block device image metadata (in-memory version)
  */
@@ -119,16 +119,33 @@ struct rbd_image_header {
  * An rbd image specification.
  *
  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
- * identify an image.
+ * identify an image.  Each rbd_dev structure includes a pointer to
+ * an rbd_spec structure that encapsulates this identity.
+ *
+ * Each of the id's in an rbd_spec has an associated name.  For a
+ * user-mapped image, the names are supplied and the id's associated
+ * with them are looked up.  For a layered image, a parent image is
+ * defined by the tuple, and the names are looked up.
+ *
+ * An rbd_dev structure contains a parent_spec pointer which is
+ * non-null if the image it represents is a child in a layered
+ * image.  This pointer will refer to the rbd_spec structure used
+ * by the parent rbd_dev for its own identity (i.e., the structure
+ * is shared between the parent and child).
+ *
+ * Since these structures are populated once, during the discovery
+ * phase of image construction, they are effectively immutable so
+ * we make no effort to synchronize access to them.
+ *
+ * Note that code herein does not assume the image name is known (it
+ * could be a null pointer).
  */
 struct rbd_spec {
 	u64		pool_id;
 	char		*pool_name;
 
 	char		*image_id;
-	size_t		image_id_len;
 	char		*image_name;
-	size_t		image_name_len;
 
 	u64		snap_id;
 	char		*snap_name;
@@ -136,10 +153,6 @@ struct rbd_spec {
 	struct kref	kref;
 };
 
-struct rbd_options {
-	bool	read_only;
-};
-
 /*
  * an instance of the client.  multiple devices may share an rbd client.
  */
@@ -149,37 +162,76 @@ struct rbd_client {
 	struct list_head	node;
 };
 
-/*
- * a request completion status
- */
-struct rbd_req_status {
-	int done;
-	int rc;
-	u64 bytes;
+struct rbd_img_request;
+typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
+
+#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
+
+struct rbd_obj_request;
+typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
+
+enum obj_request_type {
+	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
 };
 
-/*
- * a collection of requests
- */
-struct rbd_req_coll {
-	int			total;
-	int			num_done;
+struct rbd_obj_request {
+	const char		*object_name;
+	u64			offset;		/* object start byte */
+	u64			length;		/* bytes from offset */
+
+	struct rbd_img_request	*img_request;
+	struct list_head	links;		/* img_request->obj_requests */
+	u32			which;		/* posn image request list */
+
+	enum obj_request_type	type;
+	union {
+		struct bio	*bio_list;
+		struct {
+			struct page	**pages;
+			u32		page_count;
+		};
+	};
+
+	struct ceph_osd_request	*osd_req;
+
+	u64			xferred;	/* bytes transferred */
+	u64			version;
+	s32			result;
+	atomic_t		done;
+
+	rbd_obj_callback_t	callback;
+	struct completion	completion;
+
 	struct kref		kref;
-	struct rbd_req_status	status[0];
 };
 
-/*
- * a single io request
- */
-struct rbd_request {
-	struct request		*rq;		/* blk layer request */
-	struct bio		*bio;		/* cloned bio */
-	struct page		**pages;	/* list of used pages */
-	u64			len;
-	int			coll_index;
-	struct rbd_req_coll	*coll;
+struct rbd_img_request {
+	struct request		*rq;
+	struct rbd_device	*rbd_dev;
+	u64			offset;	/* starting image byte offset */
+	u64			length;	/* byte count from offset */
+	bool			write_request;	/* false for read */
+	union {
+		struct ceph_snap_context *snapc;	/* for writes */
+		u64		snap_id;		/* for reads */
+	};
+	spinlock_t		completion_lock;/* protects next_completion */
+	u32			next_completion;
+	rbd_img_callback_t	callback;
+
+	u32			obj_request_count;
+	struct list_head	obj_requests;	/* rbd_obj_request structs */
+
+	struct kref		kref;
 };
 
+#define for_each_obj_request(ireq, oreq) \
+	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
+#define for_each_obj_request_from(ireq, oreq) \
+	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
+#define for_each_obj_request_safe(ireq, oreq, n) \
+	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
+
 struct rbd_snap {
 	struct	device		dev;
 	const char		*name;
@@ -209,16 +261,18 @@ struct rbd_device {
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-	spinlock_t		lock;		/* queue lock */
+	spinlock_t		lock;		/* queue, flags, open_count */
 
 	struct rbd_image_header	header;
-	bool                    exists;
+	unsigned long		flags;		/* possibly lock protected */
 	struct rbd_spec		*spec;
 
 	char			*header_name;
 
+	struct ceph_file_layout	layout;
+
 	struct ceph_osd_event   *watch_event;
-	struct ceph_osd_request *watch_request;
+	struct rbd_obj_request	*watch_request;
 
 	struct rbd_spec		*parent_spec;
 	u64			parent_overlap;
@@ -235,7 +289,19 @@ struct rbd_device {
 
 	/* sysfs related */
 	struct device		dev;
-	unsigned long		open_count;
+	unsigned long		open_count;	/* protected by lock */
+};
+
+/*
+ * Flag bits for rbd_dev->flags.  If atomicity is required,
+ * rbd_dev->lock is used to protect access.
+ *
+ * Currently, only the "removing" flag (which is coupled with the
+ * "open_count" field) requires atomic access.
+ */
+enum rbd_dev_flags {
+	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
+	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
 };
 
 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
@@ -277,6 +343,33 @@ static struct device rbd_root_dev = {
 	.release =      rbd_root_dev_release,
 };
 
+static __printf(2, 3)
+void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
+{
+	struct va_format vaf;
+	va_list args;
+
+	va_start(args, fmt);
+	vaf.fmt = fmt;
+	vaf.va = &args;
+
+	if (!rbd_dev)
+		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
+	else if (rbd_dev->disk)
+		printk(KERN_WARNING "%s: %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
+	else if (rbd_dev->spec && rbd_dev->spec->image_name)
+		printk(KERN_WARNING "%s: image %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
+	else if (rbd_dev->spec && rbd_dev->spec->image_id)
+		printk(KERN_WARNING "%s: id %s: %pV\n",
+			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
+	else	/* punt */
+		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
+			RBD_DRV_NAME, rbd_dev, &vaf);
+	va_end(args);
+}
+
 #ifdef RBD_DEBUG
 #define rbd_assert(expr)						\
 		if (unlikely(!(expr))) {				\
@@ -296,14 +389,23 @@ static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+	bool removing = false;
 
 	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
 		return -EROFS;
 
+	spin_lock_irq(&rbd_dev->lock);
+	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
+		removing = true;
+	else
+		rbd_dev->open_count++;
+	spin_unlock_irq(&rbd_dev->lock);
+	if (removing)
+		return -ENOENT;
+
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 	(void) get_device(&rbd_dev->dev);
 	set_device_ro(bdev, rbd_dev->mapping.read_only);
-	rbd_dev->open_count++;
 	mutex_unlock(&ctl_mutex);
 
 	return 0;
@@ -312,10 +414,14 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
 static int rbd_release(struct gendisk *disk, fmode_t mode)
 {
 	struct rbd_device *rbd_dev = disk->private_data;
+	unsigned long open_count_before;
+
+	spin_lock_irq(&rbd_dev->lock);
+	open_count_before = rbd_dev->open_count--;
+	spin_unlock_irq(&rbd_dev->lock);
+	rbd_assert(open_count_before > 0);
 
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-	rbd_assert(rbd_dev->open_count > 0);
-	rbd_dev->open_count--;
 	put_device(&rbd_dev->dev);
 	mutex_unlock(&ctl_mutex);
 
@@ -426,6 +532,12 @@ static match_table_t rbd_opts_tokens = {
 	{-1, NULL}
 };
 
+struct rbd_options {
+	bool	read_only;
+};
+
+#define RBD_READ_ONLY_DEFAULT	false
+
 static int parse_rbd_opts_token(char *c, void *private)
 {
 	struct rbd_options *rbd_opts = private;
@@ -512,18 +624,6 @@ static void rbd_put_client(struct rbd_client *rbdc)
 		kref_put(&rbdc->kref, rbd_client_release);
 }
 
-/*
- * Destroy requests collection
- */
-static void rbd_coll_release(struct kref *kref)
-{
-	struct rbd_req_coll *coll =
-		container_of(kref, struct rbd_req_coll, kref);
-
-	dout("rbd_coll_release %p\n", coll);
-	kfree(coll);
-}
-
 static bool rbd_image_format_valid(u32 image_format)
 {
 	return image_format == 1 || image_format == 2;
@@ -707,7 +807,8 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
 			goto done;
 		rbd_dev->mapping.read_only = true;
 	}
-	rbd_dev->exists = true;
+	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+
 done:
 	return ret;
 }
@@ -724,7 +825,7 @@ static void rbd_header_free(struct rbd_image_header *header)
 	header->snapc = NULL;
 }
 
-static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
+static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
 	char *name;
 	u64 segment;
@@ -767,23 +868,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev,
 	return length;
 }
 
-static int rbd_get_num_segments(struct rbd_image_header *header,
-				u64 ofs, u64 len)
-{
-	u64 start_seg;
-	u64 end_seg;
-
-	if (!len)
-		return 0;
-	if (len - 1 > U64_MAX - ofs)
-		return -ERANGE;
-
-	start_seg = ofs >> header->obj_order;
-	end_seg = (ofs + len - 1) >> header->obj_order;
-
-	return end_seg - start_seg + 1;
-}
-
 /*
  * returns the size of an object in the image
  */
@@ -949,8 +1033,10 @@ static struct bio *bio_chain_clone_range(struct bio **bio_src,
 		unsigned int bi_size;
 		struct bio *bio;
 
-		if (!bi)
+		if (!bi) {
+			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
 			goto out_err;	/* EINVAL; ran out of bio's */
+		}
 		bi_size = min_t(unsigned int, bi->bi_size - off, len);
 		bio = bio_clone_range(bi, off, bi_size, gfpmask);
 		if (!bio)
@@ -976,399 +1062,665 @@ out_err:
 	return NULL;
 }
 
-/*
- * helpers for osd request op vectors.
- */
-static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
-					int opcode, u32 payload_len)
+static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
 {
-	struct ceph_osd_req_op *ops;
+	kref_get(&obj_request->kref);
+}
 
-	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
-	if (!ops)
+static void rbd_obj_request_destroy(struct kref *kref);
+static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request != NULL);
+	kref_put(&obj_request->kref, rbd_obj_request_destroy);
+}
+
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+	kref_get(&img_request->kref);
+}
+
+static void rbd_img_request_destroy(struct kref *kref);
+static void rbd_img_request_put(struct rbd_img_request *img_request)
+{
+	rbd_assert(img_request != NULL);
+	kref_put(&img_request->kref, rbd_img_request_destroy);
+}
+
+static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
+					struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request->img_request == NULL);
+
+	rbd_obj_request_get(obj_request);
+	obj_request->img_request = img_request;
+	obj_request->which = img_request->obj_request_count;
+	rbd_assert(obj_request->which != BAD_WHICH);
+	img_request->obj_request_count++;
+	list_add_tail(&obj_request->links, &img_request->obj_requests);
+}
+
+static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
+					struct rbd_obj_request *obj_request)
+{
+	rbd_assert(obj_request->which != BAD_WHICH);
+
+	list_del(&obj_request->links);
+	rbd_assert(img_request->obj_request_count > 0);
+	img_request->obj_request_count--;
+	rbd_assert(obj_request->which == img_request->obj_request_count);
+	obj_request->which = BAD_WHICH;
+	rbd_assert(obj_request->img_request == img_request);
+	obj_request->img_request = NULL;
+	obj_request->callback = NULL;
+	rbd_obj_request_put(obj_request);
+}
+
+static bool obj_request_type_valid(enum obj_request_type type)
+{
+	switch (type) {
+	case OBJ_REQUEST_NODATA:
+	case OBJ_REQUEST_BIO:
+	case OBJ_REQUEST_PAGES:
+		return true;
+	default:
+		return false;
+	}
+}
+
+struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
+{
+	struct ceph_osd_req_op *op;
+	va_list args;
+	size_t size;
+
+	op = kzalloc(sizeof (*op), GFP_NOIO);
+	if (!op)
 		return NULL;
+	op->op = opcode;
+	va_start(args, opcode);
+	switch (opcode) {
+	case CEPH_OSD_OP_READ:
+	case CEPH_OSD_OP_WRITE:
+		/* rbd_osd_req_op_create(READ, offset, length) */
+		/* rbd_osd_req_op_create(WRITE, offset, length) */
+		op->extent.offset = va_arg(args, u64);
+		op->extent.length = va_arg(args, u64);
+		if (opcode == CEPH_OSD_OP_WRITE)
+			op->payload_len = op->extent.length;
+		break;
+	case CEPH_OSD_OP_STAT:
+		break;
+	case CEPH_OSD_OP_CALL:
+		/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
+		op->cls.class_name = va_arg(args, char *);
+		size = strlen(op->cls.class_name);
+		rbd_assert(size <= (size_t) U8_MAX);
+		op->cls.class_len = size;
+		op->payload_len = size;
+
+		op->cls.method_name = va_arg(args, char *);
+		size = strlen(op->cls.method_name);
+		rbd_assert(size <= (size_t) U8_MAX);
+		op->cls.method_len = size;
+		op->payload_len += size;
+
+		op->cls.argc = 0;
+		op->cls.indata = va_arg(args, void *);
+		size = va_arg(args, size_t);
+		rbd_assert(size <= (size_t) U32_MAX);
+		op->cls.indata_len = (u32) size;
+		op->payload_len += size;
+		break;
+	case CEPH_OSD_OP_NOTIFY_ACK:
+	case CEPH_OSD_OP_WATCH:
+		/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
+		/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
+		op->watch.cookie = va_arg(args, u64);
+		op->watch.ver = va_arg(args, u64);
+		op->watch.ver = cpu_to_le64(op->watch.ver);
+		if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
+			op->watch.flag = (u8) 1;
+		break;
+	default:
+		rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
+		kfree(op);
+		op = NULL;
+		break;
+	}
+	va_end(args);
 
-	ops[0].op = opcode;
+	return op;
+}
 
-	/*
-	 * op extent offset and length will be set later on
-	 * in calc_raw_layout()
-	 */
-	ops[0].payload_len = payload_len;
+static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
+{
+	kfree(op);
+}
 
-	return ops;
+static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
+				struct rbd_obj_request *obj_request)
+{
+	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
 }
 
-static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
+static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
-	kfree(ops);
+	if (img_request->callback)
+		img_request->callback(img_request);
+	else
+		rbd_img_request_put(img_request);
 }
 
-static void rbd_coll_end_req_index(struct request *rq,
-				   struct rbd_req_coll *coll,
-				   int index,
-				   int ret, u64 len)
+/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
+
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
 {
-	struct request_queue *q;
-	int min, max, i;
+	return wait_for_completion_interruptible(&obj_request->completion);
+}
 
-	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
-	     coll, index, ret, (unsigned long long) len);
+static void obj_request_done_init(struct rbd_obj_request *obj_request)
+{
+	atomic_set(&obj_request->done, 0);
+	smp_wmb();
+}
 
-	if (!rq)
-		return;
+static void obj_request_done_set(struct rbd_obj_request *obj_request)
+{
+	atomic_set(&obj_request->done, 1);
+	smp_wmb();
+}
 
-	if (!coll) {
-		blk_end_request(rq, ret, len);
-		return;
-	}
+static bool obj_request_done_test(struct rbd_obj_request *obj_request)
+{
+	smp_rmb();
+	return atomic_read(&obj_request->done) != 0;
+}
+
+static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request,
+				struct ceph_osd_op *op)
+{
+	obj_request_done_set(obj_request);
+}
 
-	q = rq->q;
-
-	spin_lock_irq(q->queue_lock);
-	coll->status[index].done = 1;
-	coll->status[index].rc = ret;
-	coll->status[index].bytes = len;
-	max = min = coll->num_done;
-	while (max < coll->total && coll->status[max].done)
-		max++;
-
-	for (i = min; i<max; i++) {
-		__blk_end_request(rq, coll->status[i].rc,
-				  coll->status[i].bytes);
-		coll->num_done++;
-		kref_put(&coll->kref, rbd_coll_release);
+static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
+{
+	if (obj_request->callback)
+		obj_request->callback(obj_request);
+	else
+		complete_all(&obj_request->completion);
+}
+
+static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
+				struct ceph_osd_op *op)
+{
+	u64 xferred;
+
+	/*
+	 * We support a 64-bit length, but ultimately it has to be
+	 * passed to blk_end_request(), which takes an unsigned int.
+	 */
+	xferred = le64_to_cpu(op->extent.length);
+	rbd_assert(xferred < (u64) UINT_MAX);
+	if (obj_request->result == (s32) -ENOENT) {
+		zero_bio_chain(obj_request->bio_list, 0);
+		obj_request->result = 0;
+	} else if (xferred < obj_request->length && !obj_request->result) {
+		zero_bio_chain(obj_request->bio_list, xferred);
+		xferred = obj_request->length;
 	}
-	spin_unlock_irq(q->queue_lock);
+	obj_request->xferred = xferred;
+	obj_request_done_set(obj_request);
 }
 
-static void rbd_coll_end_req(struct rbd_request *req,
-			     int ret, u64 len)
+static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
+				struct ceph_osd_op *op)
 {
-	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
+	obj_request->xferred = le64_to_cpu(op->extent.length);
+	obj_request_done_set(obj_request);
 }
 
 /*
- * Send ceph osd request
+ * For a simple stat call there's nothing to do.  We'll do more if
+ * this is part of a write sequence for a layered image.
  */
-static int rbd_do_request(struct request *rq,
-			  struct rbd_device *rbd_dev,
-			  struct ceph_snap_context *snapc,
-			  u64 snapid,
-			  const char *object_name, u64 ofs, u64 len,
-			  struct bio *bio,
-			  struct page **pages,
-			  int num_pages,
-			  int flags,
-			  struct ceph_osd_req_op *ops,
-			  struct rbd_req_coll *coll,
-			  int coll_index,
-			  void (*rbd_cb)(struct ceph_osd_request *req,
-					 struct ceph_msg *msg),
-			  struct ceph_osd_request **linger_req,
-			  u64 *ver)
-{
-	struct ceph_osd_request *req;
-	struct ceph_file_layout *layout;
-	int ret;
-	u64 bno;
-	struct timespec mtime = CURRENT_TIME;
-	struct rbd_request *req_data;
-	struct ceph_osd_request_head *reqhead;
-	struct ceph_osd_client *osdc;
+static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request,
+				struct ceph_osd_op *op)
+{
+	obj_request_done_set(obj_request);
+}
 
-	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
-	if (!req_data) {
-		if (coll)
-			rbd_coll_end_req_index(rq, coll, coll_index,
-					       -ENOMEM, len);
-		return -ENOMEM;
+static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
+				struct ceph_msg *msg)
+{
+	struct rbd_obj_request *obj_request = osd_req->r_priv;
+	struct ceph_osd_reply_head *reply_head;
+	struct ceph_osd_op *op;
+	u32 num_ops;
+	u16 opcode;
+
+	rbd_assert(osd_req == obj_request->osd_req);
+	rbd_assert(!!obj_request->img_request ^
+				(obj_request->which == BAD_WHICH));
+
+	obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
+	reply_head = msg->front.iov_base;
+	obj_request->result = (s32) le32_to_cpu(reply_head->result);
+	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
+
+	num_ops = le32_to_cpu(reply_head->num_ops);
+	WARN_ON(num_ops != 1);	/* For now */
+
+	op = &reply_head->ops[0];
+	opcode = le16_to_cpu(op->op);
+	switch (opcode) {
+	case CEPH_OSD_OP_READ:
+		rbd_osd_read_callback(obj_request, op);
+		break;
+	case CEPH_OSD_OP_WRITE:
+		rbd_osd_write_callback(obj_request, op);
+		break;
+	case CEPH_OSD_OP_STAT:
+		rbd_osd_stat_callback(obj_request, op);
+		break;
+	case CEPH_OSD_OP_CALL:
+	case CEPH_OSD_OP_NOTIFY_ACK:
+	case CEPH_OSD_OP_WATCH:
+		rbd_osd_trivial_callback(obj_request, op);
+		break;
+	default:
+		rbd_warn(NULL, "%s: unsupported op %hu\n",
+			obj_request->object_name, (unsigned short) opcode);
+		break;
 	}
 
-	if (coll) {
-		req_data->coll = coll;
-		req_data->coll_index = coll_index;
+	if (obj_request_done_test(obj_request))
+		rbd_obj_request_complete(obj_request);
+}
+
+static struct ceph_osd_request *rbd_osd_req_create(
+					struct rbd_device *rbd_dev,
+					bool write_request,
+					struct rbd_obj_request *obj_request,
+					struct ceph_osd_req_op *op)
+{
+	struct rbd_img_request *img_request = obj_request->img_request;
+	struct ceph_snap_context *snapc = NULL;
+	struct ceph_osd_client *osdc;
+	struct ceph_osd_request *osd_req;
+	struct timespec now;
+	struct timespec *mtime;
+	u64 snap_id = CEPH_NOSNAP;
+	u64 offset = obj_request->offset;
+	u64 length = obj_request->length;
+
+	if (img_request) {
+		rbd_assert(img_request->write_request == write_request);
+		if (img_request->write_request)
+			snapc = img_request->snapc;
+		else
+			snap_id = img_request->snap_id;
 	}
 
-	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
-		object_name, (unsigned long long) ofs,
-		(unsigned long long) len, coll, coll_index);
+	/* Allocate and initialize the request, for the single op */
 
 	osdc = &rbd_dev->rbd_client->client->osdc;
-	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
-					false, GFP_NOIO, pages, bio);
-	if (!req) {
-		ret = -ENOMEM;
-		goto done_pages;
+	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
+	if (!osd_req)
+		return NULL;	/* ENOMEM */
+
+	rbd_assert(obj_request_type_valid(obj_request->type));
+	switch (obj_request->type) {
+	case OBJ_REQUEST_NODATA:
+		break;		/* Nothing to do */
+	case OBJ_REQUEST_BIO:
+		rbd_assert(obj_request->bio_list != NULL);
+		osd_req->r_bio = obj_request->bio_list;
+		break;
+	case OBJ_REQUEST_PAGES:
+		osd_req->r_pages = obj_request->pages;
+		osd_req->r_num_pages = obj_request->page_count;
+		osd_req->r_page_alignment = offset & ~PAGE_MASK;
+		break;
 	}
 
-	req->r_callback = rbd_cb;
+	if (write_request) {
+		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+		now = CURRENT_TIME;
+		mtime = &now;
+	} else {
+		osd_req->r_flags = CEPH_OSD_FLAG_READ;
+		mtime = NULL;	/* not needed for reads */
+		offset = 0;	/* These are not used... */
+		length = 0;	/* ...for osd read requests */
+	}
 
-	req_data->rq = rq;
-	req_data->bio = bio;
-	req_data->pages = pages;
-	req_data->len = len;
+	osd_req->r_callback = rbd_osd_req_callback;
+	osd_req->r_priv = obj_request;
 
-	req->r_priv = req_data;
+	osd_req->r_oid_len = strlen(obj_request->object_name);
+	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
+	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
 
-	reqhead = req->r_request->front.iov_base;
-	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
+	osd_req->r_file_layout = rbd_dev->layout;	/* struct */
 
-	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
-	req->r_oid_len = strlen(req->r_oid);
+	/* osd_req will get its own reference to snapc (if non-null) */
 
-	layout = &req->r_file_layout;
-	memset(layout, 0, sizeof(*layout));
-	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	layout->fl_stripe_count = cpu_to_le32(1);
-	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
-	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
-				   req, ops);
-	rbd_assert(ret == 0);
+	ceph_osdc_build_request(osd_req, offset, length, 1, op,
+				snapc, snap_id, mtime);
 
-	ceph_osdc_build_request(req, ofs, &len,
-				ops,
-				snapc,
-				&mtime,
-				req->r_oid, req->r_oid_len);
+	return osd_req;
+}
 
-	if (linger_req) {
-		ceph_osdc_set_request_linger(osdc, req);
-		*linger_req = req;
-	}
+static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
+{
+	ceph_osdc_put_request(osd_req);
+}
 
-	ret = ceph_osdc_start_request(osdc, req, false);
-	if (ret < 0)
-		goto done_err;
-
-	if (!rbd_cb) {
-		ret = ceph_osdc_wait_request(osdc, req);
-		if (ver)
-			*ver = le64_to_cpu(req->r_reassert_version.version);
-		dout("reassert_ver=%llu\n",
-			(unsigned long long)
-				le64_to_cpu(req->r_reassert_version.version));
-		ceph_osdc_put_request(req);
+/* object_name is assumed to be a non-null pointer and NUL-terminated */
+
+static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
+						u64 offset, u64 length,
+						enum obj_request_type type)
+{
+	struct rbd_obj_request *obj_request;
+	size_t size;
+	char *name;
+
+	rbd_assert(obj_request_type_valid(type));
+
+	size = strlen(object_name) + 1;
+	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
+	if (!obj_request)
+		return NULL;
+
+	name = (char *)(obj_request + 1);
+	obj_request->object_name = memcpy(name, object_name, size);
+	obj_request->offset = offset;
+	obj_request->length = length;
+	obj_request->which = BAD_WHICH;
+	obj_request->type = type;
+	INIT_LIST_HEAD(&obj_request->links);
+	obj_request_done_init(obj_request);
+	init_completion(&obj_request->completion);
+	kref_init(&obj_request->kref);
+
+	return obj_request;
+}
+
+static void rbd_obj_request_destroy(struct kref *kref)
+{
+	struct rbd_obj_request *obj_request;
+
+	obj_request = container_of(kref, struct rbd_obj_request, kref);
+
+	rbd_assert(obj_request->img_request == NULL);
+	rbd_assert(obj_request->which == BAD_WHICH);
+
+	if (obj_request->osd_req)
+		rbd_osd_req_destroy(obj_request->osd_req);
+
+	rbd_assert(obj_request_type_valid(obj_request->type));
+	switch (obj_request->type) {
+	case OBJ_REQUEST_NODATA:
+		break;		/* Nothing to do */
+	case OBJ_REQUEST_BIO:
+		if (obj_request->bio_list)
+			bio_chain_put(obj_request->bio_list);
+		break;
+	case OBJ_REQUEST_PAGES:
+		if (obj_request->pages)
+			ceph_release_page_vector(obj_request->pages,
+						obj_request->page_count);
+		break;
 	}
-	return ret;
 
-done_err:
-	bio_chain_put(req_data->bio);
-	ceph_osdc_put_request(req);
-done_pages:
-	rbd_coll_end_req(req_data, ret, len);
-	kfree(req_data);
-	return ret;
+	kfree(obj_request);
 }
 
 /*
- * Ceph osd op callback
+ * Caller is responsible for filling in the list of object requests
+ * that comprises the image request, and the Linux request pointer
+ * (if there is one).
  */
-static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
+					u64 offset, u64 length,
+					bool write_request)
 {
-	struct rbd_request *req_data = req->r_priv;
-	struct ceph_osd_reply_head *replyhead;
-	struct ceph_osd_op *op;
-	__s32 rc;
-	u64 bytes;
-	int read_op;
-
-	/* parse reply */
-	replyhead = msg->front.iov_base;
-	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
-	op = (void *)(replyhead + 1);
-	rc = le32_to_cpu(replyhead->result);
-	bytes = le64_to_cpu(op->extent.length);
-	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
-
-	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
-		(unsigned long long) bytes, read_op, (int) rc);
-
-	if (rc == -ENOENT && read_op) {
-		zero_bio_chain(req_data->bio, 0);
-		rc = 0;
-	} else if (rc == 0 && read_op && bytes < req_data->len) {
-		zero_bio_chain(req_data->bio, bytes);
-		bytes = req_data->len;
-	}
+	struct rbd_img_request *img_request;
+	struct ceph_snap_context *snapc = NULL;
 
-	rbd_coll_end_req(req_data, rc, bytes);
+	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
+	if (!img_request)
+		return NULL;
 
-	if (req_data->bio)
-		bio_chain_put(req_data->bio);
+	if (write_request) {
+		down_read(&rbd_dev->header_rwsem);
+		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+		up_read(&rbd_dev->header_rwsem);
+		if (WARN_ON(!snapc)) {
+			kfree(img_request);
+			return NULL;	/* Shouldn't happen */
+		}
+	}
 
-	ceph_osdc_put_request(req);
-	kfree(req_data);
+	img_request->rq = NULL;
+	img_request->rbd_dev = rbd_dev;
+	img_request->offset = offset;
+	img_request->length = length;
+	img_request->write_request = write_request;
+	if (write_request)
+		img_request->snapc = snapc;
+	else
+		img_request->snap_id = rbd_dev->spec->snap_id;
+	spin_lock_init(&img_request->completion_lock);
+	img_request->next_completion = 0;
+	img_request->callback = NULL;
+	img_request->obj_request_count = 0;
+	INIT_LIST_HEAD(&img_request->obj_requests);
+	kref_init(&img_request->kref);
+
+	rbd_img_request_get(img_request);	/* Avoid a warning */
+	rbd_img_request_put(img_request);	/* TEMPORARY */
+
+	return img_request;
 }
 
-static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+static void rbd_img_request_destroy(struct kref *kref)
 {
-	ceph_osdc_put_request(req);
+	struct rbd_img_request *img_request;
+	struct rbd_obj_request *obj_request;
+	struct rbd_obj_request *next_obj_request;
+
+	img_request = container_of(kref, struct rbd_img_request, kref);
+
+	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+		rbd_img_obj_request_del(img_request, obj_request);
+	rbd_assert(img_request->obj_request_count == 0);
+
+	if (img_request->write_request)
+		ceph_put_snap_context(img_request->snapc);
+
+	kfree(img_request);
 }
 
-/*
- * Do a synchronous ceph osd operation
- */
-static int rbd_req_sync_op(struct rbd_device *rbd_dev,
-			   struct ceph_snap_context *snapc,
-			   u64 snapid,
-			   int flags,
-			   struct ceph_osd_req_op *ops,
-			   const char *object_name,
-			   u64 ofs, u64 inbound_size,
-			   char *inbound,
-			   struct ceph_osd_request **linger_req,
-			   u64 *ver)
+static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
+					struct bio *bio_list)
 {
-	int ret;
-	struct page **pages;
-	int num_pages;
-
-	rbd_assert(ops != NULL);
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
+	struct rbd_obj_request *obj_request = NULL;
+	struct rbd_obj_request *next_obj_request;
+	unsigned int bio_offset;
+	u64 image_offset;
+	u64 resid;
+	u16 opcode;
+
+	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
+					      : CEPH_OSD_OP_READ;
+	bio_offset = 0;
+	image_offset = img_request->offset;
+	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
+	resid = img_request->length;
+	while (resid) {
+		const char *object_name;
+		unsigned int clone_size;
+		struct ceph_osd_req_op *op;
+		u64 offset;
+		u64 length;
+
+		object_name = rbd_segment_name(rbd_dev, image_offset);
+		if (!object_name)
+			goto out_unwind;
+		offset = rbd_segment_offset(rbd_dev, image_offset);
+		length = rbd_segment_length(rbd_dev, image_offset, resid);
+		obj_request = rbd_obj_request_create(object_name,
+						offset, length,
+						OBJ_REQUEST_BIO);
+		kfree(object_name);	/* object request has its own copy */
+		if (!obj_request)
+			goto out_unwind;
+
+		rbd_assert(length <= (u64) UINT_MAX);
+		clone_size = (unsigned int) length;
+		obj_request->bio_list = bio_chain_clone_range(&bio_list,
+						&bio_offset, clone_size,
+						GFP_ATOMIC);
+		if (!obj_request->bio_list)
+			goto out_partial;
 
-	num_pages = calc_pages_for(ofs, inbound_size);
-	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-	if (IS_ERR(pages))
-		return PTR_ERR(pages);
+		/*
+		 * Build up the op to use in building the osd
+		 * request.  Note that the contents of the op are
+		 * copied by rbd_osd_req_create().
+		 */
+		op = rbd_osd_req_op_create(opcode, offset, length);
+		if (!op)
+			goto out_partial;
+		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
+						img_request->write_request,
+						obj_request, op);
+		rbd_osd_req_op_destroy(op);
+		if (!obj_request->osd_req)
+			goto out_partial;
+		/* status and version are initially zero-filled */
+
+		rbd_img_obj_request_add(img_request, obj_request);
+
+		image_offset += length;
+		resid -= length;
+	}
 
-	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
-			  object_name, ofs, inbound_size, NULL,
-			  pages, num_pages,
-			  flags,
-			  ops,
-			  NULL, 0,
-			  NULL,
-			  linger_req, ver);
-	if (ret < 0)
-		goto done;
+	return 0;
 
-	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
-		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
+out_partial:
+	rbd_obj_request_put(obj_request);
+out_unwind:
+	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
+		rbd_obj_request_put(obj_request);
 
-done:
-	ceph_release_page_vector(pages, num_pages);
-	return ret;
+	return -ENOMEM;
 }
 
-/*
- * Do an asynchronous ceph osd operation
- */
-static int rbd_do_op(struct request *rq,
-		     struct rbd_device *rbd_dev,
-		     struct ceph_snap_context *snapc,
-		     u64 ofs, u64 len,
-		     struct bio *bio,
-		     struct rbd_req_coll *coll,
-		     int coll_index)
-{
-	char *seg_name;
-	u64 seg_ofs;
-	u64 seg_len;
-	int ret;
-	struct ceph_osd_req_op *ops;
-	u32 payload_len;
-	int opcode;
-	int flags;
-	u64 snapid;
-
-	seg_name = rbd_segment_name(rbd_dev, ofs);
-	if (!seg_name)
-		return -ENOMEM;
-	seg_len = rbd_segment_length(rbd_dev, ofs, len);
-	seg_ofs = rbd_segment_offset(rbd_dev, ofs);
-
-	if (rq_data_dir(rq) == WRITE) {
-		opcode = CEPH_OSD_OP_WRITE;
-		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
-		snapid = CEPH_NOSNAP;
-		payload_len = seg_len;
-	} else {
-		opcode = CEPH_OSD_OP_READ;
-		flags = CEPH_OSD_FLAG_READ;
-		snapc = NULL;
-		snapid = rbd_dev->spec->snap_id;
-		payload_len = 0;
-	}
+static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
+{
+	struct rbd_img_request *img_request;
+	u32 which = obj_request->which;
+	bool more = true;
+
+	img_request = obj_request->img_request;
+	rbd_assert(img_request != NULL);
+	rbd_assert(img_request->rq != NULL);
+	rbd_assert(which != BAD_WHICH);
+	rbd_assert(which < img_request->obj_request_count);
+	rbd_assert(which >= img_request->next_completion);
+
+	spin_lock_irq(&img_request->completion_lock);
+	if (which != img_request->next_completion)
+		goto out;
 
-	ret = -ENOMEM;
-	ops = rbd_create_rw_ops(1, opcode, payload_len);
-	if (!ops)
-		goto done;
+	for_each_obj_request_from(img_request, obj_request) {
+		unsigned int xferred;
+		int result;
 
-	/* we've taken care of segment sizes earlier when we
-	   cloned the bios. We should never have a segment
-	   truncated at this point */
-	rbd_assert(seg_len == len);
-
-	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
-			     seg_name, seg_ofs, seg_len,
-			     bio,
-			     NULL, 0,
-			     flags,
-			     ops,
-			     coll, coll_index,
-			     rbd_req_cb, 0, NULL);
-
-	rbd_destroy_ops(ops);
-done:
-	kfree(seg_name);
-	return ret;
+		rbd_assert(more);
+		rbd_assert(which < img_request->obj_request_count);
+
+		if (!obj_request_done_test(obj_request))
+			break;
+
+		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
+		xferred = (unsigned int) obj_request->xferred;
+		result = (int) obj_request->result;
+		if (result)
+			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
+				img_request->write_request ? "write" : "read",
+				result, xferred);
+
+		more = blk_end_request(img_request->rq, result, xferred);
+		which++;
+	}
+	rbd_assert(more ^ (which == img_request->obj_request_count));
+	img_request->next_completion = which;
+out:
+	spin_unlock_irq(&img_request->completion_lock);
+
+	if (!more)
+		rbd_img_request_complete(img_request);
 }
 
-/*
- * Request sync osd read
- */
-static int rbd_req_sync_read(struct rbd_device *rbd_dev,
-			  u64 snapid,
-			  const char *object_name,
-			  u64 ofs, u64 len,
-			  char *buf,
-			  u64 *ver)
-{
-	struct ceph_osd_req_op *ops;
-	int ret;
+static int rbd_img_request_submit(struct rbd_img_request *img_request)
+{
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_obj_request *obj_request;
 
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
-	if (!ops)
-		return -ENOMEM;
+	for_each_obj_request(img_request, obj_request) {
+		int ret;
 
-	ret = rbd_req_sync_op(rbd_dev, NULL,
-			       snapid,
-			       CEPH_OSD_FLAG_READ,
-			       ops, object_name, ofs, len, buf, NULL, ver);
-	rbd_destroy_ops(ops);
+		obj_request->callback = rbd_img_obj_callback;
+		ret = rbd_obj_request_submit(osdc, obj_request);
+		if (ret)
+			return ret;
+		/*
+		 * The image request has its own reference to each
+		 * of its object requests, so we can safely drop the
+		 * initial one here.
+		 */
+		rbd_obj_request_put(obj_request);
+	}
 
-	return ret;
+	return 0;
 }
 
-/*
- * Request sync osd watch
- */
-static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
-				   u64 ver,
-				   u64 notify_id)
+static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
+				   u64 ver, u64 notify_id)
 {
-	struct ceph_osd_req_op *ops;
+	struct rbd_obj_request *obj_request;
+	struct ceph_osd_req_op *op;
+	struct ceph_osd_client *osdc;
 	int ret;
 
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
-	if (!ops)
+	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+							OBJ_REQUEST_NODATA);
+	if (!obj_request)
 		return -ENOMEM;
 
-	ops[0].watch.ver = cpu_to_le64(ver);
-	ops[0].watch.cookie = notify_id;
-	ops[0].watch.flag = 0;
+	ret = -ENOMEM;
+	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
+	if (!op)
+		goto out;
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+						obj_request, op);
+	rbd_osd_req_op_destroy(op);
+	if (!obj_request->osd_req)
+		goto out;
 
-	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
-			  rbd_dev->header_name, 0, 0, NULL,
-			  NULL, 0,
-			  CEPH_OSD_FLAG_READ,
-			  ops,
-			  NULL, 0,
-			  rbd_simple_req_cb, 0, NULL);
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	obj_request->callback = rbd_obj_request_put;
+	ret = rbd_obj_request_submit(osdc, obj_request);
+out:
+	if (ret)
+		rbd_obj_request_put(obj_request);
 
-	rbd_destroy_ops(ops);
 	return ret;
 }
 
@@ -1386,90 +1738,98 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 		(unsigned int) opcode);
 	rc = rbd_dev_refresh(rbd_dev, &hver);
 	if (rc)
-		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
-			   " update snaps: %d\n", rbd_dev->major, rc);
+		rbd_warn(rbd_dev, "got notification but failed to "
+			   " update snaps: %d\n", rc);
 
-	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
+	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
 }
 
 /*
- * Request sync osd watch
+ * Request sync osd watch/unwatch.  The value of "start" determines
+ * whether a watch request is being initiated or torn down.
  */
-static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
 {
-	struct ceph_osd_req_op *ops;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_obj_request *obj_request;
+	struct ceph_osd_req_op *op;
 	int ret;
 
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
-	if (!ops)
-		return -ENOMEM;
+	rbd_assert(start ^ !!rbd_dev->watch_event);
+	rbd_assert(start ^ !!rbd_dev->watch_request);
 
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
-				     (void *)rbd_dev, &rbd_dev->watch_event);
-	if (ret < 0)
-		goto fail;
+	if (start) {
+		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+						&rbd_dev->watch_event);
+		if (ret < 0)
+			return ret;
+		rbd_assert(rbd_dev->watch_event != NULL);
+	}
 
-	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
-	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
-	ops[0].watch.flag = 1;
+	ret = -ENOMEM;
+	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+							OBJ_REQUEST_NODATA);
+	if (!obj_request)
+		goto out_cancel;
+
+	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
+				rbd_dev->watch_event->cookie,
+				rbd_dev->header.obj_version, start);
+	if (!op)
+		goto out_cancel;
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
+							obj_request, op);
+	rbd_osd_req_op_destroy(op);
+	if (!obj_request->osd_req)
+		goto out_cancel;
+
+	if (start)
+		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+	else
+		ceph_osdc_unregister_linger_request(osdc,
+					rbd_dev->watch_request->osd_req);
+	ret = rbd_obj_request_submit(osdc, obj_request);
+	if (ret)
+		goto out_cancel;
+	ret = rbd_obj_request_wait(obj_request);
+	if (ret)
+		goto out_cancel;
+	ret = obj_request->result;
+	if (ret)
+		goto out_cancel;
 
-	ret = rbd_req_sync_op(rbd_dev, NULL,
-			      CEPH_NOSNAP,
-			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			      ops,
-			      rbd_dev->header_name,
-			      0, 0, NULL,
-			      &rbd_dev->watch_request, NULL);
+	/*
+	 * A watch request is set to linger, so the underlying osd
+	 * request won't go away until we unregister it.  We retain
+	 * a pointer to the object request during that time (in
+	 * rbd_dev->watch_request), so we'll keep a reference to
+	 * it.  We'll drop that reference (below) after we've
+	 * unregistered it.
+	 */
+	if (start) {
+		rbd_dev->watch_request = obj_request;
 
-	if (ret < 0)
-		goto fail_event;
+		return 0;
+	}
 
-	rbd_destroy_ops(ops);
-	return 0;
+	/* We have successfully torn down the watch request */
 
-fail_event:
+	rbd_obj_request_put(rbd_dev->watch_request);
+	rbd_dev->watch_request = NULL;
+out_cancel:
+	/* Cancel the event if we're tearing down, or on error */
 	ceph_osdc_cancel_event(rbd_dev->watch_event);
 	rbd_dev->watch_event = NULL;
-fail:
-	rbd_destroy_ops(ops);
-	return ret;
-}
-
-/*
- * Request sync osd unwatch
- */
-static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
-{
-	struct ceph_osd_req_op *ops;
-	int ret;
+	if (obj_request)
+		rbd_obj_request_put(obj_request);
 
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
-	if (!ops)
-		return -ENOMEM;
-
-	ops[0].watch.ver = 0;
-	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
-	ops[0].watch.flag = 0;
-
-	ret = rbd_req_sync_op(rbd_dev, NULL,
-			      CEPH_NOSNAP,
-			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			      ops,
-			      rbd_dev->header_name,
-			      0, 0, NULL, NULL, NULL);
-
-
-	rbd_destroy_ops(ops);
-	ceph_osdc_cancel_event(rbd_dev->watch_event);
-	rbd_dev->watch_event = NULL;
 	return ret;
 }
 
 /*
  * Synchronous osd object method call
  */
-static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
+static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
 			     const char *object_name,
 			     const char *class_name,
 			     const char *method_name,
@@ -1477,169 +1837,143 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
 			     size_t outbound_size,
 			     char *inbound,
 			     size_t inbound_size,
-			     int flags,
-			     u64 *ver)
+			     u64 *version)
 {
-	struct ceph_osd_req_op *ops;
-	int class_name_len = strlen(class_name);
-	int method_name_len = strlen(method_name);
-	int payload_size;
+	struct rbd_obj_request *obj_request;
+	struct ceph_osd_client *osdc;
+	struct ceph_osd_req_op *op;
+	struct page **pages;
+	u32 page_count;
 	int ret;
 
 	/*
-	 * Any input parameters required by the method we're calling
-	 * will be sent along with the class and method names as
-	 * part of the message payload.  That data and its size are
-	 * supplied via the indata and indata_len fields (named from
-	 * the perspective of the server side) in the OSD request
-	 * operation.
+	 * Method calls are ultimately read operations but they
+	 * don't involve object data (so no offset or length).
+	 * The result should placed into the inbound buffer
+	 * provided.  They also supply outbound data--parameters for
+	 * the object method.  Currently if this is present it will
+	 * be a snapshot id.
 	 */
-	payload_size = class_name_len + method_name_len + outbound_size;
-	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
-	if (!ops)
-		return -ENOMEM;
+	page_count = (u32) calc_pages_for(0, inbound_size);
+	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
 
-	ops[0].cls.class_name = class_name;
-	ops[0].cls.class_len = (__u8) class_name_len;
-	ops[0].cls.method_name = method_name;
-	ops[0].cls.method_len = (__u8) method_name_len;
-	ops[0].cls.argc = 0;
-	ops[0].cls.indata = outbound;
-	ops[0].cls.indata_len = outbound_size;
+	ret = -ENOMEM;
+	obj_request = rbd_obj_request_create(object_name, 0, 0,
+							OBJ_REQUEST_PAGES);
+	if (!obj_request)
+		goto out;
 
-	ret = rbd_req_sync_op(rbd_dev, NULL,
-			       CEPH_NOSNAP,
-			       flags, ops,
-			       object_name, 0, inbound_size, inbound,
-			       NULL, ver);
+	obj_request->pages = pages;
+	obj_request->page_count = page_count;
 
-	rbd_destroy_ops(ops);
+	op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
+					method_name, outbound, outbound_size);
+	if (!op)
+		goto out;
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+						obj_request, op);
+	rbd_osd_req_op_destroy(op);
+	if (!obj_request->osd_req)
+		goto out;
 
-	dout("cls_exec returned %d\n", ret);
-	return ret;
-}
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	ret = rbd_obj_request_submit(osdc, obj_request);
+	if (ret)
+		goto out;
+	ret = rbd_obj_request_wait(obj_request);
+	if (ret)
+		goto out;
 
-static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
-{
-	struct rbd_req_coll *coll =
-			kzalloc(sizeof(struct rbd_req_coll) +
-			        sizeof(struct rbd_req_status) * num_reqs,
-				GFP_ATOMIC);
+	ret = obj_request->result;
+	if (ret < 0)
+		goto out;
+	ret = 0;
+	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
+	if (version)
+		*version = obj_request->version;
+out:
+	if (obj_request)
+		rbd_obj_request_put(obj_request);
+	else
+		ceph_release_page_vector(pages, page_count);
 
-	if (!coll)
-		return NULL;
-	coll->total = num_reqs;
-	kref_init(&coll->kref);
-	return coll;
+	return ret;
 }
 
-/*
- * block device queue callback
- */
-static void rbd_rq_fn(struct request_queue *q)
+static void rbd_request_fn(struct request_queue *q)
 {
 	struct rbd_device *rbd_dev = q->queuedata;
+	bool read_only = rbd_dev->mapping.read_only;
 	struct request *rq;
+	int result;
 
 	while ((rq = blk_fetch_request(q))) {
-		struct bio *bio;
-		bool do_write;
-		unsigned int size;
-		u64 ofs;
-		int num_segs, cur_seg = 0;
-		struct rbd_req_coll *coll;
-		struct ceph_snap_context *snapc;
-		unsigned int bio_offset;
-
-		dout("fetched request\n");
-
-		/* filter out block requests we don't understand */
-		if ((rq->cmd_type != REQ_TYPE_FS)) {
-			__blk_end_request_all(rq, 0);
-			continue;
-		}
+		bool write_request = rq_data_dir(rq) == WRITE;
+		struct rbd_img_request *img_request;
+		u64 offset;
+		u64 length;
+
+		/* Ignore any non-FS requests that filter through. */
 
-		/* deduce our operation (read, write) */
-		do_write = (rq_data_dir(rq) == WRITE);
-		if (do_write && rbd_dev->mapping.read_only) {
-			__blk_end_request_all(rq, -EROFS);
+		if (rq->cmd_type != REQ_TYPE_FS) {
+			__blk_end_request_all(rq, 0);
 			continue;
 		}
 
 		spin_unlock_irq(q->queue_lock);
 
-		down_read(&rbd_dev->header_rwsem);
+		/* Disallow writes to a read-only device */
 
-		if (!rbd_dev->exists) {
-			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
-			up_read(&rbd_dev->header_rwsem);
-			dout("request for non-existent snapshot");
-			spin_lock_irq(q->queue_lock);
-			__blk_end_request_all(rq, -ENXIO);
-			continue;
+		if (write_request) {
+			result = -EROFS;
+			if (read_only)
+				goto end_request;
+			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
 		}
 
-		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
-
-		up_read(&rbd_dev->header_rwsem);
-
-		size = blk_rq_bytes(rq);
-		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-		bio = rq->bio;
-
-		dout("%s 0x%x bytes at 0x%llx\n",
-		     do_write ? "write" : "read",
-		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
-
-		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
-		if (num_segs <= 0) {
-			spin_lock_irq(q->queue_lock);
-			__blk_end_request_all(rq, num_segs);
-			ceph_put_snap_context(snapc);
-			continue;
-		}
-		coll = rbd_alloc_coll(num_segs);
-		if (!coll) {
-			spin_lock_irq(q->queue_lock);
-			__blk_end_request_all(rq, -ENOMEM);
-			ceph_put_snap_context(snapc);
-			continue;
+		/*
+		 * Quit early if the mapped snapshot no longer
+		 * exists.  It's still possible the snapshot will
+		 * have disappeared by the time our request arrives
+		 * at the osd, but there's no sense in sending it if
+		 * we already know.
+		 */
+		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
+			dout("request for non-existent snapshot");
+			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
+			result = -ENXIO;
+			goto end_request;
 		}
 
-		bio_offset = 0;
-		do {
-			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
-			unsigned int chain_size;
-			struct bio *bio_chain;
+		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
+		length = (u64) blk_rq_bytes(rq);
 
-			BUG_ON(limit > (u64) UINT_MAX);
-			chain_size = (unsigned int) limit;
-			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
+		result = -EINVAL;
+		if (WARN_ON(offset && length > U64_MAX - offset + 1))
+			goto end_request;	/* Shouldn't happen */
 
-			kref_get(&coll->kref);
+		result = -ENOMEM;
+		img_request = rbd_img_request_create(rbd_dev, offset, length,
+							write_request);
+		if (!img_request)
+			goto end_request;
 
-			/* Pass a cloned bio chain via an osd request */
-
-			bio_chain = bio_chain_clone_range(&bio,
-						&bio_offset, chain_size,
-						GFP_ATOMIC);
-			if (bio_chain)
-				(void) rbd_do_op(rq, rbd_dev, snapc,
-						ofs, chain_size,
-						bio_chain, coll, cur_seg);
-			else
-				rbd_coll_end_req_index(rq, coll, cur_seg,
-						       -ENOMEM, chain_size);
-			size -= chain_size;
-			ofs += chain_size;
-
-			cur_seg++;
-		} while (size > 0);
-		kref_put(&coll->kref, rbd_coll_release);
+		img_request->rq = rq;
 
+		result = rbd_img_request_fill_bio(img_request, rq->bio);
+		if (!result)
+			result = rbd_img_request_submit(img_request);
+		if (result)
+			rbd_img_request_put(img_request);
+end_request:
 		spin_lock_irq(q->queue_lock);
-
-		ceph_put_snap_context(snapc);
+		if (result < 0) {
+			rbd_warn(rbd_dev, "obj_request %s result %d\n",
+				write_request ? "write" : "read", result);
+			__blk_end_request_all(rq, result);
+		}
 	}
 }
 
@@ -1703,6 +2037,71 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 	put_disk(disk);
 }
 
+static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
+				const char *object_name,
+				u64 offset, u64 length,
+				char *buf, u64 *version)
+
+{
+	struct ceph_osd_req_op *op;
+	struct rbd_obj_request *obj_request;
+	struct ceph_osd_client *osdc;
+	struct page **pages = NULL;
+	u32 page_count;
+	size_t size;
+	int ret;
+
+	page_count = (u32) calc_pages_for(offset, length);
+	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+	if (IS_ERR(pages))
+		ret = PTR_ERR(pages);
+
+	ret = -ENOMEM;
+	obj_request = rbd_obj_request_create(object_name, offset, length,
+							OBJ_REQUEST_PAGES);
+	if (!obj_request)
+		goto out;
+
+	obj_request->pages = pages;
+	obj_request->page_count = page_count;
+
+	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
+	if (!op)
+		goto out;
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+						obj_request, op);
+	rbd_osd_req_op_destroy(op);
+	if (!obj_request->osd_req)
+		goto out;
+
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	ret = rbd_obj_request_submit(osdc, obj_request);
+	if (ret)
+		goto out;
+	ret = rbd_obj_request_wait(obj_request);
+	if (ret)
+		goto out;
+
+	ret = obj_request->result;
+	if (ret < 0)
+		goto out;
+
+	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
+	size = (size_t) obj_request->xferred;
+	ceph_copy_from_page_vector(pages, buf, 0, size);
+	rbd_assert(size <= (size_t) INT_MAX);
+	ret = (int) size;
+	if (version)
+		*version = obj_request->version;
+out:
+	if (obj_request)
+		rbd_obj_request_put(obj_request);
+	else
+		ceph_release_page_vector(pages, page_count);
+
+	return ret;
+}
+
 /*
  * Read the complete header for the given rbd device.
  *
@@ -1741,24 +2140,20 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
 		if (!ondisk)
 			return ERR_PTR(-ENOMEM);
 
-		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
-				       rbd_dev->header_name,
+		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
 				       0, size,
 				       (char *) ondisk, version);
-
 		if (ret < 0)
 			goto out_err;
 		if (WARN_ON((size_t) ret < size)) {
 			ret = -ENXIO;
-			pr_warning("short header read for image %s"
-					" (want %zd got %d)\n",
-				rbd_dev->spec->image_name, size, ret);
+			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
+				size, ret);
 			goto out_err;
 		}
 		if (!rbd_dev_ondisk_valid(ondisk)) {
 			ret = -ENXIO;
-			pr_warning("invalid header for image %s\n",
-				rbd_dev->spec->image_name);
+			rbd_warn(rbd_dev, "invalid header");
 			goto out_err;
 		}
 
@@ -1895,8 +2290,7 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 	disk->fops = &rbd_bd_ops;
 	disk->private_data = rbd_dev;
 
-	/* init rq */
-	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
+	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
 	if (!q)
 		goto out_disk;
 
@@ -2243,6 +2637,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
+	rbd_dev->flags = 0;
 	INIT_LIST_HEAD(&rbd_dev->node);
 	INIT_LIST_HEAD(&rbd_dev->snaps);
 	init_rwsem(&rbd_dev->header_rwsem);
@@ -2250,6 +2645,13 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 	rbd_dev->spec = spec;
 	rbd_dev->rbd_client = rbdc;
 
+	/* Initialize the layout used for all rbd requests */
+
+	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
+	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
+	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
+	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
+
 	return rbd_dev;
 }
 
@@ -2360,12 +2762,11 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
 		__le64 size;
 	} __attribute__ ((packed)) size_buf = { 0 };
 
-	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_size",
 				(char *) &snapid, sizeof (snapid),
-				(char *) &size_buf, sizeof (size_buf),
-				CEPH_OSD_FLAG_READ, NULL);
-	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+				(char *) &size_buf, sizeof (size_buf), NULL);
+	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		return ret;
 
@@ -2396,15 +2797,13 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
 	if (!reply_buf)
 		return -ENOMEM;
 
-	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_object_prefix",
 				NULL, 0,
-				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
-				CEPH_OSD_FLAG_READ, NULL);
-	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
+	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
-	ret = 0;    /* rbd_req_sync_exec() can return positive */
 
 	p = reply_buf;
 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2435,12 +2834,12 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
 	u64 incompat;
 	int ret;
 
-	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_features",
 				(char *) &snapid, sizeof (snapid),
 				(char *) &features_buf, sizeof (features_buf),
-				CEPH_OSD_FLAG_READ, NULL);
-	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+				NULL);
+	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		return ret;
 
@@ -2474,7 +2873,6 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	void *end;
 	char *image_id;
 	u64 overlap;
-	size_t len = 0;
 	int ret;
 
 	parent_spec = rbd_spec_alloc();
@@ -2492,12 +2890,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	}
 
 	snapid = cpu_to_le64(CEPH_NOSNAP);
-	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_parent",
 				(char *) &snapid, sizeof (snapid),
-				(char *) reply_buf, size,
-				CEPH_OSD_FLAG_READ, NULL);
-	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+				(char *) reply_buf, size, NULL);
+	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out_err;
 
@@ -2508,13 +2905,18 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
 	if (parent_spec->pool_id == CEPH_NOPOOL)
 		goto out;	/* No parent?  No problem. */
 
-	image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+	/* The ceph file layout needs to fit pool id in 32 bits */
+
+	ret = -EIO;
+	if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
+		goto out;
+
+	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
 	if (IS_ERR(image_id)) {
 		ret = PTR_ERR(image_id);
 		goto out_err;
 	}
 	parent_spec->image_id = image_id;
-	parent_spec->image_id_len = len;
 	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
 	ceph_decode_64_safe(&p, end, overlap, out_err);
 
@@ -2544,26 +2946,25 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
 
 	rbd_assert(!rbd_dev->spec->image_name);
 
-	image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
+	len = strlen(rbd_dev->spec->image_id);
+	image_id_size = sizeof (__le32) + len;
 	image_id = kmalloc(image_id_size, GFP_KERNEL);
 	if (!image_id)
 		return NULL;
 
 	p = image_id;
 	end = (char *) image_id + image_id_size;
-	ceph_encode_string(&p, end, rbd_dev->spec->image_id,
-				(u32) rbd_dev->spec->image_id_len);
+	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
 
 	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
 	reply_buf = kmalloc(size, GFP_KERNEL);
 	if (!reply_buf)
 		goto out;
 
-	ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
+	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
 				"rbd", "dir_get_name",
 				image_id, image_id_size,
-				(char *) reply_buf, size,
-				CEPH_OSD_FLAG_READ, NULL);
+				(char *) reply_buf, size, NULL);
 	if (ret < 0)
 		goto out;
 	p = reply_buf;
@@ -2602,8 +3003,11 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
 
 	osdc = &rbd_dev->rbd_client->client->osdc;
 	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
-	if (!name)
-		return -EIO;	/* pool id too large (>= 2^31) */
+	if (!name) {
+		rbd_warn(rbd_dev, "there is no pool with id %llu",
+			rbd_dev->spec->pool_id);	/* Really a BUG() */
+		return -EIO;
+	}
 
 	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
 	if (!rbd_dev->spec->pool_name)
@@ -2612,19 +3016,17 @@ static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
 	/* Fetch the image name; tolerate failure here */
 
 	name = rbd_dev_image_name(rbd_dev);
-	if (name) {
-		rbd_dev->spec->image_name_len = strlen(name);
+	if (name)
 		rbd_dev->spec->image_name = (char *) name;
-	} else {
-		pr_warning(RBD_DRV_NAME "%d "
-			"unable to get image name for image id %s\n",
-			rbd_dev->major, rbd_dev->spec->image_id);
-	}
+	else
+		rbd_warn(rbd_dev, "unable to get image name");
 
 	/* Look up the snapshot name. */
 
 	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
 	if (!name) {
+		rbd_warn(rbd_dev, "no snapshot with id %llu",
+			rbd_dev->spec->snap_id);	/* Really a BUG() */
 		ret = -EIO;
 		goto out_err;
 	}
@@ -2665,12 +3067,11 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
 	if (!reply_buf)
 		return -ENOMEM;
 
-	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_snapcontext",
 				NULL, 0,
-				reply_buf, size,
-				CEPH_OSD_FLAG_READ, ver);
-	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+				reply_buf, size, ver);
+	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
 
@@ -2735,12 +3136,11 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
 		return ERR_PTR(-ENOMEM);
 
 	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
-	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 				"rbd", "get_snapshot_name",
 				(char *) &snap_id, sizeof (snap_id),
-				reply_buf, size,
-				CEPH_OSD_FLAG_READ, NULL);
-	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+				reply_buf, size, NULL);
+	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
 
@@ -2766,7 +3166,7 @@ out:
 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
 		u64 *snap_size, u64 *snap_features)
 {
-	__le64 snap_id;
+	u64 snap_id;
 	u8 order;
 	int ret;
 
@@ -2865,10 +3265,17 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
 			struct list_head *next = links->next;
 
-			/* Existing snapshot not in the new snap context */
-
+			/*
+			 * A previously-existing snapshot is not in
+			 * the new snap context.
+			 *
+			 * If the now missing snapshot is the one the
+			 * image is mapped to, clear its exists flag
+			 * so we can avoid sending any more requests
+			 * to it.
+			 */
 			if (rbd_dev->spec->snap_id == snap->id)
-				rbd_dev->exists = false;
+				clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 			rbd_remove_snap_dev(snap);
 			dout("%ssnap id %llu has been removed\n",
 				rbd_dev->spec->snap_id == snap->id ?
@@ -2983,22 +3390,6 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
 	device_unregister(&rbd_dev->dev);
 }
 
-static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
-{
-	int ret, rc;
-
-	do {
-		ret = rbd_req_sync_watch(rbd_dev);
-		if (ret == -ERANGE) {
-			rc = rbd_dev_refresh(rbd_dev, NULL);
-			if (rc < 0)
-				return rc;
-		}
-	} while (ret == -ERANGE);
-
-	return ret;
-}
-
 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
 
 /*
@@ -3138,11 +3529,9 @@ static inline char *dup_token(const char **buf, size_t *lenp)
 	size_t len;
 
 	len = next_token(buf);
-	dup = kmalloc(len + 1, GFP_KERNEL);
+	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
 	if (!dup)
 		return NULL;
-
-	memcpy(dup, *buf, len);
 	*(dup + len) = '\0';
 	*buf += len;
 
@@ -3210,8 +3599,10 @@ static int rbd_add_parse_args(const char *buf,
 	/* The first four tokens are required */
 
 	len = next_token(&buf);
-	if (!len)
-		return -EINVAL;	/* Missing monitor address(es) */
+	if (!len) {
+		rbd_warn(NULL, "no monitor address(es) provided");
+		return -EINVAL;
+	}
 	mon_addrs = buf;
 	mon_addrs_size = len + 1;
 	buf += len;
@@ -3220,8 +3611,10 @@ static int rbd_add_parse_args(const char *buf,
 	options = dup_token(&buf, NULL);
 	if (!options)
 		return -ENOMEM;
-	if (!*options)
-		goto out_err;	/* Missing options */
+	if (!*options) {
+		rbd_warn(NULL, "no options provided");
+		goto out_err;
+	}
 
 	spec = rbd_spec_alloc();
 	if (!spec)
@@ -3230,14 +3623,18 @@ static int rbd_add_parse_args(const char *buf,
 	spec->pool_name = dup_token(&buf, NULL);
 	if (!spec->pool_name)
 		goto out_mem;
-	if (!*spec->pool_name)
-		goto out_err;	/* Missing pool name */
+	if (!*spec->pool_name) {
+		rbd_warn(NULL, "no pool name provided");
+		goto out_err;
+	}
 
-	spec->image_name = dup_token(&buf, &spec->image_name_len);
+	spec->image_name = dup_token(&buf, NULL);
 	if (!spec->image_name)
 		goto out_mem;
-	if (!*spec->image_name)
-		goto out_err;	/* Missing image name */
+	if (!*spec->image_name) {
+		rbd_warn(NULL, "no image name provided");
+		goto out_err;
+	}
 
 	/*
 	 * Snapshot name is optional; default is to use "-"
@@ -3251,10 +3648,9 @@ static int rbd_add_parse_args(const char *buf,
 		ret = -ENAMETOOLONG;
 		goto out_err;
 	}
-	spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
 	if (!spec->snap_name)
 		goto out_mem;
-	memcpy(spec->snap_name, buf, len);
 	*(spec->snap_name + len) = '\0';
 
 	/* Initialize all rbd options to the defaults */
@@ -3323,7 +3719,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 	 * First, see if the format 2 image id file exists, and if
 	 * so, get the image's persistent id from it.
 	 */
-	size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
+	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
 	object_name = kmalloc(size, GFP_NOIO);
 	if (!object_name)
 		return -ENOMEM;
@@ -3339,21 +3735,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
 		goto out;
 	}
 
-	ret = rbd_req_sync_exec(rbd_dev, object_name,
+	ret = rbd_obj_method_sync(rbd_dev, object_name,
 				"rbd", "get_id",
 				NULL, 0,
-				response, RBD_IMAGE_ID_LEN_MAX,
-				CEPH_OSD_FLAG_READ, NULL);
-	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+				response, RBD_IMAGE_ID_LEN_MAX, NULL);
+	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
-	ret = 0;    /* rbd_req_sync_exec() can return positive */
 
 	p = response;
 	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
 						p + RBD_IMAGE_ID_LEN_MAX,
-						&rbd_dev->spec->image_id_len,
-						GFP_NOIO);
+						NULL, GFP_NOIO);
 	if (IS_ERR(rbd_dev->spec->image_id)) {
 		ret = PTR_ERR(rbd_dev->spec->image_id);
 		rbd_dev->spec->image_id = NULL;
@@ -3377,11 +3770,10 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
 	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
 	if (!rbd_dev->spec->image_id)
 		return -ENOMEM;
-	rbd_dev->spec->image_id_len = 0;
 
 	/* Record the header object name for this rbd image. */
 
-	size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
+	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
 	if (!rbd_dev->header_name) {
 		ret = -ENOMEM;
@@ -3427,7 +3819,7 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
 	 * Image id was filled in by the caller.  Record the header
 	 * object name for this rbd image.
 	 */
-	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
+	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
 	if (!rbd_dev->header_name)
 		return -ENOMEM;
@@ -3542,7 +3934,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
 	if (ret)
 		goto err_out_bus;
 
-	ret = rbd_init_watch_dev(rbd_dev);
+	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
 	if (ret)
 		goto err_out_bus;
 
@@ -3638,6 +4030,13 @@ static ssize_t rbd_add(struct bus_type *bus,
 		goto err_out_client;
 	spec->pool_id = (u64) rc;
 
+	/* The ceph file layout needs to fit pool id in 32 bits */
+
+	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
+		rc = -EIO;
+		goto err_out_client;
+	}
+
 	rbd_dev = rbd_dev_create(rbdc, spec);
 	if (!rbd_dev)
 		goto err_out_client;
@@ -3691,15 +4090,8 @@ static void rbd_dev_release(struct device *dev)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	if (rbd_dev->watch_request) {
-		struct ceph_client *client = rbd_dev->rbd_client->client;
-
-		ceph_osdc_unregister_linger_request(&client->osdc,
-						    rbd_dev->watch_request);
-	}
 	if (rbd_dev->watch_event)
-		rbd_req_sync_unwatch(rbd_dev);
-
+		rbd_dev_header_watch_sync(rbd_dev, 0);
 
 	/* clean up and free blkdev */
 	rbd_free_disk(rbd_dev);
@@ -3743,10 +4135,14 @@ static ssize_t rbd_remove(struct bus_type *bus,
 		goto done;
 	}
 
-	if (rbd_dev->open_count) {
+	spin_lock_irq(&rbd_dev->lock);
+	if (rbd_dev->open_count)
 		ret = -EBUSY;
+	else
+		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
+	spin_unlock_irq(&rbd_dev->lock);
+	if (ret < 0)
 		goto done;
-	}
 
 	rbd_remove_all_snaps(rbd_dev);
 	rbd_bus_del_dev(rbd_dev);
@@ -3786,6 +4182,11 @@ int __init rbd_init(void)
 {
 	int rc;
 
+	if (!libceph_compatible(NULL)) {
+		rbd_warn(NULL, "libceph incompatibility (quitting)");
+
+		return -EINVAL;
+	}
 	rc = rbd_sysfs_init();
 	if (rc)
 		return rc;