author     Alex Elder <elder@inktank.com>  2013-01-30 07:54:34 -0600
committer  Alex Elder <elder@inktank.com>  2013-01-30 07:54:34 -0600
commit     969e5aa3b0162a02c4f287d48ff58ca2145acf1b (patch)
tree       1af8e8e47e7352c6d3b4abfdb4aea6bd9458666f /net/ceph
parent     949db153b6466c6f7cad5a427ecea94985927311 (diff)
parent     1ec3911dbd19076bcdfe5540096ff67f91a6ec02 (diff)
download   linux-969e5aa3b0162a02c4f287d48ff58ca2145acf1b.tar.gz
Merge branch 'testing' of github.com:ceph/ceph-client into v3.8-rc5-testing
Diffstat (limited to 'net/ceph')
-rw-r--r--  net/ceph/crush/mapper.c  |  15
-rw-r--r--  net/ceph/osd_client.c    | 206
-rw-r--r--  net/ceph/osdmap.c        |  43
3 files changed, 103 insertions, 161 deletions
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 35fce755ce10..cbd06a91941c 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in
  * @outpos: our position in that vector
  * @firstn: true if choosing "first n" items, false if choosing "indep"
  * @recurse_to_leaf: true if we want one device under each item of given type
+ * @descend_once: true if we should only try one descent before giving up
  * @out2: second output vector for leaf items (if @recurse_to_leaf)
  */
 static int crush_choose(const struct crush_map *map,
@@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map,
 			int x, int numrep, int type,
 			int *out, int outpos,
 			int firstn, int recurse_to_leaf,
-			int *out2)
+			int descend_once, int *out2)
 {
 	int rep;
 	unsigned int ftotal, flocal;
@@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map,
 				}
 
 				reject = 0;
-				if (recurse_to_leaf) {
+				if (!collide && recurse_to_leaf) {
 					if (item < 0) {
 						if (crush_choose(map,
 							 map->buckets[-1-item],
@@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map,
 							 x, outpos+1, 0,
 							 out2, outpos,
 							 firstn, 0,
+							 map->chooseleaf_descend_once,
 							 NULL) <= outpos)
 							/* didn't get leaf */
 							reject = 1;
@@ -422,7 +424,10 @@ reject:
 					ftotal++;
 					flocal++;
 
-					if (collide && flocal <= map->choose_local_tries)
+					if (reject && descend_once)
+						/* let outer call try again */
+						skip_rep = 1;
+					else if (collide && flocal <= map->choose_local_tries)
 						/* retry locally a few times */
 						retry_bucket = 1;
 					else if (map->choose_local_fallback_tries > 0 &&
@@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map,
 	int i, j;
 	int numrep;
 	int firstn;
+	const int descend_once = 0;
 
 	if ((__u32)ruleno >= map->max_rules) {
 		dprintk(" bad ruleno %d\n", ruleno);
@@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map,
 						      curstep->arg2,
 						      o+osize, j,
 						      firstn,
-						      recurse_to_leaf, c+osize);
+						      recurse_to_leaf,
+						      descend_once, c+osize);
 			}
 
 			if (recurse_to_leaf)
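Note on the mapper.c hunks above: they thread a new descend_once argument through crush_choose(), so that when a recursive chooseleaf descent rejects its result and descend_once is set, the whole replica slot is skipped and the outer call retries, instead of retrying within the same bucket. The following standalone C sketch mirrors that decision order; it is not kernel code, the local-fallback branch is omitted, and all names are illustrative.

/*
 * Standalone sketch (not kernel code) of the retry decision added to
 * crush_choose(); the local-fallback branch is omitted and the names
 * are illustrative only.
 */
#include <stdio.h>

enum retry_action { SKIP_REP, RETRY_BUCKET, RETRY_DESCENT, GIVE_UP };

static enum retry_action retry_decision(int reject, int collide,
					int descend_once,
					unsigned int flocal,
					unsigned int ftotal,
					unsigned int local_tries,
					unsigned int total_tries)
{
	if (reject && descend_once)
		return SKIP_REP;	/* let the outer call try again */
	if (collide && flocal <= local_tries)
		return RETRY_BUCKET;	/* retry locally a few times */
	if (ftotal <= total_tries)
		return RETRY_DESCENT;	/* retry the whole descent */
	return GIVE_UP;
}

int main(void)
{
	/* With descend_once set, a rejected leaf skips the replica slot. */
	printf("%d\n", retry_decision(1, 0, 1, 1, 1, 2, 19) == SKIP_REP);
	/* Without it, a collision is retried within the same bucket. */
	printf("%d\n", retry_decision(0, 1, 0, 1, 1, 2, 19) == RETRY_BUCKET);
	return 0;
}

Compiled with a hosted C compiler, both checks print 1; crush_do_rule() itself always passes descend_once = 0, so only the nested chooseleaf call is affected.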
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index eb9a44478764..500ae8b49321 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -32,52 +32,43 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
 static void __send_request(struct ceph_osd_client *osdc,
 			   struct ceph_osd_request *req);
 
-static int op_needs_trail(int op)
-{
-	switch (op) {
-	case CEPH_OSD_OP_GETXATTR:
-	case CEPH_OSD_OP_SETXATTR:
-	case CEPH_OSD_OP_CMPXATTR:
-	case CEPH_OSD_OP_CALL:
-	case CEPH_OSD_OP_NOTIFY:
-		return 1;
-	default:
-		return 0;
-	}
-}
-
 static int op_has_extent(int op)
 {
 	return (op == CEPH_OSD_OP_READ ||
 		op == CEPH_OSD_OP_WRITE);
 }
 
-int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
-			struct ceph_file_layout *layout,
-			u64 snapid,
+int ceph_calc_raw_layout(struct ceph_file_layout *layout,
 			u64 off, u64 *plen, u64 *bno,
 			struct ceph_osd_request *req,
 			struct ceph_osd_req_op *op)
 {
-	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
 	u64 orig_len = *plen;
 	u64 objoff, objlen;    /* extent in object */
 	int r;
 
-	reqhead->snapid = cpu_to_le64(snapid);
-
 	/* object extent? */
-	r = ceph_calc_file_object_mapping(layout, off, plen, bno,
+	r = ceph_calc_file_object_mapping(layout, off, orig_len, bno,
 					  &objoff, &objlen);
 	if (r < 0)
 		return r;
-	if (*plen < orig_len)
+	if (objlen < orig_len) {
+		*plen = objlen;
 		dout(" skipping last %llu, final file extent %llu~%llu\n",
 		     orig_len - *plen, off, *plen);
+	}
 
 	if (op_has_extent(op->op)) {
+		u32 osize = le32_to_cpu(layout->fl_object_size);
 		op->extent.offset = objoff;
 		op->extent.length = objlen;
+		if (op->extent.truncate_size <= off - objoff) {
+			op->extent.truncate_size = 0;
+		} else {
+			op->extent.truncate_size -= off - objoff;
+			if (op->extent.truncate_size > osize)
+				op->extent.truncate_size = osize;
+		}
 	}
 	req->r_num_pages = calc_pages_for(off, *plen);
 	req->r_page_alignment = off & ~PAGE_MASK;
@@ -115,8 +106,7 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
  *
  * fill osd op in request message.
  */
-static int calc_layout(struct ceph_osd_client *osdc,
-		       struct ceph_vino vino,
+static int calc_layout(struct ceph_vino vino,
 		       struct ceph_file_layout *layout,
 		       u64 off, u64 *plen,
 		       struct ceph_osd_request *req,
@@ -125,8 +115,7 @@ static int calc_layout(struct ceph_osd_client *osdc,
 	u64 bno;
 	int r;
 
-	r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-				 plen, &bno, req, op);
+	r = ceph_calc_raw_layout(layout, off, plen, &bno, req, op);
 	if (r < 0)
 		return r;
 
@@ -163,10 +152,7 @@ void ceph_osdc_release_request(struct kref *kref)
 		bio_put(req->r_bio);
 #endif
 	ceph_put_snap_context(req->r_snapc);
-	if (req->r_trail) {
-		ceph_pagelist_release(req->r_trail);
-		kfree(req->r_trail);
-	}
+	ceph_pagelist_release(&req->r_trail);
 	if (req->r_mempool)
 		mempool_free(req, req->r_osdc->req_mempool);
 	else
@@ -174,34 +160,14 @@ void ceph_osdc_release_request(struct kref *kref)
 }
 EXPORT_SYMBOL(ceph_osdc_release_request);
 
-static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
-{
-	int i = 0;
-
-	if (needs_trail)
-		*needs_trail = 0;
-	while (ops[i].op) {
-		if (needs_trail && op_needs_trail(ops[i].op))
-			*needs_trail = 1;
-		i++;
-	}
-
-	return i;
-}
-
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
-					       int flags,
 					       struct ceph_snap_context *snapc,
-					       struct ceph_osd_req_op *ops,
+					       unsigned int num_op,
 					       bool use_mempool,
-					       gfp_t gfp_flags,
-					       struct page **pages,
-					       struct bio *bio)
+					       gfp_t gfp_flags)
 {
 	struct ceph_osd_request *req;
 	struct ceph_msg *msg;
-	int needs_trail;
-	int num_op = get_num_ops(ops, &needs_trail);
 	size_t msg_size = sizeof(struct ceph_osd_request_head);
 
 	msg_size += num_op*sizeof(struct ceph_osd_op);
@@ -228,10 +194,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	INIT_LIST_HEAD(&req->r_req_lru_item);
 	INIT_LIST_HEAD(&req->r_osd_item);
 
-	req->r_flags = flags;
-
-	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
-
 	/* create reply message */
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
@@ -244,15 +206,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	}
 	req->r_reply = msg;
 
-	/* allocate space for the trailing data */
-	if (needs_trail) {
-		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
-		if (!req->r_trail) {
-			ceph_osdc_put_request(req);
-			return NULL;
-		}
-		ceph_pagelist_init(req->r_trail);
-	}
+	ceph_pagelist_init(&req->r_trail);
 
 	/* create request message; allow space for oid */
 	msg_size += MAX_OBJ_NAME_SIZE;
@@ -270,13 +224,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
 
 	req->r_request = msg;
-	req->r_pages = pages;
-#ifdef CONFIG_BLOCK
-	if (bio) {
-		req->r_bio = bio;
-		bio_get(req->r_bio);
-	}
-#endif
 
 	return req;
 }
@@ -304,29 +251,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 	case CEPH_OSD_OP_GETXATTR:
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
-		BUG_ON(!req->r_trail);
-
 		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
 		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
 		dst->xattr.cmp_op = src->xattr.cmp_op;
 		dst->xattr.cmp_mode = src->xattr.cmp_mode;
-		ceph_pagelist_append(req->r_trail, src->xattr.name,
+		ceph_pagelist_append(&req->r_trail, src->xattr.name,
 				     src->xattr.name_len);
-		ceph_pagelist_append(req->r_trail, src->xattr.val,
+		ceph_pagelist_append(&req->r_trail, src->xattr.val,
 				     src->xattr.value_len);
 		break;
 	case CEPH_OSD_OP_CALL:
-		BUG_ON(!req->r_trail);
-
 		dst->cls.class_len = src->cls.class_len;
 		dst->cls.method_len = src->cls.method_len;
 		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
 
-		ceph_pagelist_append(req->r_trail, src->cls.class_name,
+		ceph_pagelist_append(&req->r_trail, src->cls.class_name,
 				     src->cls.class_len);
-		ceph_pagelist_append(req->r_trail, src->cls.method_name,
+		ceph_pagelist_append(&req->r_trail, src->cls.method_name,
 				     src->cls.method_len);
-		ceph_pagelist_append(req->r_trail, src->cls.indata,
+		ceph_pagelist_append(&req->r_trail, src->cls.indata,
 				     src->cls.indata_len);
 		break;
 	case CEPH_OSD_OP_ROLLBACK:
@@ -339,11 +282,9 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
 			__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
 			__le32 timeout = cpu_to_le32(src->watch.timeout);
 
-			BUG_ON(!req->r_trail);
-
-			ceph_pagelist_append(req->r_trail,
+			ceph_pagelist_append(&req->r_trail,
 						&prot_ver, sizeof(prot_ver));
-			ceph_pagelist_append(req->r_trail,
+			ceph_pagelist_append(&req->r_trail,
 						&timeout, sizeof(timeout));
 		}
 	case CEPH_OSD_OP_NOTIFY_ACK:
@@ -365,25 +306,25 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
  *
  */
 void ceph_osdc_build_request(struct ceph_osd_request *req,
-			     u64 off, u64 *plen,
+			     u64 off, u64 len, unsigned int num_op,
 			     struct ceph_osd_req_op *src_ops,
-			     struct ceph_snap_context *snapc,
-			     struct timespec *mtime,
-			     const char *oid,
-			     int oid_len)
+			     struct ceph_snap_context *snapc, u64 snap_id,
+			     struct timespec *mtime)
 {
 	struct ceph_msg *msg = req->r_request;
 	struct ceph_osd_request_head *head;
 	struct ceph_osd_req_op *src_op;
 	struct ceph_osd_op *op;
 	void *p;
-	int num_op = get_num_ops(src_ops, NULL);
 	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
 	int flags = req->r_flags;
 	u64 data_len = 0;
 	int i;
 
+	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
+
 	head = msg->front.iov_base;
+	head->snapid = cpu_to_le64(snap_id);
 	op = (void *)(head + 1);
 	p = (void *)(op + num_op);
 
@@ -393,23 +334,19 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 	head->flags = cpu_to_le32(flags);
 	if (flags & CEPH_OSD_FLAG_WRITE)
 		ceph_encode_timespec(&head->mtime, mtime);
+	BUG_ON(num_op > (unsigned int) ((u16) -1));
 	head->num_ops = cpu_to_le16(num_op);
 
-
 	/* fill in oid */
-	head->object_len = cpu_to_le32(oid_len);
-	memcpy(p, oid, oid_len);
-	p += oid_len;
+	head->object_len = cpu_to_le32(req->r_oid_len);
+	memcpy(p, req->r_oid, req->r_oid_len);
+	p += req->r_oid_len;
 
 	src_op = src_ops;
-	while (src_op->op) {
-		osd_req_encode_op(req, op, src_op);
-		src_op++;
-		op++;
-	}
+	while (num_op--)
+		osd_req_encode_op(req, op++, src_op++);
 
-	if (req->r_trail)
-		data_len += req->r_trail->length;
+	data_len += req->r_trail.length;
 
 	if (snapc) {
 		head->snap_seq = cpu_to_le64(snapc->seq);
@@ -422,7 +359,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
 
 	if (flags & CEPH_OSD_FLAG_WRITE) {
 		req->r_request->hdr.data_off = cpu_to_le16(off);
-		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
+		req->r_request->hdr.data_len = cpu_to_le32(len + data_len);
 	} else if (data_len) {
 		req->r_request->hdr.data_off = 0;
 		req->r_request->hdr.data_len = cpu_to_le32(data_len);
@@ -462,31 +399,30 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 					       bool use_mempool, int num_reply,
 					       int page_align)
 {
-	struct ceph_osd_req_op ops[3];
+	struct ceph_osd_req_op ops[2];
 	struct ceph_osd_request *req;
+	unsigned int num_op = 1;
 	int r;
 
+	memset(&ops, 0, sizeof ops);
+
 	ops[0].op = opcode;
 	ops[0].extent.truncate_seq = truncate_seq;
 	ops[0].extent.truncate_size = truncate_size;
-	ops[0].payload_len = 0;
 
 	if (do_sync) {
 		ops[1].op = CEPH_OSD_OP_STARTSYNC;
-		ops[1].payload_len = 0;
-		ops[2].op = 0;
-	} else
-		ops[1].op = 0;
-
-	req = ceph_osdc_alloc_request(osdc, flags,
-					 snapc, ops,
-					 use_mempool,
-					 GFP_NOFS, NULL, NULL);
+		num_op++;
+	}
+
+	req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
+					GFP_NOFS);
 	if (!req)
 		return ERR_PTR(-ENOMEM);
+	req->r_flags = flags;
 
 	/* calculate max write size */
-	r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+	r = calc_layout(vino, layout, off, plen, req, ops);
 	if (r < 0)
 		return ERR_PTR(r);
 	req->r_file_layout = *layout;  /* keep a copy */
@@ -496,10 +432,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 	req->r_num_pages = calc_pages_for(page_align, *plen);
 	req->r_page_alignment = page_align;
 
-	ceph_osdc_build_request(req, off, plen, ops,
-				snapc,
-				mtime,
-				req->r_oid, req->r_oid_len);
+	ceph_osdc_build_request(req, off, *plen, num_op, ops,
+				snapc, vino.snap, mtime);
 
 	return req;
 }
@@ -739,31 +673,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-	struct ceph_osd_request *req;
-	int ret = 0;
+	struct ceph_entity_addr *peer_addr;
 
 	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
 	if (list_empty(&osd->o_requests) &&
 	    list_empty(&osd->o_linger_requests)) {
 		__remove_osd(osdc, osd);
-		ret = -ENODEV;
-	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
-			  &osd->o_con.peer_addr,
-			  sizeof(osd->o_con.peer_addr)) == 0 &&
-		   !ceph_con_opened(&osd->o_con)) {
+
+		return -ENODEV;
+	}
+
+	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
+			!ceph_con_opened(&osd->o_con)) {
+		struct ceph_osd_request *req;
+
 		dout(" osd addr hasn't changed and connection never opened,"
 		     " letting msgr retry");
 		/* touch each r_stamp for handle_timeout()'s benfit */
 		list_for_each_entry(req, &osd->o_requests, r_osd_item)
 			req->r_stamp = jiffies;
-		ret = -EAGAIN;
-	} else {
-		ceph_con_close(&osd->o_con);
-		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
-			      &osdc->osdmap->osd_addr[osd->o_osd]);
-		osd->o_incarnation++;
+
+		return -EAGAIN;
 	}
-	return ret;
+
+	ceph_con_close(&osd->o_con);
+	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
+	osd->o_incarnation++;
+
+	return 0;
 }
 
 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
@@ -1706,7 +1644,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 #ifdef CONFIG_BLOCK
 	req->r_request->bio = req->r_bio;
 #endif
-	req->r_request->trail = req->r_trail;
+	req->r_request->trail = &req->r_trail;
 
 	register_request(osdc, req);
 
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index de73214b5d26..3c61e21611d3 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -13,26 +13,18 @@
 
 char *ceph_osdmap_state_str(char *str, int len, int state)
 {
-	int flag = 0;
-
 	if (!len)
-		goto done;
-
-	*str = '\0';
-	if (state) {
-		if (state & CEPH_OSD_EXISTS) {
-			snprintf(str, len, "exists");
-			flag = 1;
-		}
-		if (state & CEPH_OSD_UP) {
-			snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""),
-				 "up");
-			flag = 1;
-		}
-	} else {
+		return str;
+
+	if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP))
+		snprintf(str, len, "exists, up");
+	else if (state & CEPH_OSD_EXISTS)
+		snprintf(str, len, "exists");
+	else if (state & CEPH_OSD_UP)
+		snprintf(str, len, "up");
+	else
 		snprintf(str, len, "doesn't exist");
-	}
-done:
+
 	return str;
 }
 
@@ -170,6 +162,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
         c->choose_local_tries = 2;
         c->choose_local_fallback_tries = 5;
         c->choose_total_tries = 19;
+	c->chooseleaf_descend_once = 0;
 
 	ceph_decode_need(p, end, 4*sizeof(u32), bad);
 	magic = ceph_decode_32(p);
@@ -336,6 +329,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
         dout("crush decode tunable choose_total_tries = %d",
              c->choose_total_tries);
 
+	ceph_decode_need(p, end, sizeof(u32), done);
+	c->chooseleaf_descend_once = ceph_decode_32(p);
+	dout("crush decode tunable chooseleaf_descend_once = %d",
+	     c->chooseleaf_descend_once);
+
 done:
 	dout("crush_decode success\n");
 	return c;
@@ -1010,7 +1008,7 @@ bad:
  * pass a stride back to the caller.
  */
 int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
-				   u64 off, u64 *plen,
+				   u64 off, u64 len,
 				   u64 *ono,
 				   u64 *oxoff, u64 *oxlen)
 {
@@ -1021,7 +1019,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
 	u32 su_per_object;
 	u64 t, su_offset;
 
-	dout("mapping %llu~%llu  osize %u fl_su %u\n", off, *plen,
+	dout("mapping %llu~%llu  osize %u fl_su %u\n", off, len,
 	     osize, su);
 	if (su == 0 || sc == 0)
 		goto invalid;
@@ -1054,11 +1052,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout,
 
 	/*
 	 * Calculate the length of the extent being written to the selected
-	 * object. This is the minimum of the full length requested (plen) or
+	 * object. This is the minimum of the full length requested (len) or
 	 * the remainder of the current stripe being written to.
 	 */
-	*oxlen = min_t(u64, *plen, su - su_offset);
-	*plen = *oxlen;
+	*oxlen = min_t(u64, len, su - su_offset);
 
 	dout(" obj extent %llu~%llu\n", *oxoff, *oxlen);
 	return 0;