summary refs log tree commit diff
path: root/net
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2016-04-28 16:07:24 +0200
committerIlya Dryomov <idryomov@gmail.com>2016-05-26 00:36:28 +0200
commitfe5da05e979830b43b115d8a18ead521d507c783 (patch)
tree6afb8cacfd090d9d3b46a0a9976930957bb2e6cf /net
parent85e084feb47349d62989efe1713a8723af95f4ea (diff)
downloadlinux-fe5da05e979830b43b115d8a18ead521d507c783.tar.gz
libceph: redo callbacks and factor out MOSDOpReply decoding
If you specify ACK | ONDISK and set ->r_unsafe_callback, both
->r_callback and ->r_unsafe_callback(true) are called on ack.  This is
very confusing.  Redo this so that only one of them is called:

    ->r_unsafe_callback(true), on ack
    ->r_unsafe_callback(false), on commit

or

    ->r_callback, on ack|commit

Decode everything in decode_MOSDOpReply() to reduce clutter.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/osd_client.c362
1 file changed, 209 insertions, 153 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 2a30c0bb3045..baf2844b00d6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1693,6 +1693,14 @@ static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
 	return 0;
 }
 
+static void __complete_request(struct ceph_osd_request *req)
+{
+	if (req->r_callback)
+		req->r_callback(req);
+	else
+		complete_all(&req->r_completion);
+}
+
 /*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds.  When this
@@ -1875,107 +1883,76 @@ e_inval:
 	goto out;
 }
 
-static void complete_request(struct ceph_osd_request *req)
-{
-	complete_all(&req->r_safe_completion);  /* fsync waiter */
-}
+struct MOSDOpReply {
+	struct ceph_pg pgid;
+	u64 flags;
+	int result;
+	u32 epoch;
+	int num_ops;
+	u32 outdata_len[CEPH_OSD_MAX_OPS];
+	s32 rval[CEPH_OSD_MAX_OPS];
+	int retry_attempt;
+	struct ceph_eversion replay_version;
+	u64 user_version;
+	struct ceph_request_redirect redirect;
+};
 
-/*
- * handle osd op reply.  either call the callback if it is specified,
- * or do the completion to wake up the waiting thread.
- */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
 {
-	void *p, *end;
-	struct ceph_osd_request *req;
-	struct ceph_request_redirect redir;
-	u64 tid;
-	int object_len;
-	unsigned int numops;
-	int payload_len, flags;
-	s32 result;
-	s32 retry_attempt;
-	struct ceph_pg pg;
-	int err;
-	u32 reassert_epoch;
-	u64 reassert_version;
-	u32 osdmap_epoch;
-	int already_completed;
-	u32 bytes;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front.iov_len;
+	u16 version = le16_to_cpu(msg->hdr.version);
+	struct ceph_eversion bad_replay_version;
 	u8 decode_redir;
-	unsigned int i;
-
-	tid = le64_to_cpu(msg->hdr.tid);
-	dout("handle_reply %p tid %llu\n", msg, tid);
-
-	p = msg->front.iov_base;
-	end = p + msg->front.iov_len;
+	u32 len;
+	int ret;
+	int i;
 
-	ceph_decode_need(&p, end, 4, bad);
-	object_len = ceph_decode_32(&p);
-	ceph_decode_need(&p, end, object_len, bad);
-	p += object_len;
+	ceph_decode_32_safe(&p, end, len, e_inval);
+	ceph_decode_need(&p, end, len, e_inval);
+	p += len; /* skip oid */
 
-	err = ceph_decode_pgid(&p, end, &pg);
-	if (err)
-		goto bad;
+	ret = ceph_decode_pgid(&p, end, &m->pgid);
+	if (ret)
+		return ret;
 
-	ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
-	flags = ceph_decode_64(&p);
-	result = ceph_decode_32(&p);
-	reassert_epoch = ceph_decode_32(&p);
-	reassert_version = ceph_decode_64(&p);
-	osdmap_epoch = ceph_decode_32(&p);
+	ceph_decode_64_safe(&p, end, m->flags, e_inval);
+	ceph_decode_32_safe(&p, end, m->result, e_inval);
+	ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
+	memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
+	p += sizeof(bad_replay_version);
+	ceph_decode_32_safe(&p, end, m->epoch, e_inval);
 
-	/* lookup */
-	down_read(&osdc->map_sem);
-	mutex_lock(&osdc->request_mutex);
-	req = lookup_request(&osdc->requests, tid);
-	if (req == NULL) {
-		dout("handle_reply tid %llu dne\n", tid);
-		goto bad_mutex;
-	}
-	ceph_osdc_get_request(req);
+	ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
+	if (m->num_ops > ARRAY_SIZE(m->outdata_len))
+		goto e_inval;
 
-	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
-	     req, result);
-
-	ceph_decode_need(&p, end, 4, bad_put);
-	numops = ceph_decode_32(&p);
-	if (numops > CEPH_OSD_MAX_OPS)
-		goto bad_put;
-	if (numops != req->r_num_ops)
-		goto bad_put;
-	payload_len = 0;
-	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
-	for (i = 0; i < numops; i++) {
+	ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
+			 e_inval);
+	for (i = 0; i < m->num_ops; i++) {
 		struct ceph_osd_op *op = p;
-		int len;
 
-		len = le32_to_cpu(op->payload_len);
-		req->r_ops[i].outdata_len = len;
-		dout(" op %d has %d bytes\n", i, len);
-		payload_len += len;
+		m->outdata_len[i] = le32_to_cpu(op->payload_len);
 		p += sizeof(*op);
 	}
