author:    Linus Torvalds <torvalds@linux-foundation.org> 2020-08-12 12:51:31 -0700
committer: Linus Torvalds <torvalds@linux-foundation.org> 2020-08-12 12:51:31 -0700
commit:    7c2a69f610e64c8dec6a06a66e721f4ce1dd783a
tree:      3ba35aaae571f7ea9315ba62e58ea0d7814dfd0b /fs
parent:    7a02c8d45bbf65cf432292c2032147fa7529de58
parent:    02e37571f9e79022498fd0525c073b07e9d9ac69
Merge tag 'ceph-for-5.9-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "Xiubo has completed his work on filesystem client metrics, they are
  sent to all available MDSes once per second now.

  Other than that, we have a lot of fixes and cleanups all around the
  filesystem, including a tweak to cut down on MDS request resends in
  multi-MDS setups from Yanhu and fixups for SELinux symlink labeling
  and MClientSession message decoding from Jeff"

* tag 'ceph-for-5.9-rc1' of git://github.com/ceph/ceph-client: (22 commits)
  ceph: handle zero-length feature mask in session messages
  ceph: use frag's MDS in either mode
  ceph: move sb->wb_pagevec_pool to be a global mempool
  ceph: set sec_context xattr on symlink creation
  ceph: remove redundant initialization of variable mds
  ceph: fix use-after-free for fsc->mdsc
  ceph: remove unused variables in ceph_mdsmap_decode()
  ceph: delete repeated words in fs/ceph/
  ceph: send client provided metric flags in client metadata
  ceph: periodically send perf metrics to MDSes
  ceph: check the sesion state and return false in case it is closed
  libceph: replace HTTP links with HTTPS ones
  ceph: remove unnecessary cast in kfree()
  libceph: just have osd_req_op_init() return a pointer
  ceph: do not access the kiocb after aio requests
  ceph: clean up and optimize ceph_check_delayed_caps()
  ceph: fix potential mdsc use-after-free crash
  ceph: switch to WARN_ON_ONCE in encode_supported_features()
  ceph: add global total_caps to count the mdsc's total caps number
  ceph: add check_session_state() helper and make it global
  ...
Diffstat (limited to 'fs')
 fs/ceph/Kconfig      |   2
 fs/ceph/addr.c       |  23
 fs/ceph/caps.c       |  12
 fs/ceph/debugfs.c    |  16
 fs/ceph/dir.c        |   4
 fs/ceph/file.c       |   5
 fs/ceph/mds_client.c | 184
 fs/ceph/mds_client.h |   7
 fs/ceph/mdsmap.c     |  10
 fs/ceph/metric.c     | 149
 fs/ceph/metric.h     |  91
 fs/ceph/super.c      |  64
 fs/ceph/super.h      |   6
 fs/ceph/xattr.c      |  12
 14 files changed, 482 insertions(+), 103 deletions(-)
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index cf235f6eacf9..471e40156065 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -13,7 +13,7 @@ config CEPH_FS
 	  scalable file system designed to provide high performance,
 	  reliable access to petabytes of storage.
 
-	  More information at http://ceph.newdream.net/.
+	  More information at https://ceph.io/.
 
 	  If unsure, say N.
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 01ad09733ac7..6ea761c84494 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -862,8 +862,7 @@ static void writepages_finish(struct ceph_osd_request *req)
 
 	osd_data = osd_req_op_extent_osd_data(req, 0);
 	if (osd_data->pages_from_pool)
-		mempool_free(osd_data->pages,
-			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
+		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
 	else
 		kfree(osd_data->pages);
 	ceph_osdc_put_request(req);
@@ -955,10 +954,10 @@ retry:
 		int num_ops = 0, op_idx;
 		unsigned i, pvec_pages, max_pages, locked_pages = 0;
 		struct page **pages = NULL, **data_pages;
-		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
 		struct page *page;
 		pgoff_t strip_unit_end = 0;
 		u64 offset = 0, len = 0;
+		bool from_pool = false;
 
 		max_pages = wsize >> PAGE_SHIFT;
 
@@ -1057,16 +1056,16 @@ get_more_pages:
 						      sizeof(*pages),
 						      GFP_NOFS);
 				if (!pages) {
-					pool = fsc->wb_pagevec_pool;
-					pages = mempool_alloc(pool, GFP_NOFS);
+					from_pool = true;
+					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
 					BUG_ON(!pages);
 				}
 
 				len = 0;
 			} else if (page->index !=
 				   (offset + len) >> PAGE_SHIFT) {
-				if (num_ops >= (pool ?  CEPH_OSD_SLAB_OPS :
-							CEPH_OSD_MAX_OPS)) {
+				if (num_ops >= (from_pool ?  CEPH_OSD_SLAB_OPS :
+							     CEPH_OSD_MAX_OPS)) {
 					redirty_page_for_writepage(wbc, page);
 					unlock_page(page);
 					break;
@@ -1161,7 +1160,7 @@ new_request:
 				     offset, len);
 				osd_req_op_extent_osd_data_pages(req, op_idx,
 							data_pages, len, 0,
-							!!pool, false);
+							from_pool, false);
 				osd_req_op_extent_update(req, op_idx, len);
 
 				len = 0;
@@ -1188,12 +1187,12 @@ new_request:
 		dout("writepages got pages at %llu~%llu\n", offset, len);
 
 		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
-						 0, !!pool, false);
+						 0, from_pool, false);
 		osd_req_op_extent_update(req, op_idx, len);
 
 		BUG_ON(op_idx + 1 != req->r_num_ops);
 
-		pool = NULL;
+		from_pool = false;
 		if (i < locked_pages) {
 			BUG_ON(num_ops <= req->r_num_ops);
 			num_ops -= req->r_num_ops;
@@ -1204,8 +1203,8 @@ new_request:
 			pages = kmalloc_array(locked_pages, sizeof(*pages),
 					      GFP_NOFS);
 			if (!pages) {
-				pool = fsc->wb_pagevec_pool;
-				pages = mempool_alloc(pool, GFP_NOFS);
+				from_pool = true;
+				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
 				BUG_ON(!pages);
 			}
 			memcpy(pages, data_pages + i,
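All of the addr.c hunks follow one pattern: the per-superblock wb_pagevec_pool
is gone, so callers only need to remember whether an array came from the new
global pool instead of carrying a mempool_t pointer around. A minimal sketch of
that pattern; alloc_pagevec() and free_pagevec() are hypothetical helper names,
not functions from the diff:

	static struct page **alloc_pagevec(size_t nr, bool *from_pool)
	{
		struct page **pages;

		pages = kmalloc_array(nr, sizeof(*pages), GFP_NOFS);
		if (!pages) {
			/* GFP_NOFS can reclaim, so mempool_alloc() won't fail */
			*from_pool = true;
			pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
		}
		return pages;
	}

	static void free_pagevec(struct page **pages, bool from_pool)
	{
		if (from_pool)
			mempool_free(pages, ceph_wb_pagevec_pool);
		else
			kfree(pages);
	}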
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 972c13aa4225..55ccccf77cea 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -668,6 +668,7 @@ void ceph_add_cap(struct inode *inode,
 		spin_lock(&session->s_cap_lock);
 		list_add_tail(&cap->session_caps, &session->s_caps);
 		session->s_nr_caps++;
+		atomic64_inc(&mdsc->metric.total_caps);
 		spin_unlock(&session->s_cap_lock);
 	} else {
 		spin_lock(&session->s_cap_lock);
@@ -1161,6 +1162,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 	} else {
 		list_del_init(&cap->session_caps);
 		session->s_nr_caps--;
+		atomic64_dec(&mdsc->metric.total_caps);
 		cap->session = NULL;
 		removed = 1;
 	}
@@ -4187,10 +4189,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 	struct ceph_inode_info *ci;
 
 	dout("check_delayed_caps\n");
-	while (1) {
-		spin_lock(&mdsc->cap_delay_lock);
-		if (list_empty(&mdsc->cap_delay_list))
-			break;
+	spin_lock(&mdsc->cap_delay_lock);
+	while (!list_empty(&mdsc->cap_delay_list)) {
 		ci = list_first_entry(&mdsc->cap_delay_list,
 				      struct ceph_inode_info,
 				      i_cap_delay_list);
@@ -4200,13 +4200,13 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 		list_del_init(&ci->i_cap_delay_list);
 
 		inode = igrab(&ci->vfs_inode);
-		spin_unlock(&mdsc->cap_delay_lock);
-
 		if (inode) {
+			spin_unlock(&mdsc->cap_delay_lock);
 			dout("check_delayed_caps on %p\n", inode);
 			ceph_check_caps(ci, 0, NULL);
 			/* avoid calling iput_final() in tick thread */
 			ceph_async_iput(inode);
+			spin_lock(&mdsc->cap_delay_lock);
 		}
 	}
 	spin_unlock(&mdsc->cap_delay_lock);
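Pieced together, the two caps.c hunks leave ceph_check_delayed_caps() holding
cap_delay_lock across the whole list walk and dropping it only around the
blocking work. A reconstruction from the hunks (unchanged lines the diff does
not show are elided):

	spin_lock(&mdsc->cap_delay_lock);
	while (!list_empty(&mdsc->cap_delay_list)) {
		ci = list_first_entry(&mdsc->cap_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		/* ... unchanged time/flag checks elided ... */
		list_del_init(&ci->i_cap_delay_list);

		inode = igrab(&ci->vfs_inode);
		if (inode) {
			/*
			 * ceph_check_caps() may sleep: drop the spinlock
			 * and reacquire it before touching the list again.
			 */
			spin_unlock(&mdsc->cap_delay_lock);
			ceph_check_caps(ci, 0, NULL);
			/* avoid calling iput_final() in tick thread */
			ceph_async_iput(inode);
			spin_lock(&mdsc->cap_delay_lock);
		}
	}
	spin_unlock(&mdsc->cap_delay_lock);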
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 070ed8481340..97539b497e4c 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -145,7 +145,7 @@ static int metric_show(struct seq_file *s, void *p)
 	struct ceph_fs_client *fsc = s->private;
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_client_metric *m = &mdsc->metric;
-	int i, nr_caps = 0;
+	int nr_caps = 0;
 	s64 total, sum, avg, min, max, sq;
 
 	seq_printf(s, "item          total       avg_lat(us)     min_lat(us)     max_lat(us)     stdev(us)\n");
@@ -190,17 +190,7 @@ static int metric_show(struct seq_file *s, void *p)
 		   percpu_counter_sum(&m->d_lease_mis),
 		   percpu_counter_sum(&m->d_lease_hit));
 
-	mutex_lock(&mdsc->mutex);
-	for (i = 0; i < mdsc->max_sessions; i++) {
-		struct ceph_mds_session *s;
-
-		s = __ceph_lookup_mds_session(mdsc, i);
-		if (!s)
-			continue;
-		nr_caps += s->s_nr_caps;
-		ceph_put_mds_session(s);
-	}
-	mutex_unlock(&mdsc->mutex);
+	nr_caps = atomic64_read(&m->total_caps);
 	seq_printf(s, "%-14s%-16d%-16lld%lld\n", "caps", nr_caps,
 		   percpu_counter_sum(&m->i_caps_mis),
 		   percpu_counter_sum(&m->i_caps_hit));
@@ -272,7 +262,7 @@ static int mds_sessions_show(struct seq_file *s, void *ptr)
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_auth_client *ac = fsc->client->monc.auth;
 	struct ceph_options *opt = fsc->client->options;
-	int mds = -1;
+	int mds;
 
 	mutex_lock(&mdsc->mutex);
 
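This metric_show() hunk is the payoff for the new counter: the mutex-protected
walk over up to max_sessions sessions collapses into a single atomic64_read()
of total_caps, which the caps.c and mds_client.c hunks keep in sync wherever a
cap is added or removed.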
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 39f5311404b0..060bdcc5ce32 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -930,6 +930,10 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
 	req->r_num_caps = 2;
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+	if (as_ctx.pagelist) {
+		req->r_pagelist = as_ctx.pagelist;
+		as_ctx.pagelist = NULL;
+	}
 	err = ceph_mdsc_do_request(mdsc, dir, req);
 	if (!err && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
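For context on the symlink fix: as_ctx.pagelist carries the security xattr
(for example the SELinux label) that ceph_symlink() prepares earlier in the
function; handing it to req->r_pagelist makes the MDS apply the xattr as part
of the create, which the other create paths already do and which symlinks had
been missing.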
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 160644ddaeed..d51c3f2fdca0 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1538,6 +1538,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
 	struct inode *inode = file_inode(filp);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct page *pinned_page = NULL;
+	bool direct_lock = iocb->ki_flags & IOCB_DIRECT;
 	ssize_t ret;
 	int want, got = 0;
 	int retry_op = 0, read = 0;
@@ -1546,7 +1547,7 @@ again:
 	dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
 	     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
 
-	if (iocb->ki_flags & IOCB_DIRECT)
+	if (direct_lock)
 		ceph_start_io_direct(inode);
 	else
 		ceph_start_io_read(inode);
@@ -1603,7 +1604,7 @@ again:
 	}
 	ceph_put_cap_refs(ci, got);
 
-	if (iocb->ki_flags & IOCB_DIRECT)
+	if (direct_lock)
 		ceph_end_io_direct(inode);
 	else
 		ceph_end_io_read(inode);
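The two file.c hunks are a single fix: with AIO, the kiocb may be freed as
soon as the request completes, so the flag that selects the lock must be
latched before the I/O is issued and must not be re-read on the unlock side.
The shape of the fix, heavily condensed:

	bool direct_lock = iocb->ki_flags & IOCB_DIRECT;  /* latch early */

	if (direct_lock)
		ceph_start_io_direct(inode);
	else
		ceph_start_io_read(inode);

	/* ... issue the read; an async iocb may be gone after this ... */

	if (direct_lock)		/* never iocb->ki_flags here */
		ceph_end_io_direct(inode);
	else
		ceph_end_io_read(inode);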
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a50497142e59..4a26862d7667 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1103,8 +1103,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 				     frag.frag, mds);
 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
 				    CEPH_MDS_STATE_ACTIVE) {
-					if (mode == USE_ANY_MDS &&
-					    !ceph_mdsmap_is_laggy(mdsc->mdsmap,
+					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
 								  mds))
 						goto out;
 				}
@@ -1168,7 +1167,7 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
 
 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
-static void encode_supported_features(void **p, void *end)
+static int encode_supported_features(void **p, void *end)
 {
 	static const size_t count = ARRAY_SIZE(feature_bits);
 
@@ -1176,16 +1175,64 @@ static void encode_supported_features(void **p, void *end)
 		size_t i;
 		size_t size = FEATURE_BYTES(count);
 
-		BUG_ON(*p + 4 + size > end);
+		if (WARN_ON_ONCE(*p + 4 + size > end))
+			return -ERANGE;
+
 		ceph_encode_32(p, size);
 		memset(*p, 0, size);
 		for (i = 0; i < count; i++)
 			((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
 		*p += size;
 	} else {
-		BUG_ON(*p + 4 > end);
+		if (WARN_ON_ONCE(*p + 4 > end))
+			return -ERANGE;
+
 		ceph_encode_32(p, 0);
 	}
+
+	return 0;
+}
+
+static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
+#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
+static int encode_metric_spec(void **p, void *end)
+{
+	static const size_t count = ARRAY_SIZE(metric_bits);
+
+	/* header */
+	if (WARN_ON_ONCE(*p + 2 > end))
+		return -ERANGE;
+
+	ceph_encode_8(p, 1); /* version */
+	ceph_encode_8(p, 1); /* compat */
+
+	if (count > 0) {
+		size_t i;
+		size_t size = METRIC_BYTES(count);
+
+		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
+			return -ERANGE;
+
+		/* metric spec info length */
+		ceph_encode_32(p, 4 + size);
+
+		/* metric spec */
+		ceph_encode_32(p, size);
+		memset(*p, 0, size);
+		for (i = 0; i < count; i++)
+			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
+		*p += size;
+	} else {
+		if (WARN_ON_ONCE(*p + 4 + 4 > end))
+			return -ERANGE;
+
+		/* metric spec info length */
+		ceph_encode_32(p, 4);
+		/* metric spec */
+		ceph_encode_32(p, 0);
+	}
+
+	return 0;
 }
 
 /*
@@ -1203,6 +1250,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
 	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
 	size_t size, count;
 	void *p, *end;
+	int ret;
 
 	const char* metadata[][2] = {
 		{"hostname", mdsc->nodename},
@@ -1227,12 +1275,19 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
 		size = FEATURE_BYTES(count);
 	extra_bytes += 4 + size;
 
+	/* metric spec */
+	size = 0;
+	count = ARRAY_SIZE(metric_bits);
+	if (count > 0)
+		size = METRIC_BYTES(count);
+	extra_bytes += 2 + 4 + 4 + size;
+
 	/* Allocate the message */
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
 			   GFP_NOFS, false);
 	if (!msg) {
 		pr_err("create_session_msg ENOMEM creating msg\n");
-		return NULL;
+		return ERR_PTR(-ENOMEM);
 	}
 	p = msg->front.iov_base;
 	end = p + msg->front.iov_len;
@@ -1245,9 +1300,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
 	 * Serialize client metadata into waiting buffer space, using
 	 * the format that userspace expects for map<string, string>
 	 *
-	 * ClientSession messages with metadata are v3
+	 * ClientSession messages with metadata are v4
 	 */
-	msg->hdr.version = cpu_to_le16(3);
+	msg->hdr.version = cpu_to_le16(4);
 	msg->hdr.compat_version = cpu_to_le16(1);
 
 	/* The write pointer, following the session_head structure */
@@ -1269,7 +1324,20 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
 		p += val_len;
 	}
 
-	encode_supported_features(&p, end);
+	ret = encode_supported_features(&p, end);
+	if (ret) {
+		pr_err("encode_supported_features failed!\n");
+		ceph_msg_put(msg);
+		return ERR_PTR(ret);
+	}
+
+	ret = encode_metric_spec(&p, end);
+	if (ret) {
+		pr_err("encode_metric_spec failed!\n");
+		ceph_msg_put(msg);
+		return ERR_PTR(ret);
+	}
+
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
@@ -1297,8 +1365,8 @@ static int __open_session(struct ceph_mds_client *mdsc,
 
 	/* send connect message */
 	msg = create_session_open_msg(mdsc, session->s_seq);
-	if (!msg)
-		return -ENOMEM;
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
 	ceph_con_send(&session->s_con, msg);
 	return 0;
 }
@@ -1312,6 +1380,7 @@ static struct ceph_mds_session *
 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
 {
 	struct ceph_mds_session *session;
+	int ret;
 
 	session = __ceph_lookup_mds_session(mdsc, target);
 	if (!session) {
@@ -1320,8 +1389,11 @@ __open_export_target_session(struct ceph_mds_client *mdsc, int target)
 			return session;
 	}
 	if (session->s_state == CEPH_MDS_SESSION_NEW ||
-	    session->s_state == CEPH_MDS_SESSION_CLOSING)
-		__open_session(mdsc, session);
+	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
+		ret = __open_session(mdsc, session);
+		if (ret)
+			return ERR_PTR(ret);
+	}
 
 	return session;
 }
@@ -1485,6 +1557,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
 			cap->session = NULL;
 			list_del_init(&cap->session_caps);
 			session->s_nr_caps--;
+			atomic64_dec(&session->s_mdsc->metric.total_caps);
 			if (cap->queue_release)
 				__ceph_queue_cap_release(session, cap);
 			else
@@ -1785,8 +1858,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
 /*
  * send a session close request
  */
-static int request_close_session(struct ceph_mds_client *mdsc,
-				 struct ceph_mds_session *session)
+static int request_close_session(struct ceph_mds_session *session)
 {
 	struct ceph_msg *msg;
 
@@ -1809,7 +1881,7 @@ static int __close_session(struct ceph_mds_client *mdsc,
 	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
 		return 0;
 	session->s_state = CEPH_MDS_SESSION_CLOSING;
-	return request_close_session(mdsc, session);
+	return request_close_session(session);
 }
 
 static bool drop_negative_children(struct dentry *dentry)
@@ -2520,7 +2592,12 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 		ceph_encode_copy(&p, &ts, sizeof(ts));
 	}
 
-	BUG_ON(p > end);
+	if (WARN_ON_ONCE(p > end)) {
+		ceph_msg_put(msg);
+		msg = ERR_PTR(-ERANGE);
+		goto out_free2;
+	}
+
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
@@ -2756,7 +2833,9 @@ static void __do_request(struct ceph_mds_client *mdsc,
 		}
 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
 		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
-			__open_session(mdsc, session);
+			err = __open_session(mdsc, session);
+			if (err)
+				goto out_session;
 			/* retry the same mds later */
 			if (random)
 				req->r_resend_mds = mds;
@@ -3279,8 +3358,10 @@ static void handle_session(struct ceph_mds_session *session,
 			goto bad;
 		/* version >= 3, feature bits */
 		ceph_decode_32_safe(&p, end, len, bad);
-		ceph_decode_64_safe(&p, end, features, bad);
-		p += len - sizeof(features);
+		if (len) {
+			ceph_decode_64_safe(&p, end, features, bad);
+			p += len - sizeof(features);
+		}
 	}
 
 	mutex_lock(&mdsc->mutex);
@@ -3310,6 +3391,8 @@ static void handle_session(struct ceph_mds_session *session,
 		session->s_state = CEPH_MDS_SESSION_OPEN;
 		session->s_features = features;
 		renewed_caps(mdsc, session, 0);
+		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
+			metric_schedule_delayed(&mdsc->metric);
 		wake = 1;
 		if (mdsc->stopping)
 			__close_session(mdsc, session);
@@ -4263,6 +4346,30 @@ static void maybe_recover_session(struct ceph_mds_client *mdsc)
 	ceph_force_reconnect(fsc->sb);
 }
 
+bool check_session_state(struct ceph_mds_session *s)
+{
+	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
+		dout("resending session close request for mds%d\n",
+				s->s_mds);
+		request_close_session(s);
+		return false;
+	}
+	if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
+		if (s->s_state == CEPH_MDS_SESSION_OPEN) {
+			s->s_state = CEPH_MDS_SESSION_HUNG;
+			pr_info("mds%d hung\n", s->s_mds);
+		}
+	}
+	if (s->s_state == CEPH_MDS_SESSION_NEW ||
+	    s->s_state == CEPH_MDS_SESSION_RESTARTING ||
+	    s->s_state == CEPH_MDS_SESSION_CLOSED ||
+	    s->s_state == CEPH_MDS_SESSION_REJECTED)
+		/* this mds is failed or recovering, just wait */
+		return false;
+
+	return true;
+}
+
 /*
  * delayed work -- periodically trim expired leases, renew caps with mds
  */
@@ -4283,6 +4390,9 @@ static void delayed_work(struct work_struct *work)
 
 	dout("mdsc delayed_work\n");
 
+	if (mdsc->stopping)
+		return;
+
 	mutex_lock(&mdsc->mutex);
 	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
 	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
@@ -4294,23 +4404,8 @@ static void delayed_work(struct work_struct *work)
 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
 		if (!s)
 			continue;
-		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
-			dout("resending session close request for mds%d\n",
-			     s->s_mds);
-			request_close_session(mdsc, s);
-			ceph_put_mds_session(s);
-			continue;
-		}
-		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
-			if (s->s_state == CEPH_MDS_SESSION_OPEN) {
-				s->s_state = CEPH_MDS_SESSION_HUNG;
-				pr_info("mds%d hung\n", s->s_mds);
-			}
-		}
-		if (s->s_state == CEPH_MDS_SESSION_NEW ||
-		    s->s_state == CEPH_MDS_SESSION_RESTARTING ||
-		    s->s_state == CEPH_MDS_SESSION_REJECTED) {
-			/* this mds is failed or recovering, just wait */
+
+		if (!check_session_state(s)) {
 			ceph_put_mds_session(s);
 			continue;
 		}
@@ -4359,7 +4454,6 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 		goto err_mdsc;
 	}
 
-	fsc->mdsc = mdsc;
 	init_completion(&mdsc->safe_umount_waiters);
 	init_waitqueue_head(&mdsc->session_close_wq);
 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -4414,6 +4508,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 
 	strscpy(mdsc->nodename, utsname()->nodename,
 		sizeof(mdsc->nodename));
+
+	fsc->mdsc = mdsc;
 	return 0;
 
 err_mdsmap:
@@ -4657,7 +4753,16 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
 {
 	dout("stop\n");
-	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
+	/*
+	 * Make sure the delayed work has stopped before releasing
+	 * the resources.
+	 *
+	 * cancel_delayed_work_sync() only guarantees that the work
+	 * finishes executing; the delayed work can re-arm itself
+	 * again after that, so flush it instead.
+	 */
+	flush_delayed_work(&mdsc->delayed_work);
+
 	if (mdsc->mdsmap)
 		ceph_mdsmap_destroy(mdsc->mdsmap);
 	kfree(mdsc->sessions);
@@ -4680,6 +4785,7 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
 
 	ceph_metric_destroy(&mdsc->metric);
 
+	flush_delayed_work(&mdsc->metric.delayed_work);
 	fsc->mdsc = NULL;
 	kfree(mdsc);
 	dout("mdsc_destroy %p done\n", mdsc);
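Among the mds_client.c changes, encode_metric_spec() introduces the only new
wire format. Below is a standalone C sketch of the framing it emits, mirroring
the sizes and the bit-setting loop from the diff; put_le32() and
encode_metric_spec_sketch() are hypothetical names (in-tree this is done with
ceph_encode_8()/ceph_encode_32()):

	#include <stdint.h>
	#include <string.h>

	static uint8_t *put_le32(uint8_t *p, uint32_t v)
	{
		p[0] = v; p[1] = v >> 8; p[2] = v >> 16; p[3] = v >> 24;
		return p + 4;
	}

	/*
	 * u8 version, u8 compat, u32 info length, u32 bitmap length,
	 * then a bitmap padded to whole 64-bit words (METRIC_BYTES).
	 */
	static size_t encode_metric_spec_sketch(uint8_t *p,
						const uint8_t *bits,
						size_t count)
	{
		uint8_t *start = p;
		size_t size, i;

		size = count ? ((bits[count - 1] + 1 + 63) / 64) * 8 : 0;
		*p++ = 1;			/* version */
		*p++ = 1;			/* compat */
		p = put_le32(p, 4 + size);	/* metric spec info length */
		p = put_le32(p, size);		/* bitmap length */
		memset(p, 0, size);
		for (i = 0; i < count; i++)	/* same indexing as the diff */
			p[i / 8] |= 1u << (bits[i] % 8);
		return (p + size) - start;
	}

With the metric list currently topping out at bit value 4 (see metric.h below),
the bitmap is a single 8-byte word, which matches the 2 + 4 + 4 + size that
create_session_open_msg() now reserves.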
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 5e0c4073a6be..bc9e95937d7c 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -18,6 +18,7 @@
 #include <linux/ceph/auth.h>
 
 #include "metric.h"
+#include "super.h"
 
 /* The first 8 bits are reserved for old ceph releases */
 enum ceph_feature_type {
@@ -27,8 +28,9 @@ enum ceph_feature_type {
 	CEPHFS_FEATURE_LAZY_CAP_WANTED,
 	CEPHFS_FEATURE_MULTI_RECONNECT,
 	CEPHFS_FEATURE_DELEG_INO,
+	CEPHFS_FEATURE_METRIC_COLLECT,
 
-	CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_DELEG_INO,
+	CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_METRIC_COLLECT,
 };
 
 /*
@@ -42,6 +44,7 @@ enum ceph_feature_type {
 	CEPHFS_FEATURE_LAZY_CAP_WANTED,		\
 	CEPHFS_FEATURE_MULTI_RECONNECT,		\
 	CEPHFS_FEATURE_DELEG_INO,		\
+	CEPHFS_FEATURE_METRIC_COLLECT,		\
 						\
 	CEPHFS_FEATURE_MAX,			\
 }
@@ -476,6 +479,8 @@ struct ceph_mds_client {
 
 extern const char *ceph_mds_op_name(int op);
 
+extern bool check_session_state(struct ceph_mds_session *s);
+
 extern struct ceph_mds_session *
 __ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
 
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 889627817e52..e4aba6c6d3b5 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -120,7 +120,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 	const void *start = *p;
 	int i, j, n;
 	int err;
-	u8 mdsmap_v, mdsmap_cv;
+	u8 mdsmap_v;
 	u16 mdsmap_ev;
 
 	m = kzalloc(sizeof(*m), GFP_NOFS);
@@ -129,7 +129,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 
 	ceph_decode_need(p, end, 1 + 1, bad);
 	mdsmap_v = ceph_decode_8(p);
-	mdsmap_cv = ceph_decode_8(p);
+	*p += sizeof(u8);			/* mdsmap_cv */
 	if (mdsmap_v >= 4) {
 	       u32 mdsmap_len;
 	       ceph_decode_32_safe(p, end, mdsmap_len, bad);
@@ -174,7 +174,6 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 		u64 global_id;
 		u32 namelen;
 		s32 mds, inc, state;
-		u64 state_seq;
 		u8 info_v;
 		void *info_end = NULL;
 		struct ceph_entity_addr addr;
@@ -189,9 +188,8 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 		info_v= ceph_decode_8(p);
 		if (info_v >= 4) {
 			u32 info_len;
-			u8 info_cv;
 			ceph_decode_need(p, end, 1 + sizeof(u32), bad);
-			info_cv = ceph_decode_8(p);
+			*p += sizeof(u8);	/* info_cv */
 			info_len = ceph_decode_32(p);
 			info_end = *p + info_len;
 			if (info_end > end)
@@ -210,7 +208,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 		mds = ceph_decode_32(p);
 		inc = ceph_decode_32(p);
 		state = ceph_decode_32(p);
-		state_seq = ceph_decode_64(p);
+		*p += sizeof(u64);		/* state_seq */
 		err = ceph_decode_entity_addr(p, end, &addr);
 		if (err)
 			goto corrupt;
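The mdsmap.c hunks are a pure cleanup: wire fields that were decoded into
never-read locals (mdsmap_cv, info_cv, state_seq) are now skipped by advancing
*p by their encoded size, silencing set-but-unused warnings while keeping the
decode cursor arithmetic explicit.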
diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c
index 9217f35bc2b9..2466b261fba2 100644
--- a/fs/ceph/metric.c
+++ b/fs/ceph/metric.c
@@ -1,10 +1,150 @@
 /* SPDX-License-Identifier: GPL-2.0 */
+#include <linux/ceph/ceph_debug.h>
 
 #include <linux/types.h>
 #include <linux/percpu_counter.h>
 #include <linux/math64.h>
 
 #include "metric.h"
+#include "mds_client.h"
+
+static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *s)
+{
+	struct ceph_metric_head *head;
+	struct ceph_metric_cap *cap;
+	struct ceph_metric_read_latency *read;
+	struct ceph_metric_write_latency *write;
+	struct ceph_metric_metadata_latency *meta;
+	struct ceph_client_metric *m = &mdsc->metric;
+	u64 nr_caps = atomic64_read(&m->total_caps);
+	struct ceph_msg *msg;
+	struct timespec64 ts;
+	s64 sum;
+	s32 items = 0;
+	s32 len;
+
+	len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write)
+	      + sizeof(*meta);
+
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true);
+	if (!msg) {
+		pr_err("send metrics to mds%d, failed to allocate message\n",
+		       s->s_mds);
+		return false;
+	}
+
+	head = msg->front.iov_base;
+
+	/* encode the cap metric */
+	cap = (struct ceph_metric_cap *)(head + 1);
+	cap->type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO);
+	cap->ver = 1;
+	cap->compat = 1;
+	cap->data_len = cpu_to_le32(sizeof(*cap) - 10);
+	cap->hit = cpu_to_le64(percpu_counter_sum(&mdsc->metric.i_caps_hit));
+	cap->mis = cpu_to_le64(percpu_counter_sum(&mdsc->metric.i_caps_mis));
+	cap->total = cpu_to_le64(nr_caps);
+	items++;
+
+	/* encode the read latency metric */
+	read = (struct ceph_metric_read_latency *)(cap + 1);
+	read->type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
+	read->ver = 1;
+	read->compat = 1;
+	read->data_len = cpu_to_le32(sizeof(*read) - 10);
+	sum = m->read_latency_sum;
+	jiffies_to_timespec64(sum, &ts);
+	read->sec = cpu_to_le32(ts.tv_sec);
+	read->nsec = cpu_to_le32(ts.tv_nsec);
+	items++;
+
+	/* encode the write latency metric */
+	write = (struct ceph_metric_write_latency *)(read + 1);
+	write->type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
+	write->ver = 1;
+	write->compat = 1;
+	write->data_len = cpu_to_le32(sizeof(*write) - 10);
+	sum = m->write_latency_sum;
+	jiffies_to_timespec64(sum, &ts);
+	write->sec = cpu_to_le32(ts.tv_sec);
+	write->nsec = cpu_to_le32(ts.tv_nsec);
+	items++;
+
+	/* encode the metadata latency metric */
+	meta = (struct ceph_metric_metadata_latency *)(write + 1);
+	meta->type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
+	meta->ver = 1;
+	meta->compat = 1;
+	meta->data_len = cpu_to_le32(sizeof(*meta) - 10);
+	sum = m->metadata_latency_sum;
+	jiffies_to_timespec64(sum, &ts);
+	meta->sec = cpu_to_le32(ts.tv_sec);
+	meta->nsec = cpu_to_le32(ts.tv_nsec);
+	items++;
+
+	put_unaligned_le32(items, &head->num);
+	msg->front.iov_len = len;
+	msg->hdr.version = cpu_to_le16(1);
+	msg->hdr.compat_version = cpu_to_le16(1);
+	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+	dout("client%llu send metrics to mds%d\n",
+	     ceph_client_gid(mdsc->fsc->client), s->s_mds);
+	ceph_con_send(&s->s_con, msg);
+
+	return true;
+}
+
+
+static void metric_get_session(struct ceph_mds_client *mdsc)
+{
+	struct ceph_mds_session *s;
+	int i;
+
+	mutex_lock(&mdsc->mutex);
+	for (i = 0; i < mdsc->max_sessions; i++) {
+		s = __ceph_lookup_mds_session(mdsc, i);
+		if (!s)
+			continue;
+
+		/*
+		 * Skip it if the MDS doesn't support metric collection;
+		 * otherwise it will close the session's socket connection
+		 * as soon as it receives this message.
+		 */
+		if (check_session_state(s) &&
+		    test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &s->s_features)) {
+			mdsc->metric.session = s;
+			break;
+		}
+
+		ceph_put_mds_session(s);
+	}
+	mutex_unlock(&mdsc->mutex);
+}
+
+static void metric_delayed_work(struct work_struct *work)
+{
+	struct ceph_client_metric *m =
+		container_of(work, struct ceph_client_metric, delayed_work.work);
+	struct ceph_mds_client *mdsc =
+		container_of(m, struct ceph_mds_client, metric);
+
+	if (mdsc->stopping)
+		return;
+
+	if (!m->session || !check_session_state(m->session)) {
+		if (m->session) {
+			ceph_put_mds_session(m->session);
+			m->session = NULL;
+		}
+		metric_get_session(mdsc);
+	}
+	if (m->session) {
+		ceph_mdsc_send_metrics(mdsc, m->session);
+		metric_schedule_delayed(m);
+	}
+}
 
 int ceph_metric_init(struct ceph_client_metric *m)
 {
@@ -22,6 +162,7 @@ int ceph_metric_init(struct ceph_client_metric *m)
 	if (ret)
 		goto err_d_lease_mis;
 
+	atomic64_set(&m->total_caps, 0);
 	ret = percpu_counter_init(&m->i_caps_hit, 0, GFP_KERNEL);
 	if (ret)
 		goto err_i_caps_hit;
@@ -51,6 +192,9 @@ int ceph_metric_init(struct ceph_client_metric *m)
 	m->total_metadatas = 0;
 	m->metadata_latency_sum = 0;
 
+	m->session = NULL;
+	INIT_DELAYED_WORK(&m->delayed_work, metric_delayed_work);
+
 	return 0;
 
 err_i_caps_mis:
@@ -72,6 +216,11 @@ void ceph_metric_destroy(struct ceph_client_metric *m)
 	percpu_counter_destroy(&m->i_caps_hit);
 	percpu_counter_destroy(&m->d_lease_mis);
 	percpu_counter_destroy(&m->d_lease_hit);
+
+	cancel_delayed_work_sync(&m->delayed_work);
+
+	if (m->session)
+		ceph_put_mds_session(m->session);
 }
 
 static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
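The recurring sizeof(*x) - 10 above is the common record header being
subtracted out; reading it off the __packed structs added in metric.h below:

	common header:   type (4) + ver (1) + compat (1) + data_len (4) = 10 bytes
	cap record:      hit (8) + mis (8) + total (8) -> data_len = 24, sizeof = 34
	latency records: sec (4) + nsec (4)            -> data_len =  8, sizeof = 18
	whole message:   head (4) + 34 + 3 * 18        -> len = 92 bytes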
diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h
index ccd81285a450..1d0959d669d7 100644
--- a/fs/ceph/metric.h
+++ b/fs/ceph/metric.h
@@ -6,12 +6,91 @@
 #include <linux/percpu_counter.h>
 #include <linux/ktime.h>
 
+extern bool disable_send_metrics;
+
+enum ceph_metric_type {
+	CLIENT_METRIC_TYPE_CAP_INFO,
+	CLIENT_METRIC_TYPE_READ_LATENCY,
+	CLIENT_METRIC_TYPE_WRITE_LATENCY,
+	CLIENT_METRIC_TYPE_METADATA_LATENCY,
+	CLIENT_METRIC_TYPE_DENTRY_LEASE,
+
+	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_DENTRY_LEASE,
+};
+
+/*
+ * This will always have the highest metric bit value
+ * as the last element of the array.
+ */
+#define CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED {	\
+	CLIENT_METRIC_TYPE_CAP_INFO,		\
+	CLIENT_METRIC_TYPE_READ_LATENCY,	\
+	CLIENT_METRIC_TYPE_WRITE_LATENCY,	\
+	CLIENT_METRIC_TYPE_METADATA_LATENCY,	\
+						\
+	CLIENT_METRIC_TYPE_MAX,			\
+}
+
+/* metric caps header */
+struct ceph_metric_cap {
+	__le32 type;     /* ceph metric type */
+
+	__u8  ver;
+	__u8  compat;
+
+	__le32 data_len; /* length of sizeof(hit + mis + total) */
+	__le64 hit;
+	__le64 mis;
+	__le64 total;
+} __packed;
+
+/* metric read latency header */
+struct ceph_metric_read_latency {
+	__le32 type;     /* ceph metric type */
+
+	__u8  ver;
+	__u8  compat;
+
+	__le32 data_len; /* length of sizeof(sec + nsec) */
+	__le32 sec;
+	__le32 nsec;
+} __packed;
+
+/* metric write latency header */
+struct ceph_metric_write_latency {
+	__le32 type;     /* ceph metric type */
+
+	__u8  ver;
+	__u8  compat;
+
+	__le32 data_len; /* length of sizeof(sec + nsec) */
+	__le32 sec;
+	__le32 nsec;
+} __packed;
+
+/* metric metadata latency header */
+struct ceph_metric_metadata_latency {
+	__le32 type;     /* ceph metric type */
+
+	__u8  ver;
+	__u8  compat;
+
+	__le32 data_len; /* length of sizeof(sec + nsec) */
+	__le32 sec;
+	__le32 nsec;
+} __packed;
+
+struct ceph_metric_head {
+	__le32 num;	/* the number of metrics that will be sent */
+} __packed;
+
 /* This is the global metrics */
 struct ceph_client_metric {
 	atomic64_t            total_dentries;
 	struct percpu_counter d_lease_hit;
 	struct percpu_counter d_lease_mis;
 
+	atomic64_t            total_caps;
 	struct percpu_counter i_caps_hit;
 	struct percpu_counter i_caps_mis;
 
@@ -35,8 +114,20 @@ struct ceph_client_metric {
 	ktime_t metadata_latency_sq_sum;
 	ktime_t metadata_latency_min;
 	ktime_t metadata_latency_max;
+
+	struct ceph_mds_session *session;
+	struct delayed_work delayed_work;  /* delayed work */
 };
 
+static inline void metric_schedule_delayed(struct ceph_client_metric *m)
+{
+	if (disable_send_metrics)
+		return;
+
+	/* per second */
+	schedule_delayed_work(&m->delayed_work, round_jiffies_relative(HZ));
+}
+
 extern int ceph_metric_init(struct ceph_client_metric *m);
 extern void ceph_metric_destroy(struct ceph_client_metric *m);
 
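Two notes on the scheduling helper: round_jiffies_relative(HZ) queues the next
run roughly one second out, rounded so that periodic timers across the system
can coalesce their wakeups; and because metric_delayed_work() re-arms itself
through this helper, teardown must first stop re-arming (the mdsc->stopping
checks) and then flush the work, which is what the mds_client.c hunks above
arrange.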
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index c9784eb1159a..7ec0e6d03d10 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -27,6 +27,9 @@
 #include <linux/ceph/auth.h>
 #include <linux/ceph/debugfs.h>
 
+static DEFINE_SPINLOCK(ceph_fsc_lock);
+static LIST_HEAD(ceph_fsc_list);
+
 /*
  * Ceph superblock operations
  *
@@ -634,8 +637,6 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 					struct ceph_options *opt)
 {
 	struct ceph_fs_client *fsc;
-	int page_count;
-	size_t size;
 	int err;
 
 	fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
@@ -683,18 +684,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	if (!fsc->cap_wq)
 		goto fail_inode_wq;
 
-	/* set up mempools */
-	err = -ENOMEM;
-	page_count = fsc->mount_options->wsize >> PAGE_SHIFT;
-	size = sizeof (struct page *) * (page_count ? page_count : 1);
-	fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
-	if (!fsc->wb_pagevec_pool)
-		goto fail_cap_wq;
+	spin_lock(&ceph_fsc_lock);
+	list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list);
+	spin_unlock(&ceph_fsc_lock);
 
 	return fsc;
 
-fail_cap_wq:
-	destroy_workqueue(fsc->cap_wq);
 fail_inode_wq:
 	destroy_workqueue(fsc->inode_wq);
 fail_client:
@@ -717,12 +712,14 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
 {
 	dout("destroy_fs_client %p\n", fsc);
 
+	spin_lock(&ceph_fsc_lock);
+	list_del(&fsc->metric_wakeup);
+	spin_unlock(&ceph_fsc_lock);
+
 	ceph_mdsc_destroy(fsc);
 	destroy_workqueue(fsc->inode_wq);
 	destroy_workqueue(fsc->cap_wq);
 
-	mempool_destroy(fsc->wb_pagevec_pool);
-
 	destroy_mount_options(fsc->mount_options);
 
 	ceph_destroy_client(fsc->client);
@@ -741,6 +738,7 @@ struct kmem_cache *ceph_dentry_cachep;
 struct kmem_cache *ceph_file_cachep;
 struct kmem_cache *ceph_dir_file_cachep;
 struct kmem_cache *ceph_mds_request_cachep;
+mempool_t *ceph_wb_pagevec_pool;
 
 static void ceph_inode_init_once(void *foo)
 {
@@ -785,6 +783,10 @@ static int __init init_caches(void)
 	if (!ceph_mds_request_cachep)
 		goto bad_mds_req;
 
+	ceph_wb_pagevec_pool = mempool_create_kmalloc_pool(10, CEPH_MAX_WRITE_SIZE >> PAGE_SHIFT);
+	if (!ceph_wb_pagevec_pool)
+		goto bad_pagevec_pool;
+
 	error = ceph_fscache_register();
 	if (error)
 		goto bad_fscache;
@@ -793,6 +795,8 @@ static int __init init_caches(void)
 
 bad_fscache:
 	kmem_cache_destroy(ceph_mds_request_cachep);
+bad_pagevec_pool:
+	mempool_destroy(ceph_wb_pagevec_pool);
 bad_mds_req:
 	kmem_cache_destroy(ceph_dir_file_cachep);
 bad_dir_file:
@@ -823,12 +827,13 @@ static void destroy_caches(void)
 	kmem_cache_destroy(ceph_file_cachep);
 	kmem_cache_destroy(ceph_dir_file_cachep);
 	kmem_cache_destroy(ceph_mds_request_cachep);
+	mempool_destroy(ceph_wb_pagevec_pool);
 
 	ceph_fscache_unregister();
 }
 
 /*
- * ceph_umount_begin - initiate forced umount.  Tear down down the
+ * ceph_umount_begin - initiate forced umount.  Tear down the
  * mount, skipping steps that may hang while waiting for server(s).
  */
 static void ceph_umount_begin(struct super_block *sb)
@@ -1282,6 +1287,37 @@ static void __exit exit_ceph(void)
 	destroy_caches();
 }
 
+static int param_set_metrics(const char *val, const struct kernel_param *kp)
+{
+	struct ceph_fs_client *fsc;
+	int ret;
+
+	ret = param_set_bool(val, kp);
+	if (ret) {
+		pr_err("Failed to parse sending metrics switch value '%s'\n",
+		       val);
+		return ret;
+	} else if (!disable_send_metrics) {
+		// wake up all the mds clients
+		spin_lock(&ceph_fsc_lock);
+		list_for_each_entry(fsc, &ceph_fsc_list, metric_wakeup) {
+			metric_schedule_delayed(&fsc->mdsc->metric);
+		}
+		spin_unlock(&ceph_fsc_lock);
+	}
+
+	return 0;
+}
+
+static const struct kernel_param_ops param_ops_metrics = {
+	.set = param_set_metrics,
+	.get = param_get_bool,
+};
+
+bool disable_send_metrics = false;
+module_param_cb(disable_send_metrics, &param_ops_metrics, &disable_send_metrics, 0644);
+MODULE_PARM_DESC(disable_send_metrics, "Enable sending perf metrics to ceph cluster (default: on)");
+
 module_init(init_ceph);
 module_exit(exit_ceph);
 
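Because the parameter is registered with mode 0644 it can be flipped at
runtime, presumably via /sys/module/ceph/parameters/disable_send_metrics:
writing 0 makes param_set_metrics() walk ceph_fsc_list and kick
metric_schedule_delayed() for every mounted client, so sending resumes without
a remount.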
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 5a6cdd39bc10..4c3c964b1c54 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -101,6 +101,8 @@ struct ceph_mount_options {
 struct ceph_fs_client {
 	struct super_block *sb;
 
+	struct list_head metric_wakeup;
+
 	struct ceph_mount_options *mount_options;
 	struct ceph_client *client;
 
@@ -116,8 +118,6 @@ struct ceph_fs_client {
 
 	struct ceph_mds_client *mdsc;
 
-	/* writeback */
-	mempool_t *wb_pagevec_pool;
 	atomic_long_t writeback_count;
 
 	struct workqueue_struct *inode_wq;
@@ -353,7 +353,7 @@ struct ceph_inode_info {
 	unsigned i_dirty_caps, i_flushing_caps;     /* mask of dirtied fields */
 
 	/*
-	 * Link to the the auth cap's session's s_cap_dirty list. s_cap_dirty
+	 * Link to the auth cap's session's s_cap_dirty list. s_cap_dirty
 	 * is protected by the mdsc->cap_dirty_lock, but each individual item
 	 * is also protected by the inode's i_ceph_lock. Walking s_cap_dirty
 	 * requires the mdsc->cap_dirty_lock. List presence for an item can
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 71ee34d160c3..3a733ac33d9b 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -497,10 +497,10 @@ static int __set_xattr(struct ceph_inode_info *ci,
 		kfree(*newxattr);
 		*newxattr = NULL;
 		if (xattr->should_free_val)
-			kfree((void *)xattr->val);
+			kfree(xattr->val);
 
 		if (update_xattr) {
-			kfree((void *)name);
+			kfree(name);
 			name = xattr->name;
 		}
 		ci->i_xattrs.names_size -= xattr->name_len;
@@ -566,9 +566,9 @@ static void __free_xattr(struct ceph_inode_xattr *xattr)
 	BUG_ON(!xattr);
 
 	if (xattr->should_free_name)
-		kfree((void *)xattr->name);
+		kfree(xattr->name);
 	if (xattr->should_free_val)
-		kfree((void *)xattr->val);
+		kfree(xattr->val);
 
 	kfree(xattr);
 }
@@ -582,9 +582,9 @@ static int __remove_xattr(struct ceph_inode_info *ci,
 	rb_erase(&xattr->node, &ci->i_xattrs.index);
 
 	if (xattr->should_free_name)
-		kfree((void *)xattr->name);
+		kfree(xattr->name);
 	if (xattr->should_free_val)
-		kfree((void *)xattr->val);
+		kfree(xattr->val);
 
 	ci->i_xattrs.names_size -= xattr->name_len;
 	ci->i_xattrs.vals_size -= xattr->val_len;
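The xattr.c casts were never needed: kfree() takes a const void *, so the
const char * name and val members can be passed to it directly.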