summary refs log tree commit diff
path: root/net
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2017-02-28 15:36:09 -0800
committerLinus Torvalds <torvalds@linux-foundation.org>2017-02-28 15:36:09 -0800
commitb2deee2dc06db7cdf99b84346e69bdb9db9baa85 (patch)
treeceb073fa12c1a9804761ec8ce8911a517b007ed6 /net
parentd4f4cf77b37eaea58ef863a4cbc95dad3880b524 (diff)
parent54ea0046b6fe36ec18e82d282a29a18da6cdea0f (diff)
downloadlinux-b2deee2dc06db7cdf99b84346e69bdb9db9baa85.tar.gz
Merge tag 'ceph-for-4.11-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "This time around we have:

   - support for rbd data-pool feature, which enables rbd images on
     erasure-coded pools (myself). CEPH_PG_MAX_SIZE has been bumped to
     allow erasure-coded profiles with k+m up to 32.

   - a patch for ceph_d_revalidate() performance regression introduced
     in 4.9, along with some cleanups in the area (Jeff Layton)

   - a set of fixes for unsafe ->d_parent accesses in CephFS (Jeff
     Layton)

   - buffered reads are now processed in rsize windows instead of rasize
     windows (Andreas Gerstmayr). The new default for rsize mount option
     is 64M.

   - ack vs commit distinction is gone, greatly simplifying ->fsync()
     and MOSDOpReply handling code (myself)

  ... also a few filesystem bug fixes from Zheng, a CRUSH sync up (CRUSH
  computations are still serialized though) and several minor fixes and
  cleanups all over"

* tag 'ceph-for-4.11-rc1' of git://github.com/ceph/ceph-client: (52 commits)
  libceph, rbd, ceph: WRITE | ONDISK -> WRITE
  libceph: get rid of ack vs commit
  ceph: remove special ack vs commit behavior
  ceph: tidy some white space in get_nonsnap_parent()
  crush: fix dprintk compilation
  crush: do is_out test only if we do not collide
  ceph: remove req from unsafe list when unregistering it
  rbd: constify device_type structure
  rbd: kill obj_request->object_name and rbd_segment_name_cache
  rbd: store and use obj_request->object_no
  rbd: RBD_V{1,2}_DATA_FORMAT macros
  rbd: factor out __rbd_osd_req_create()
  rbd: set offset and length outside of rbd_obj_request_create()
  rbd: support for data-pool feature
  rbd: introduce rbd_init_layout()
  rbd: use rbd_obj_bytes() more
  rbd: remove now unused rbd_obj_request_wait() and helpers
  rbd: switch rbd_obj_method_sync() to ceph_osdc_call()
  libceph: pass reply buffer length through ceph_osdc_call()
  rbd: do away with obj_request in rbd_obj_read_sync()
  ...
Diffstat (limited to 'net')
-rw-r--r--net/ceph/cls_lock_client.c14
-rw-r--r--net/ceph/crush/crush.c5
-rw-r--r--net/ceph/crush/mapper.c227
-rw-r--r--net/ceph/crypto.c1
-rw-r--r--net/ceph/osd_client.c130
-rw-r--r--net/ceph/osdmap.c101
-rw-r--r--net/ceph/snapshot.c2
7 files changed, 261 insertions, 219 deletions
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
index 50f040fdb2a9..b9233b990399 100644
--- a/net/ceph/cls_lock_client.c
+++ b/net/ceph/cls_lock_client.c
@@ -69,8 +69,8 @@ int ceph_cls_lock(struct ceph_osd_client *osdc,
 	dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
 	     __func__, lock_name, type, cookie, tag, desc, flags);
 	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
-			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			     lock_op_page, lock_op_buf_size, NULL, NULL);
+			     CEPH_OSD_FLAG_WRITE, lock_op_page,
+			     lock_op_buf_size, NULL, NULL);
 
 	dout("%s: status %d\n", __func__, ret);
 	__free_page(lock_op_page);
@@ -117,8 +117,8 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc,
 
 	dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
 	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
-			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			     unlock_op_page, unlock_op_buf_size, NULL, NULL);
+			     CEPH_OSD_FLAG_WRITE, unlock_op_page,
+			     unlock_op_buf_size, NULL, NULL);
 
 	dout("%s: status %d\n", __func__, ret);
 	__free_page(unlock_op_page);
@@ -170,8 +170,8 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
 	dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
 	     cookie, ENTITY_NAME(*locker));
 	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
-			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			     break_op_page, break_op_buf_size, NULL, NULL);
+			     CEPH_OSD_FLAG_WRITE, break_op_page,
+			     break_op_buf_size, NULL, NULL);
 
 	dout("%s: status %d\n", __func__, ret);
 	__free_page(break_op_page);