-	bytes = le32_to_cpu(msg->hdr.data_len);
-	if (payload_len != bytes) {
-		pr_warn("sum of op payload lens %d != data_len %d\n",
-			payload_len, bytes);
-		goto bad_put;
-	}
 
-	ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
-	retry_attempt = ceph_decode_32(&p);
-	for (i = 0; i < numops; i++)
-		req->r_ops[i].rval = ceph_decode_32(&p);
+	ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
+	for (i = 0; i < m->num_ops; i++)
+		ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
 
-	if (le16_to_cpu(msg->hdr.version) >= 6) {
-		p += 8 + 4; /* skip replay_version */
-		p += 8; /* skip user_version */
+	if (version >= 5) {
+		ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
+		memcpy(&m->replay_version, p, sizeof(m->replay_version));
+		p += sizeof(m->replay_version);
+		ceph_decode_64_safe(&p, end, m->user_version, e_inval);
+	} else {
+		m->replay_version = bad_replay_version; /* struct */
+		m->user_version = le64_to_cpu(m->replay_version.version);
+	}
 
-		if (le16_to_cpu(msg->hdr.version) >= 7)
-			ceph_decode_8_safe(&p, end, decode_redir, bad_put);
+	if (version >= 6) {
+		if (version >= 7)
+			ceph_decode_8_safe(&p, end, decode_redir, e_inval);
 		else
 			decode_redir = 1;
 	} else {
@@ -1983,19 +1960,96 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 	}
 
 	if (decode_redir) {
-		err = ceph_redirect_decode(&p, end, &redir);
-		if (err)
-			goto bad_put;
+		ret = ceph_redirect_decode(&p, end, &m->redirect);
+		if (ret)
+			return ret;
 	} else {
-		redir.oloc.pool = -1;
+		ceph_oloc_init(&m->redirect.oloc);
 	}
 
-	if (!ceph_oloc_empty(&redir.oloc)) {
-		dout("redirect pool %lld\n", redir.oloc.pool);
+	return 0;
+
+e_inval:
+	return -EINVAL;
+}
+
+/*
+ * We are done with @req if
+ *   - @m is a safe reply, or
+ *   - @m is an unsafe reply and we didn't want a safe one
+ */
+static bool done_request(const struct ceph_osd_request *req,
+			 const struct MOSDOpReply *m)
+{
+	return (m->result < 0 ||
+		(m->flags & CEPH_OSD_FLAG_ONDISK) ||
+		!(req->r_flags & CEPH_OSD_FLAG_ONDISK));
+}
 
+/*
+ * handle osd op reply.  either call the callback if it is specified,
+ * or do the completion to wake up the waiting thread.
+ *
+ * ->r_unsafe_callback is set?	yes			no
+ *
+ * first reply is OK (needed	r_cb/r_completion,	r_cb/r_completion,
+ * any or needed/got safe)	r_safe_completion	r_safe_completion
+ *
+ * first reply is unsafe	r_unsafe_cb(true)	(nothing)
+ *
+ * when we get the safe reply	r_unsafe_cb(false),	r_cb/r_completion,
+ *				r_safe_completion	r_safe_completion
+ */
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+	struct ceph_osd_request *req;
+	struct MOSDOpReply m;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+	u32 data_len = 0;
+	bool already_acked;
+	int ret;
+	int i;
+
+	dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+	down_read(&osdc->map_sem);
+	mutex_lock(&osdc->request_mutex);
+	req = lookup_request(&osdc->requests, tid);
+	if (!req) {
+		dout("%s no tid %llu\n", __func__, tid);
+		goto out_unlock;
+	}
+	ceph_osdc_get_request(req);
+
+	ret = decode_MOSDOpReply(msg, &m);
+	if (ret) {
+		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
+		       req->r_tid, ret);
+		ceph_msg_dump(msg);
+		goto fail_request;
+	}
+	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
+	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
+	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
+	     le64_to_cpu(m.replay_version.version), m.user_version);
+
+	if (m.retry_attempt >= 0) {
+		if (m.retry_attempt != req->r_attempts - 1) {
+			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
+			     req, req->r_tid, m.retry_attempt,
+			     req->r_attempts - 1);
+			goto out_put;
+		}
+	} else {
+		WARN_ON(1); /* MOSDOpReply v4 is assumed */
+	}
+
+	if (!ceph_oloc_empty(&m.redirect.oloc)) {
+		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
+		     m.redirect.oloc.pool);
 		__unregister_request(osdc, req);
 
