author     Linus Torvalds <torvalds@linux-foundation.org>  2020-10-21 10:34:10 -0700
committer  Linus Torvalds <torvalds@linux-foundation.org>  2020-10-21 10:34:10 -0700
commit     ed7cfefe4443dcc811e84b345a3fb122eeb47661 (patch)
tree       ab5458ee4134916e32312580448aa151ff95000b /net
parent     c4d6fe7311762f2e03b3c27ad38df7c40c80cc93 (diff)
parent     28e1581c3b4ea5f98530064a103c6217bedeea73 (diff)
Merge tag 'ceph-for-5.10-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:

 - a patch that removes crush_workspace_mutex (myself). CRUSH
   computations are no longer serialized and can run in parallel; a
   minimal sketch of the resulting workspace-pool pattern follows this
   list.

 - a couple of new filesystem client metrics for the "ceph fs top"
   command (Xiubo Li)

 - a fix for a very old messenger bug that affected the filesystem,
   marked for stable (myself)

 - assorted fixups and cleanups throughout the codebase from Jeff and
   others.

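A sketch of the first item: the old code kept a single scratch buffer
guarded by crush_workspace_mutex, so only one CRUSH computation could
run at a time. The new code keeps a pool of workspaces capped at the
number of online CPUs and makes callers wait only once the pool is
exhausted. Below is a minimal userspace rendition of that policy, not
the kernel code: the names are hypothetical and a pthread condition
variable stands in for the kernel waitqueue. The real implementation
is in the net/ceph/osdmap.c hunks further down.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define MAX_WS 2	/* stand-in for num_online_cpus() */

static void *idle_ws[MAX_WS];		/* pool of returned workspaces */
static int total_ws, free_ws;
static pthread_mutex_t ws_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ws_wait = PTHREAD_COND_INITIALIZER;

static void *get_workspace(void)
{
	void *w;

	pthread_mutex_lock(&ws_lock);
	for (;;) {
		if (free_ws) {			/* reuse an idle workspace */
			w = idle_ws[--free_ws];
			break;
		}
		if (total_ws < MAX_WS) {	/* grow the pool */
			total_ws++;
			pthread_mutex_unlock(&ws_lock);
			/* allocation failure handling omitted; the
			 * kernel version falls back to waiting */
			return malloc(4096);
		}
		/* pool is full: sleep until put_workspace() signals */
		pthread_cond_wait(&ws_wait, &ws_lock);
	}
	pthread_mutex_unlock(&ws_lock);
	return w;
}

static void put_workspace(void *w)
{
	pthread_mutex_lock(&ws_lock);
	idle_ws[free_ws++] = w;
	pthread_mutex_unlock(&ws_lock);
	pthread_cond_signal(&ws_wait);
}

static void *worker(void *arg)
{
	void *w = get_workspace();

	/* the CRUSH computation would run here, no global lock held */
	put_workspace(w);
	return arg;
}

int main(void)
{
	pthread_t t[8];
	int i;

	for (i = 0; i < 8; i++)
		pthread_create(&t[i], NULL, worker, NULL);
	for (i = 0; i < 8; i++)
		pthread_join(t[i], NULL);
	puts("all workers done");
	return 0;
}

As in the kernel code, the lock covers only the pool bookkeeping,
never the computation itself.
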
* tag 'ceph-for-5.10-rc1' of git://github.com/ceph/ceph-client: (27 commits)
  libceph: clear con->out_msg on Policy::stateful_server faults
  libceph: format ceph_entity_addr nonces as unsigned
  libceph: fix ENTITY_NAME format suggestion
  libceph: move a dout in queue_con_delay()
  ceph: comment cleanups and clarifications
  ceph: break up send_cap_msg
  ceph: drop separate mdsc argument from __send_cap
  ceph: promote to unsigned long long before shifting
  ceph: don't SetPageError on readpage errors
  ceph: mark ceph_fmt_xattr() as printf-like for better type checking
  ceph: fold ceph_update_writeable_page into ceph_write_begin
  ceph: fold ceph_sync_writepages into writepage_nounlock
  ceph: fold ceph_sync_readpages into ceph_readpage
  ceph: don't call ceph_update_writeable_page from page_mkwrite
  ceph: break out writeback of incompatible snap context to separate function
  ceph: add a note explaining session reject error string
  libceph: switch to the new "osd blocklist add" command
  libceph, rbd, ceph: "blacklist" -> "blocklist"
  ceph: have ceph_writepages_start call pagevec_lookup_range_tag
  ceph: use kill_anon_super helper
  ...
Diffstat (limited to 'net')
-rw-r--r--  net/ceph/messenger.c    13
-rw-r--r--  net/ceph/mon_client.c   69
-rw-r--r--  net/ceph/osdmap.c      166
3 files changed, 213 insertions, 35 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d4d7a0e52491..af0f1fa24937 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2016,11 +2016,11 @@ static int process_banner(struct ceph_connection *con)
 		   sizeof(con->peer_addr)) != 0 &&
 	    !(addr_is_blank(&con->actual_peer_addr) &&
 	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
-		pr_warn("wrong peer, want %s/%d, got %s/%d\n",
+		pr_warn("wrong peer, want %s/%u, got %s/%u\n",
 			ceph_pr_addr(&con->peer_addr),
-			(int)le32_to_cpu(con->peer_addr.nonce),
+			le32_to_cpu(con->peer_addr.nonce),
 			ceph_pr_addr(&con->actual_peer_addr),
-			(int)le32_to_cpu(con->actual_peer_addr.nonce));
+			le32_to_cpu(con->actual_peer_addr.nonce));
 		con->error_msg = "wrong peer at address";
 		return -1;
 	}
@@ -2811,13 +2811,13 @@ static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
 		return -ENOENT;
 	}
 
+	dout("%s %p %lu\n", __func__, con, delay);
 	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
 		dout("%s %p - already queued\n", __func__, con);
 		con->ops->put(con);
 		return -EBUSY;
 	}
 
-	dout("%s %p %lu\n", __func__, con, delay);
 	return 0;
 }
 
@@ -2998,6 +2998,11 @@ static void con_fault(struct ceph_connection *con)
 		ceph_msg_put(con->in_msg);
 		con->in_msg = NULL;
 	}
+	if (con->out_msg) {
+		BUG_ON(con->out_msg->con != con);
+		ceph_msg_put(con->out_msg);
+		con->out_msg = NULL;
+	}
 
 	/* Requeue anything that hasn't been acked */
 	list_splice_init(&con->out_sent, &con->out_queue);
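
A note on the first messenger.c hunk above: entity_addr_t nonces are
32-bit and effectively random, so roughly half of them have the high
bit set. Casting to int and printing with %d renders those values as
negative; %u shows the real number. A standalone illustration (not
kernel code):

#include <stdio.h>
#include <stdint.h>

int main(void)
{
	uint32_t nonce = 0xdeadbeefu;	/* high bit set */

	printf("%d\n", (int)nonce);	/* old format: -559038737 */
	printf("%u\n", nonce);		/* new format: 3735928559 */
	return 0;
}
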
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index d633a0aeaa55..c4cf2529d08b 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -896,8 +896,9 @@ bad:
 	ceph_msg_dump(msg);
 }
 
-int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
-			    struct ceph_entity_addr *client_addr)
+static __printf(2, 0)
+int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
+			 va_list ap)
 {
 	struct ceph_mon_generic_request *req;
 	struct ceph_mon_command *h;
@@ -925,29 +926,65 @@ int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
 	h->monhdr.session_mon_tid = 0;
 	h->fsid = monc->monmap->fsid;
 	h->num_strs = cpu_to_le32(1);
-	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
-		                 \"blacklistop\": \"add\", \
-				 \"addr\": \"%pISpc/%u\" }",
-		      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
+	len = vsprintf(h->str, fmt, ap);
 	h->str_len = cpu_to_le32(len);
 	send_generic_request(monc, req);
 	mutex_unlock(&monc->mutex);
 
 	ret = wait_generic_request(req);
-	if (!ret)
-		/*
-		 * Make sure we have the osdmap that includes the blacklist
-		 * entry.  This is needed to ensure that the OSDs pick up the
-		 * new blacklist before processing any future requests from
-		 * this client.
-		 */
-		ret = ceph_wait_for_latest_osdmap(monc->client, 0);
-
 out:
 	put_generic_request(req);
 	return ret;
 }
-EXPORT_SYMBOL(ceph_monc_blacklist_add);
+
+static __printf(2, 3)
+int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
+{
+	va_list ap;
+	int ret;
+
+	va_start(ap, fmt);
+	ret = do_mon_command_vargs(monc, fmt, ap);
+	va_end(ap);
+	return ret;
+}
+
+int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
+			    struct ceph_entity_addr *client_addr)
+{
+	int ret;
+
+	ret = do_mon_command(monc,
+			     "{ \"prefix\": \"osd blocklist\", \
+				\"blocklistop\": \"add\", \
+				\"addr\": \"%pISpc/%u\" }",
+			     &client_addr->in_addr,
+			     le32_to_cpu(client_addr->nonce));
+	if (ret == -EINVAL) {
+		/*
+		 * The monitor returns EINVAL on an unrecognized command.
+		 * Try the legacy command -- it is exactly the same except
+		 * for the name.
+		 */
+		ret = do_mon_command(monc,
+				     "{ \"prefix\": \"osd blacklist\", \
+					\"blacklistop\": \"add\", \
+					\"addr\": \"%pISpc/%u\" }",
+				     &client_addr->in_addr,
+				     le32_to_cpu(client_addr->nonce));
+	}
+	if (ret)
+		return ret;
+
+	/*
+	 * Make sure we have the osdmap that includes the blocklist
+	 * entry.  This is needed to ensure that the OSDs pick up the
+	 * new blocklist before processing any future requests from
+	 * this client.
+	 */
+	return ceph_wait_for_latest_osdmap(monc->client, 0);
+}
+EXPORT_SYMBOL(ceph_monc_blocklist_add);
 
 /*
  * Resend pending generic requests.
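
On the __printf(2, 0) and __printf(2, 3) annotations in the
mon_client.c hunk above: they are the kernel's shorthand for GCC's
format attribute, which makes the compiler type-check the "osd
blocklist" JSON format strings against their arguments. A hedged
userspace equivalent of the same vararg-pair pattern, with
hypothetical names:

#include <stdarg.h>
#include <stdio.h>

/* third attribute argument is 0: this function takes a va_list, so
 * there is no variadic argument position for the compiler to check */
__attribute__((format(printf, 1, 0)))
static int mon_command_vargs(const char *fmt, va_list ap)
{
	char buf[128];
	int len = vsnprintf(buf, sizeof(buf), fmt, ap);

	if (len < 0 || (size_t)len >= sizeof(buf))
		return -1;
	printf("sending: %s\n", buf);
	return 0;
}

__attribute__((format(printf, 1, 2)))
static int mon_command(const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = mon_command_vargs(fmt, ap);
	va_end(ap);
	return ret;
}

int main(void)
{
	/* mon_command("%s", 42) would now draw a -Wformat warning */
	return mon_command("{ \"prefix\": \"%s\" }", "osd blocklist");
}
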
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 96c25f5e064a..fa08c15be0c0 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -965,6 +965,143 @@ bad:
 }
 
 /*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+	struct crush_work *work;
+	size_t work_size;
+
+	WARN_ON(!c->working_size);
+	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+
+	work = ceph_kvmalloc(work_size, GFP_NOIO);
+	if (!work)
+		return NULL;
+
+	INIT_LIST_HEAD(&work->item);
+	crush_init_workspace(c, work);
+	return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+	WARN_ON(!list_empty(&work->item));
+	kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+	INIT_LIST_HEAD(&wsm->idle_ws);
+	spin_lock_init(&wsm->ws_lock);
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+	init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+				  struct crush_work *work)
+{
+	WARN_ON(!list_empty(&wsm->idle_ws));
+
+	list_add(&work->item, &wsm->idle_ws);
+	atomic_set(&wsm->total_ws, 1);
+	wsm->free_ws = 1;
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+	struct crush_work *work;
+
+	while (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		free_workspace(work);
+	}
+	atomic_set(&wsm->total_ws, 0);
+	wsm->free_ws = 0;
+}
+
+/*
+ * Finds an available workspace or allocates a new one.  If it's not
+ * possible to allocate a new one, waits until there is one.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+					const struct crush_map *c)
+{
+	struct crush_work *work;
+	int cpus = num_online_cpus();
+
+again:
+	spin_lock(&wsm->ws_lock);
+	if (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		wsm->free_ws--;
+		spin_unlock(&wsm->ws_lock);
+		return work;
+
+	}
+	if (atomic_read(&wsm->total_ws) > cpus) {
+		DEFINE_WAIT(wait);
+
+		spin_unlock(&wsm->ws_lock);
+		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+			schedule();
+		finish_wait(&wsm->ws_wait, &wait);
+		goto again;
+	}
+	atomic_inc(&wsm->total_ws);
+	spin_unlock(&wsm->ws_lock);
+
+	work = alloc_workspace(c);
+	if (!work) {
+		atomic_dec(&wsm->total_ws);
+		wake_up(&wsm->ws_wait);
+
+		/*
+		 * Do not return the error but go back to waiting.  We
+		 * have the initial workspace and the CRUSH computation
+		 * time is bounded so we will get it eventually.
+		 */
+		WARN_ON(atomic_read(&wsm->total_ws) < 1);
+		goto again;
+	}
+	return work;
+}
+
+/*
+ * Puts a workspace back on the list or frees it if we have enough
+ * idle ones sitting around.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+			  struct crush_work *work)
+{
+	spin_lock(&wsm->ws_lock);
+	if (wsm->free_ws <= num_online_cpus()) {
+		list_add(&work->item, &wsm->idle_ws);
+		wsm->free_ws++;
+		spin_unlock(&wsm->ws_lock);
+		goto wake;
+	}
+	spin_unlock(&wsm->ws_lock);
+
+	free_workspace(work);
+	atomic_dec(&wsm->total_ws);
+wake:
+	if (wq_has_sleeper(&wsm->ws_wait))
+		wake_up(&wsm->ws_wait);
+}
+
+/*
  * osd map
  */
 struct ceph_osdmap *ceph_osdmap_alloc(void)
@@ -981,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 	map->primary_temp = RB_ROOT;
 	map->pg_upmap = RB_ROOT;
 	map->pg_upmap_items = RB_ROOT;
-	mutex_init(&map->crush_workspace_mutex);
+
+	init_workspace_manager(&map->crush_wsm);
 
 	return map;
 }
@@ -989,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
 	dout("osdmap_destroy %p\n", map);
+
 	if (map->crush)
 		crush_destroy(map->crush);
+	cleanup_workspace_manager(&map->crush_wsm);
+
 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
 		struct ceph_pg_mapping *pg =
 			rb_entry(rb_first(&map->pg_temp),
@@ -1029,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
 	kvfree(map->osd_weight);
 	kvfree(map->osd_addr);
 	kvfree(map->osd_primary_affinity);
-	kvfree(map->crush_workspace);
 	kfree(map);
 }
 
@@ -1104,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 
 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
 {
-	void *workspace;
-	size_t work_size;
+	struct crush_work *work;
 
 	if (IS_ERR(crush))
 		return PTR_ERR(crush);
 
-	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
-	dout("%s work_size %zu bytes\n", __func__, work_size);
-	workspace = ceph_kvmalloc(work_size, GFP_NOIO);
-	if (!workspace) {
+	work = alloc_workspace(crush);
+	if (!work) {
 		crush_destroy(crush);
 		return -ENOMEM;
 	}
-	crush_init_workspace(crush, workspace);
 
 	if (map->crush)
 		crush_destroy(map->crush);
-	kvfree(map->crush_workspace);
+	cleanup_workspace_manager(&map->crush_wsm);
 	map->crush = crush;
-	map->crush_workspace = workspace;
+	add_initial_workspace(&map->crush_wsm, work);
 	return 0;
 }
 
@@ -2322,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		    s64 choose_args_index)
 {
 	struct crush_choose_arg_map *arg_map;
+	struct crush_work *work;
 	int r;
 
 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2332,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
 						CEPH_DEFAULT_CHOOSE_ARGS);
 
-	mutex_lock(&map->crush_workspace_mutex);
+	work = get_workspace(&map->crush_wsm, map->crush);
 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-			  weight, weight_max, map->crush_workspace,
+			  weight, weight_max, work,
 			  arg_map ? arg_map->args : NULL);
-	mutex_unlock(&map->crush_workspace_mutex);
-
+	put_workspace(&map->crush_wsm, work);
 	return r;
 }