@@ -278,7 +278,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc,
 	int get_info_op_buf_size;
 	int name_len = strlen(lock_name);
 	struct page *get_info_op_page, *reply_page;
-	size_t reply_len;
+	size_t reply_len = PAGE_SIZE;
 	void *p, *end;
 	int ret;
 
diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c
index 80d7c3a97cb8..5bf94c04f645 100644
--- a/net/ceph/crush/crush.c
+++ b/net/ceph/crush/crush.c
@@ -45,7 +45,6 @@ int crush_get_bucket_item_weight(const struct crush_bucket *b, int p)
 
 void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b)
 {
-	kfree(b->h.perm);
 	kfree(b->h.items);
 	kfree(b);
 }
@@ -54,14 +53,12 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b)
 {
 	kfree(b->item_weights);
 	kfree(b->sum_weights);
-	kfree(b->h.perm);
 	kfree(b->h.items);
 	kfree(b);
 }
 
 void crush_destroy_bucket_tree(struct crush_bucket_tree *b)
 {
-	kfree(b->h.perm);
 	kfree(b->h.items);
 	kfree(b->node_weights);
 	kfree(b);
@@ -71,7 +68,6 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b)
 {
 	kfree(b->straws);
 	kfree(b->item_weights);
-	kfree(b->h.perm);
 	kfree(b->h.items);
 	kfree(b);
 }
@@ -79,7 +75,6 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b)
 void crush_destroy_bucket_straw2(struct crush_bucket_straw2 *b)
 {
 	kfree(b->item_weights);
-	kfree(b->h.perm);
 	kfree(b->h.items);
 	kfree(b);
 }
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 130ab407c5ec..b5cd8c21bfdf 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -54,7 +54,6 @@ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size
 	return -1;
 }
 
-
 /*
  * bucket choose methods
  *
@@ -72,59 +71,60 @@ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size
  * Since this is expensive, we optimize for the r=0 case, which
  * captures the vast majority of calls.
  */