-		ceph_oloc_copy(&req->r_t.target_oloc, &redir.oloc);
+		ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
 
 		/*
 		 * Start redirect requests with nofail=true.  If
@@ -2005,85 +2059,85 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 		 * successfully.  In the future we might want to follow
 		 * original request's nofail setting here.
 		 */
-		err = __ceph_osdc_start_request(osdc, req, true);
-		BUG_ON(err);
+		ret = __ceph_osdc_start_request(osdc, req, true);
+		BUG_ON(ret);
 
-		goto out_unlock;
+		goto out_put;
 	}
 
-	already_completed = req->r_got_reply;
-	if (!req->r_got_reply) {
-		req->r_result = result;
-		dout("handle_reply result %d bytes %d\n", req->r_result,
-		     bytes);
-		if (req->r_result == 0)
-			req->r_result = bytes;
-
-		/* in case this is a write and we need to replay, */
-		req->r_replay_version.epoch = cpu_to_le32(reassert_epoch);
-		req->r_replay_version.version = cpu_to_le64(reassert_version);
-
-		req->r_got_reply = 1;
-	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
-		dout("handle_reply tid %llu dup ack\n", tid);
-		goto out_unlock;
+	if (m.num_ops != req->r_num_ops) {
+		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
+		       req->r_num_ops, req->r_tid);
+		goto fail_request;
 	}
-
-	dout("handle_reply tid %llu flags %d\n", tid, flags);
-
-	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
-		__register_linger_request(osdc, req);
-
-	/* either this is a read, or we got the safe response */
-	if (result < 0 ||
-	    (flags & CEPH_OSD_FLAG_ONDISK) ||
-	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
+	for (i = 0; i < req->r_num_ops; i++) {
+		dout(" req %p tid %llu op %d rval %d len %u\n", req,
+		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
+		req->r_ops[i].rval = m.rval[i];
+		req->r_ops[i].outdata_len = m.outdata_len[i];
+		data_len += m.outdata_len[i];
+	}
+	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
+		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
+		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
+		goto fail_request;
+	}
+	dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__,
+	     req, req->r_tid, req->r_got_reply, m.result, data_len);
+
+	already_acked = req->r_got_reply;
+	if (!already_acked) {
+		req->r_result = m.result ?: data_len;
+		req->r_replay_version = m.replay_version; /* struct */
+		req->r_got_reply = true;
+	} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
+		dout("req %p tid %llu dup ack\n", req, req->r_tid);
+		goto out_put;
+	}
+
+	if (done_request(req, &m)) {
 		__unregister_request(osdc, req);
+		if (req->r_linger) {
+			WARN_ON(req->r_unsafe_callback);
+			__register_linger_request(osdc, req);
+		}
+	}
 
 	mutex_unlock(&osdc->request_mutex);
 	up_read(&osdc->map_sem);
 
-	if (!already_completed) {
-		if (req->r_unsafe_callback &&
-		    result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
-			req->r_unsafe_callback(req, true);
-		if (req->r_callback)
-			req->r_callback(req);
-		else
-			complete_all(&req->r_completion);
-	}
-
-	if (flags & CEPH_OSD_FLAG_ONDISK) {
-		if (req->r_unsafe_callback && already_completed)
+	if (done_request(req, &m)) {
+		if (already_acked && req->r_unsafe_callback) {
+			dout("req %p tid %llu safe-cb\n", req, req->r_tid);
 			req->r_unsafe_callback(req, false);
-		complete_request(req);
+		} else {
+			dout("req %p tid %llu cb\n", req, req->r_tid);
+			__complete_request(req);
+		}
+	} else {
+		if (req->r_unsafe_callback) {
+			dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
+			req->r_unsafe_callback(req, true);
+		} else {
+			WARN_ON(1);
+		}
 	}
+	if (m.flags & CEPH_OSD_FLAG_ONDISK)
+		complete_all(&req->r_safe_completion);
 
-out:
-	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
 	ceph_osdc_put_request(req);
 	return;
-out_unlock:
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
-	goto out;
 
-bad_put:
+fail_request:
 	req->r_result = -EIO;
 	__unregister_request(osdc, req);
-	if (req->r_callback)
-		req->r_callback(req);
-	else
-		complete_all(&req->r_completion);
-	complete_request(req);
+	__complete_request(req);
+	complete_all(&req->r_safe_completion);
+out_put:
 	ceph_osdc_put_request(req);
-bad_mutex:
+out_unlock:
 	mutex_unlock(&osdc->request_mutex);
 	up_read(&osdc->map_sem);
-bad:
-	pr_err("corrupt osd_op_reply got %d %d\n",
-	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
-	ceph_msg_dump(msg);
 }
 
 static void reset_changed_osds(struct ceph_osd_client *osdc)
@@ -2591,7 +2645,9 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
 	if (rc < 0) {
 		dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
 		ceph_osdc_cancel_request(req);
-		complete_request(req);
+
+		/* kludge - need to wake ceph_osdc_sync() */
+		complete_all(&req->r_safe_completion);
 		return rc;
 	}