-static int bucket_perm_choose(struct crush_bucket *bucket,
+static int bucket_perm_choose(const struct crush_bucket *bucket,
+			      struct crush_work_bucket *work,
 			      int x, int r)
 {
 	unsigned int pr = r % bucket->size;
 	unsigned int i, s;
 
 	/* start a new permutation if @x has changed */
-	if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) {
+	if (work->perm_x != (__u32)x || work->perm_n == 0) {
 		dprintk("bucket %d new x=%d\n", bucket->id, x);
-		bucket->perm_x = x;
+		work->perm_x = x;
 
 		/* optimize common r=0 case */
 		if (pr == 0) {
 			s = crush_hash32_3(bucket->hash, x, bucket->id, 0) %
 				bucket->size;
-			bucket->perm[0] = s;
-			bucket->perm_n = 0xffff;   /* magic value, see below */
+			work->perm[0] = s;
+			work->perm_n = 0xffff;   /* magic value, see below */
 			goto out;
 		}
 
 		for (i = 0; i < bucket->size; i++)
-			bucket->perm[i] = i;
-		bucket->perm_n = 0;
-	} else if (bucket->perm_n == 0xffff) {
+			work->perm[i] = i;
+		work->perm_n = 0;
+	} else if (work->perm_n == 0xffff) {
 		/* clean up after the r=0 case above */
 		for (i = 1; i < bucket->size; i++)
-			bucket->perm[i] = i;
-		bucket->perm[bucket->perm[0]] = 0;
-		bucket->perm_n = 1;
+			work->perm[i] = i;
+		work->perm[work->perm[0]] = 0;
+		work->perm_n = 1;
 	}
 
 	/* calculate permutation up to pr */
-	for (i = 0; i < bucket->perm_n; i++)
-		dprintk(" perm_choose have %d: %d\n", i, bucket->perm[i]);
-	while (bucket->perm_n <= pr) {
-		unsigned int p = bucket->perm_n;
+	for (i = 0; i < work->perm_n; i++)
+		dprintk(" perm_choose have %d: %d\n", i, work->perm[i]);
+	while (work->perm_n <= pr) {
+		unsigned int p = work->perm_n;
 		/* no point in swapping the final entry */
 		if (p < bucket->size - 1) {
 			i = crush_hash32_3(bucket->hash, x, bucket->id, p) %
 				(bucket->size - p);
 			if (i) {
-				unsigned int t = bucket->perm[p + i];
-				bucket->perm[p + i] = bucket->perm[p];
-				bucket->perm[p] = t;
+				unsigned int t = work->perm[p + i];
+				work->perm[p + i] = work->perm[p];
+				work->perm[p] = t;
 			}
 			dprintk(" perm_choose swap %d with %d\n", p, p+i);
 		}
-		bucket->perm_n++;
+		work->perm_n++;
 	}
 	for (i = 0; i < bucket->size; i++)
-		dprintk(" perm_choose  %d: %d\n", i, bucket->perm[i]);
+		dprintk(" perm_choose  %d: %d\n", i, work->perm[i]);
 
-	s = bucket->perm[pr];
+	s = work->perm[pr];
 out:
 	dprintk(" perm_choose %d sz=%d x=%d r=%d (%d) s=%d\n", bucket->id,
 		bucket->size, x, r, pr, s);
@@ -132,14 +132,14 @@ out:
 }
 
 /* uniform */
-static int bucket_uniform_choose(struct crush_bucket_uniform *bucket,
-				 int x, int r)
+static int bucket_uniform_choose(const struct crush_bucket_uniform *bucket,
+				 struct crush_work_bucket *work, int x, int r)
 {
-	return bucket_perm_choose(&bucket->h, x, r);
+	return bucket_perm_choose(&bucket->h, work, x, r);
 }
 
 /* list */
-static int bucket_list_choose(struct crush_bucket_list *bucket,
+static int bucket_list_choose(const struct crush_bucket_list *bucket,
 			      int x, int r)
 {
 	int i;
@@ -155,8 +155,9 @@ static int bucket_list_choose(struct crush_bucket_list *bucket,
 		w *= bucket->sum_weights[i];
 		w = w >> 16;
 		/*dprintk(" scaled %llx\n", w);*/
-		if (w < bucket->item_weights[i])
+		if (w < bucket->item_weights[i]) {
 			return bucket->h.items[i];
+		}
 	}
 
 	dprintk("bad list sums for bucket %d\n", bucket->h.id);
@@ -192,7 +193,7 @@ static int terminal(int x)
 	return x & 1;
 }
 
-static int bucket_tree_choose(struct crush_bucket_tree *bucket,
+static int bucket_tree_choose(const struct crush_bucket_tree *bucket,
 			      int x, int r)
 {
 	int n;
@@ -224,7 +225,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket,
 
 /* straw */
 
-static int bucket_straw_choose(struct crush_bucket_straw *bucket,
+static int bucket_straw_choose(const struct crush_bucket_straw *bucket,
 			       int x, int r)
 {
 	__u32 i;
@@ -301,7 +302,7 @@ static __u64 crush_ln(unsigned int xin)
  *
  */
 
-static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket,
+static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
 				int x, int r)
 {
 	unsigned int i, high = 0;
@@ -344,37 +345,42 @@ static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket,
 			high_draw = draw;
 		}
 	}
+
 	return bucket->h.items[high];
 }
 
 
-static int crush_bucket_choose(struct crush_bucket *in, int x, int r)
+static int crush_bucket_choose(const struct crush_bucket *in,
+			       struct crush_work_bucket *work,
+			       int x, int r)
 {
 	dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
 	BUG_ON(in->size == 0);
 	switch (in->alg) {
 	case CRUSH_BUCKET_UNIFORM:
-		return bucket_uniform_choose((struct crush_bucket_uniform *)in,
-					  x, r);
+		return bucket_uniform_choose(
+			(const struct crush_bucket_uniform *)in,
+			work, x, r);
 	case CRUSH_BUCKET_LIST:
-		return bucket_list_choose((struct crush_bucket_list *)in,
+		return bucket_list_choose((const struct crush_bucket_list *)in,
 					  x, r);
 	case CRUSH_BUCKET_TREE:
-		return bucket_tree_choose((struct crush_bucket_tree *)in,
+		return bucket_tree_choose((const struct crush_bucket_tree *)in,
 					  x, r);
 	case CRUSH_BUCKET_STRAW:
-		return bucket_straw_choose((struct crush_bucket_straw *)in,
-					   x, r);
+		return bucket_straw_choose(
+			(const struct crush_bucket_straw *)in,
+			x, r);
 	case CRUSH_BUCKET_STRAW2:
-		return bucket_straw2_choose((struct crush_bucket_straw2 *)in,
-					    x, r);
+		return bucket_straw2_choose(
+			(const struct crush_bucket_straw2 *)in,
+			x, r);
 	default:
 		dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
 		return in->items[0];
 	}
 }
 
-
 /*
  * true if device is marked "out" (failed, fully offloaded)
  * of the cluster
@@ -416,7 +422,8 @@ static int is_out(const struct crush_map *map,
  * @parent_r: r value passed from the parent
  */
 static int crush_choose_firstn(const struct crush_map *map,
-			       struct crush_bucket *bucket,
+			       struct crush_work *work,
+			       const struct crush_bucket *bucket,
 			       const __u32 *weight, int weight_max,
 			       int x, int numrep, int type,
 			       int *out, int outpos,
@@ -434,7 +441,7 @@ static int crush_choose_firstn(const struct crush_map *map,
 	int rep;
 	unsigned int ftotal, flocal;
 	int retry_descent, retry_bucket, skip_rep;
-	struct crush_bucket *in = bucket;
+	const struct crush_bucket *in = bucket;
 	int r;
 	int i;
 	int item = 0;
@@ -473,9 +480,13 @@ static int crush_choose_firstn(const struct crush_map *map,
 				if (local_fallback_retries > 0 &&
 				    flocal >= (in->size>>1) &&
 				    flocal > local_fallback_retries)
-					item = bucket_perm_choose(in, x, r);
+					item = bucket_perm_choose(
+						in, work->work[-1-in->id],
+						x, r);
 				else
-					item = crush_bucket_choose(in, x, r);
+					item = crush_bucket_choose(
+						in, work->work[-1-in->id],
+						x, r);
 				if (item >= map->max_devices) {
 					dprintk("   bad item %d\n", item);
 					skip_rep = 1;
@@ -518,19 +529,21 @@ static int crush_choose_firstn(const struct crush_map *map,
 							sub_r = r >> (vary_r-1);
 						else
 							sub_r = 0;
-						if (crush_choose_firstn(map,
-							 map->buckets[-1-item],
-							 weight, weight_max,
-							 x, stable ? 1 : outpos+1, 0,
-							 out2, outpos, count,
-							 recurse_tries, 0,
-							 local_retries,
-							 local_fallback_retries,
-							 0,
-							 vary_r,
-							 stable,
-							 NULL,
-							 sub_r) <= outpos)
+						if (crush_choose_firstn(
+							    map,
+							    work,
+							    map->buckets[-1-item],
+							    weight, weight_max,
+							    x, stable ? 1 : outpos+1, 0,
+							    out2, outpos, count,
+							    recurse_tries, 0,
+							    local_retries,
+							    local_fallback_retries,
+							    0,
+							    vary_r,
+							    stable,
+							    NULL,
+							    sub_r) <= outpos)
 							/* didn't get leaf */
 							reject = 1;
 					} else {
@@ -539,14 +552,12 @@ static int crush_choose_firstn(const struct crush_map *map,
 					}
 				}
 
-				if (!reject) {
+				if (!reject && !collide) {
 					/* out? */
 					if (itemtype == 0)
 						reject = is_out(map, weight,
 								weight_max,
 								item, x);
-					else
-						reject = 0;
 				}
 
 reject:
@@ -600,7 +611,8 @@ reject:
  *
  */
 static void crush_choose_indep(const struct crush_map *map,
-			       struct crush_bucket *bucket,
+			       struct crush_work *work,
+			       const struct crush_bucket *bucket,
 			       const __u32 *weight, int weight_max,
 			       int x, int left, int numrep, int type,
 			       int *out, int outpos,
@@ -610,7 +622,7 @@ static void crush_choose_indep(const struct crush_map *map,
 			       int *out2,
 			       int parent_r)
 {
-	struct crush_bucket *in = bucket;
+	const struct crush_bucket *in = bucket;
 	int endpos = outpos + left;
 	int rep;
 	unsigned int ftotal;
@@ -678,7 +690,9 @@ static void crush_choose_indep(const struct crush_map *map,
 					break;
 				}
 
-				item = crush_bucket_choose(in, x, r);
+				item = crush_bucket_choose(
+					in, work->work[-1-in->id],
+					x, r);
 				if (item >= map->max_devices) {
 					dprintk("   bad item %d\n", item);
 					out[rep] = CRUSH_ITEM_NONE;
@@ -724,13 +738,15 @@ static void crush_choose_indep(const struct crush_map *map,
 
 				if (recurse_to_leaf) {
 					if (item < 0) {
-						crush_choose_indep(map,
-						   map->buckets[-1-item],
-						   weight, weight_max,
-						   x, 1, numrep, 0,
-						   out2, rep,
-						   recurse_tries, 0,
-						   0, NULL, r);
+						crush_choose_indep(
+							map,
+							work,
+							map->buckets[-1-item],
+							weight, weight_max,
+							x, 1, numrep, 0,
+							out2, rep,
+							recurse_tries, 0,
+							0, NULL, r);
 						if (out2[rep] == CRUSH_ITEM_NONE) {
 							/* placed nothing; no leaf */
 							break;
@@ -781,6 +797,53 @@ static void crush_choose_indep(const struct crush_map *map,
 #endif
 }
 
+
+/*
+ * This takes a chunk of memory and sets it up to be a shiny new
+ * working area for a CRUSH placement computation. It must be called
+ * on any newly allocated memory before passing it in to
+ * crush_do_rule. It may be used repeatedly after that, so long as the
+ * map has not changed. If the map /has/ changed, you must make sure
+ * the working size is no smaller than what was allocated and re-run
+ * crush_init_workspace.
+ *
+ * If you do retain the working space between calls to crush, make it
+ * thread-local.
+ */
+void crush_init_workspace(const struct crush_map *map, void *v)
+{
+	struct crush_work *w = v;
+	__s32 b;
+
+	/*
+	 * We work by moving through the available space and setting
+	 * values and pointers as we go.
+	 *
+	 * It's a bit like Forth's use of the 'allot' word since we
+	 * set the pointer first and then reserve the space for it to
+	 * point to by incrementing the point.
+	 */
+	v += sizeof(struct crush_work *);
+	w->work = v;
+	v += map->max_buckets * sizeof(struct crush_work_bucket *);
+	for (b = 0; b < map->max_buckets; ++b) {
+		if (!map->buckets[b])
+			continue;
+
+		w->work[b] = v;
+		switch (map->buckets[b]->alg) {
+		default:
+			v += sizeof(struct crush_work_bucket);
+			break;
+		}
+		w->work[b]->perm_x = 0;
+		w->work[b]->perm_n = 0;
+		w->work[b]->perm = v;
+		v += map->buckets[b]->size * sizeof(__u32);
+	}
+	BUG_ON(v - (void *)w != map->working_size);
+}
+
 /**
  * crush_do_rule - calculate a mapping with the given input and rule
  * @map: the crush_map
@@ -790,24 +853,25 @@ static void crush_choose_indep(const struct crush_map *map,
  * @result_max: maximum result size
  * @weight: weight vector (for map leaves)
  * @weight_max: size of weight vector
- * @scratch: scratch vector for private use; must be >= 3 * result_max
+ * @cwin: pointer to at least crush_work_size() bytes of memory
  */
 int crush_do_rule(const struct crush_map *map,
 		  int ruleno, int x, int *result, int result_max,
 		  const __u32 *weight, int weight_max,
-		  int *scratch)
+		  void *cwin)
 {
 	int result_len;
-	int *a = scratch;
-	int *b = scratch + result_max;
-	int *c = scratch + result_max*2;
+	struct crush_work *cw = cwin;
+	int *a = cwin + map->working_size;
+	int *b = a + result_max;
+	int *c = b + result_max;
+	int *w = a;
+	int *o = b;
 	int recurse_to_leaf;
-	int *w;
 	int wsize = 0;
-	int *o;
 	int osize;
 	int *tmp;
-	struct crush_rule *rule;
+	const struct crush_rule *rule;
 	__u32 step;
 	int i, j;
 	int numrep;
@@ -835,12 +899,10 @@ int crush_do_rule(const struct crush_map *map,
 
 	rule = map->rules[ruleno];
 	result_len = 0;
-	w = a;
-	o = b;
 
 	for (step = 0; step < rule->len; step++) {
 		int firstn = 0;
-		struct crush_rule_step *curstep = &rule->steps[step];
+		const struct crush_rule_step *curstep = &rule->steps[step];
 
 		switch (curstep->op) {
 		case CRUSH_RULE_TAKE:
@@ -936,6 +998,7 @@ int crush_do_rule(const struct crush_map *map,
 						recurse_tries = choose_tries;
 					osize += crush_choose_firstn(
 						map,
+						cw,
 						map->buckets[bno],
 						weight, weight_max,
 						x, numrep,
@@ -956,6 +1019,7 @@ int crush_do_rule(const struct crush_map *map,
 						    numrep : (result_max-osize));
 					crush_choose_indep(
 						map,
+						cw,
 						map->buckets[bno],
 						weight, weight_max,
 						x, out_size, numrep,
@@ -997,5 +1061,6 @@ int crush_do_rule(const struct crush_map *map,
 			break;
 		}
 	}
+
 	return result_len;
 }
diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c
index 292e33bd916e..85747b7f91a9 100644
--- a/net/ceph/crypto.c
+++ b/net/ceph/crypto.c
@@ -3,6 +3,7 @@
 
 #include <linux/err.h>
 #include <linux/scatterlist.h>
+#include <linux/sched.h>
 #include <linux/slab.h>
 #include <crypto/aes.h>
 #include <crypto/skcipher.h>
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index f3378ba1a828..b65bbf9f45eb 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -460,7 +460,6 @@ static void request_init(struct ceph_osd_request *req)
 
 	kref_init(&req->r_kref);
 	init_completion(&req->r_completion);
-	init_completion(&req->r_done_completion);
 	RB_CLEAR_NODE(&req->r_node);
 	RB_CLEAR_NODE(&req->r_mc_node);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
@@ -672,7 +671,8 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
 	BUG_ON(length > previous);
 
 	op->extent.length = length;
-	op->indata_len -= previous - length;
+	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
+		op->indata_len -= previous - length;
 }
 EXPORT_SYMBOL(osd_req_op_extent_update);
 
@@ -1636,7 +1636,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 	bool need_send = false;
 	bool promoted = false;
 
-	WARN_ON(req->r_tid || req->r_got_reply);
+	WARN_ON(req->r_tid);
 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
 
 again:
@@ -1704,17 +1704,10 @@ promote:
 
 static void account_request(struct ceph_osd_request *req)
 {
-	unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
+	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
 
-	if (req->r_flags & CEPH_OSD_FLAG_READ) {
-		WARN_ON(req->r_flags & mask);
-		req->r_flags |= CEPH_OSD_FLAG_ACK;
-	} else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
-		WARN_ON(!(req->r_flags & mask));
-	else
-		WARN_ON(1);
-
-	WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
+	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
 	atomic_inc(&req->r_osdc->num_requests);
 }
 
@@ -1749,15 +1742,15 @@ static void finish_request(struct ceph_osd_request *req)
 
 static void __complete_request(struct ceph_osd_request *req)
 {
-	if (req->r_callback)
+	if (req->r_callback) {
+		dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
+		     req->r_tid, req->r_callback, req->r_result);
 		req->r_callback(req);
-	else
-		complete_all(&req->r_completion);
+	}
 }
 
 /*
- * Note that this is open-coded in handle_reply(), which has to deal
- * with ack vs commit, dup acks, etc.
+ * This is open-coded in handle_reply().
  */
 static void complete_request(struct ceph_osd_request *req, int err)
 {
@@ -1766,7 +1759,7 @@ static void complete_request(struct ceph_osd_request *req, int err)
 	req->r_result = err;
 	finish_request(req);
 	__complete_request(req);
-	complete_all(&req->r_done_completion);
+	complete_all(&req->r_completion);
 	ceph_osdc_put_request(req);
 }
 
@@ -1792,7 +1785,7 @@ static void cancel_request(struct ceph_osd_request *req)
 
 	cancel_map_check(req);
 	finish_request(req);
-	complete_all(&req->r_done_completion);
+	complete_all(&req->r_completion);
 	ceph_osdc_put_request(req);
 }
 
@@ -2169,7 +2162,6 @@ static void linger_commit_cb(struct ceph_osd_request *req)
 	mutex_lock(&lreq->lock);
 	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
 	     lreq->linger_id, req->r_result);
-	WARN_ON(!__linger_registered(lreq));
 	linger_reg_commit_complete(lreq, req->r_result);
 	lreq->committed = true;
 
@@ -2785,31 +2777,8 @@ e_inval:
 }
 
 /*
- * We are done with @req if
- *   - @m is a safe reply, or
- *   - @m is an unsafe reply and we didn't want a safe one
- */
-static bool done_request(const struct ceph_osd_request *req,
-			 const struct MOSDOpReply *m)
-{
-	return (m->result < 0 ||
-		(m->flags & CEPH_OSD_FLAG_ONDISK) ||
-		!(req->r_flags & CEPH_OSD_FLAG_ONDISK));
-}
-
-/*
- * handle osd op reply.  either call the callback if it is specified,
- * or do the completion to wake up the waiting thread.
- *
- * ->r_unsafe_callback is set?	yes			no
- *
- * first reply is OK (needed	r_cb/r_completion,	r_cb/r_completion,
- * any or needed/got safe)	r_done_completion	r_done_completion
- *
- * first reply is unsafe	r_unsafe_cb(true)	(nothing)
- *
- * when we get the safe reply	r_unsafe_cb(false),	r_cb/r_completion,
- *				r_done_completion	r_done_completion
+ * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
+ * specified.
  */
 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 {
@@ -2818,7 +2787,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 	struct MOSDOpReply m;
 	u64 tid = le64_to_cpu(msg->hdr.tid);
 	u32 data_len = 0;
-	bool already_acked;
 	int ret;
 	int i;
 
@@ -2897,50 +2865,22 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
 		goto fail_request;
 	}
-	dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__,
-	     req, req->r_tid, req->r_got_reply, m.result, data_len);
-
-	already_acked = req->r_got_reply;
-	if (!already_acked) {
-		req->r_result = m.result ?: data_len;
-		req->r_replay_version = m.replay_version; /* struct */
-		req->r_got_reply = true;
-	} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
-		dout("req %p tid %llu dup ack\n", req, req->r_tid);
-		goto out_unlock_session;
-	}
-
-	if (done_request(req, &m)) {
-		finish_request(req);
-		if (req->r_linger) {
-			WARN_ON(req->r_unsafe_callback);
-			dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
-			__complete_request(req);
-		}
-	}
+	dout("%s req %p tid %llu result %d data_len %u\n", __func__,
+	     req, req->r_tid, m.result, data_len);
 
+	/*
+	 * Since we only ever request ONDISK, we should only ever get
+	 * one (type of) reply back.
+	 */
+	WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
+	req->r_result = m.result ?: data_len;
+	finish_request(req);
 	mutex_unlock(&osd->lock);
 	up_read(&osdc->lock);
 
-	if (done_request(req, &m)) {
-		if (already_acked && req->r_unsafe_callback) {
-			dout("req %p tid %llu safe-cb\n", req, req->r_tid);
-			req->r_unsafe_callback(req, false);
-		} else if (!req->r_linger) {
-			dout("req %p tid %llu cb\n", req, req->r_tid);
-			__complete_request(req);
-		}
-		complete_all(&req->r_done_completion);
-		ceph_osdc_put_request(req);
-	} else {
-		if (req->r_unsafe_callback) {
-			dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
-			req->r_unsafe_callback(req, true);
-		} else {
-			WARN_ON(1);
-		}
-	}
-
+	__complete_request(req);
+	complete_all(&req->r_completion);
+	ceph_osdc_put_request(req);
 	return;
 
 fail_request:
@@ -3540,7 +3480,7 @@ again:
 			up_read(&osdc->lock);
 			dout("%s waiting on req %p tid %llu last_tid %llu\n",
 			     __func__, req, req->r_tid, last_tid);
-			wait_for_completion(&req->r_done_completion);
+			wait_for_completion(&req->r_completion);
 			ceph_osdc_put_request(req);
 			goto again;
 		}
@@ -3599,7 +3539,7 @@ ceph_osdc_watch(struct ceph_osd_client *osdc,
 
 	ceph_oid_copy(&lreq->t.base_oid, oid);
 	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
-	lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
 	lreq->mtime = CURRENT_TIME;
 
 	lreq->reg_req = alloc_linger_request(lreq);
@@ -3657,7 +3597,7 @@ int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
 
 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
-	req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+	req->r_flags = CEPH_OSD_FLAG_WRITE;
 	req->r_mtime = CURRENT_TIME;
 	osd_req_op_watch_init(req, 0, lreq->linger_id,
 			      CEPH_OSD_WATCH_OP_UNWATCH);
@@ -4022,7 +3962,7 @@ EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
  * Execute an OSD class method on an object.
  *
  * @flags: CEPH_OSD_FLAG_*
- * @resp_len: out param for reply length
+ * @resp_len: in/out param for reply length
  */
 int ceph_osdc_call(struct ceph_osd_client *osdc,
 		   struct ceph_object_id *oid,
@@ -4035,6 +3975,9 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 	struct ceph_osd_request *req;
 	int ret;
 
+	if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
+		return -E2BIG;
+
 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
 	if (!req)
 		return -ENOMEM;
@@ -4053,7 +3996,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
 						  0, false, false);
 	if (resp_page)
 		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
-						   PAGE_SIZE, 0, false, false);
+						   *resp_len, 0, false, false);
 
 	ceph_osdc_start_request(osdc, req, false);
 	ret = ceph_osdc_wait_request(osdc, req);
@@ -4220,8 +4163,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 	int page_align = off & ~PAGE_MASK;
 
 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
-				    CEPH_OSD_OP_WRITE,
-				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
+				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
 				    snapc, truncate_seq, truncate_size,
 				    true);
 	if (IS_ERR(req))
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index d2436880b305..6824c0ec8373 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -153,6 +153,32 @@ bad:
         return -EINVAL;
 }
 
+static void crush_finalize(struct crush_map *c)
+{
+	__s32 b;
+
+	/* Space for the array of pointers to per-bucket workspace */
+	c->working_size = sizeof(struct crush_work) +
+	    c->max_buckets * sizeof(struct crush_work_bucket *);
+
+	for (b = 0; b < c->max_buckets; b++) {
+		if (!c->buckets[b])
+			continue;
+
+		switch (c->buckets[b]->alg) {
+		default:
+			/*
+			 * The base case, permutation variables and
+			 * the pointer to the permutation array.
+			 */
+			c->working_size += sizeof(struct crush_work_bucket);
+			break;
+		}
+		/* Every bucket has a permutation array. */
+		c->working_size += c->buckets[b]->size * sizeof(__u32);
+	}
+}
+
 static struct crush_map *crush_decode(void *pbyval, void *end)
 {
 	struct crush_map *c;
@@ -246,10 +272,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 		b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
 		if (b->items == NULL)
 			goto badmem;
-		b->perm = kcalloc(b->size, sizeof(u32), GFP_NOFS);
-		if (b->perm == NULL)
-			goto badmem;
-		b->perm_n = 0;
 
 		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
 		for (j = 0; j < b->size; j++)
@@ -368,6 +390,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end)
 	dout("crush decode tunable chooseleaf_stable = %d\n",
 	     c->chooseleaf_stable);
 
+	crush_finalize(c);
+
 done:
 	dout("crush_decode success\n");
 	return c;
@@ -719,7 +743,7 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 	map->pool_max = -1;
 	map->pg_temp = RB_ROOT;
 	map->primary_temp = RB_ROOT;
-	mutex_init(&map->crush_scratch_mutex);
+	mutex_init(&map->crush_workspace_mutex);
 
 	return map;
 }
@@ -753,6 +777,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
 	kfree(map->osd_weight);
 	kfree(map->osd_addr);
 	kfree(map->osd_primary_affinity);
+	kfree(map->crush_workspace);
 	kfree(map);
 }
 
@@ -808,6 +833,31 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
 	return 0;
 }
 
+static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
+{
+	void *workspace;
+	size_t work_size;
+
+	if (IS_ERR(crush))
+		return PTR_ERR(crush);
+
+	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+	workspace = kmalloc(work_size, GFP_NOIO);
+	if (!workspace) {
+		crush_destroy(crush);
+		return -ENOMEM;
+	}
+	crush_init_workspace(crush, workspace);
+
+	if (map->crush)
+		crush_destroy(map->crush);
+	kfree(map->crush_workspace);
+	map->crush = crush;
+	map->crush_workspace = workspace;
+	return 0;
+}
+
 #define OSDMAP_WRAPPER_COMPAT_VER	7
 #define OSDMAP_CLIENT_DATA_COMPAT_VER	1
 
@@ -1214,13 +1264,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map)
 
 	/* crush */
 	ceph_decode_32_safe(p, end, len, e_inval);
-	map->crush = crush_decode(*p, min(*p + len, end));
-	if (IS_ERR(map->crush)) {
-		err = PTR_ERR(map->crush);
-		map->crush = NULL;
+	err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end)));
+	if (err)
 		goto bad;
-	}
-	*p += len;
 
 	/* ignore the rest */
 	*p = end;
@@ -1375,7 +1421,6 @@ e_inval:
 struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 					     struct ceph_osdmap *map)
 {
-	struct crush_map *newcrush = NULL;
 	struct ceph_fsid fsid;
 	u32 epoch = 0;
 	struct ceph_timespec modified;
@@ -1414,12 +1459,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 	/* new crush? */
 	ceph_decode_32_safe(p, end, len, e_inval);
 	if (len > 0) {
-		newcrush = crush_decode(*p, min(*p+len, end));
-		if (IS_ERR(newcrush)) {
-			err = PTR_ERR(newcrush);
-			newcrush = NULL;
+		err = osdmap_set_crush(map,
+				       crush_decode(*p, min(*p + len, end)));
+		if (err)
 			goto bad;
-		}
 		*p += len;
 	}
 
@@ -1439,12 +1482,6 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
 
 	map->epoch++;
 	map->modified = modified;
-	if (newcrush) {
-		if (map->crush)
-			crush_destroy(map->crush);
-		map->crush = newcrush;
-		newcrush = NULL;
-	}
 
 	/* new_pools */
 	err = decode_new_pools(p, end, map);
@@ -1505,8 +1542,6 @@ bad:
 	print_hex_dump(KERN_DEBUG, "osdmap: ",
 		       DUMP_PREFIX_OFFSET, 16, 1,
 		       start, end - start, true);
-	if (newcrush)
-		crush_destroy(newcrush);
 	return ERR_PTR(err);
 }
 
@@ -1942,10 +1977,10 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 
 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
 
-	mutex_lock(&map->crush_scratch_mutex);
+	mutex_lock(&map->crush_workspace_mutex);
 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-			  weight, weight_max, map->crush_scratch_ary);
-	mutex_unlock(&map->crush_scratch_mutex);
+			  weight, weight_max, map->crush_workspace);
+	mutex_unlock(&map->crush_workspace_mutex);
 
 	return r;
 }
@@ -1978,8 +2013,14 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
 		return;
 	}
 
-	len = do_crush(osdmap, ruleno, pps, raw->osds,
-		       min_t(int, pi->size, ARRAY_SIZE(raw->osds)),
+	if (pi->size > ARRAY_SIZE(raw->osds)) {
+		pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n",
+		       pi->id, pi->crush_ruleset, pi->type, pi->size,
+		       ARRAY_SIZE(raw->osds));
+		return;
+	}
+
+	len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size,
 		       osdmap->osd_weight, osdmap->max_osd);
 	if (len < 0) {
 		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
diff --git a/net/ceph/snapshot.c b/net/ceph/snapshot.c
index 154683f5f14c..705414e78ae0 100644
--- a/net/ceph/snapshot.c
+++ b/net/ceph/snapshot.c
@@ -18,8 +18,6 @@
  * 02110-1301, USA.
  */
 
-#include <stddef.h>
-
 #include <linux/types.h>
 #include <linux/export.h>
 #include <linux/ceph/libceph.h>