author     Linus Torvalds <torvalds@linux-foundation.org>  2021-07-09 09:52:13 -0700
committer  Linus Torvalds <torvalds@linux-foundation.org>  2021-07-09 09:52:13 -0700
commit     47a7ce62889a52841bcc8cec98dd3bf45af3b4f0
tree       7c2c6f6f83bf95c181a3c9499ddaf6705d37e86a
parent     96890bc2eaa1f6bfc1b194e0f0815a10824352a4
parent     4c18347238ab5a4ee0e71ca765460d84c75a26b5
download   linux-47a7ce62889a52841bcc8cec98dd3bf45af3b4f0.tar.gz
Merge tag 'ceph-for-5.14-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "We have new filesystem client metrics for reporting I/O sizes from
  Xiubo, two patchsets from Jeff that begin to untangle some heavyweight
  blocking locks in the filesystem, and a bunch of code cleanups"
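
The I/O size metrics thread the request length into
ceph_update_read_metrics()/ceph_update_write_metrics(), which then keep a
running count, byte sum, and min/max next to the existing latency stats
(see fs/ceph/metric.c below). A minimal userspace sketch of that
accumulation pattern; types are simplified, and io_metric/io_metric_update
are illustrative names, not the kernel API:

    #include <stdint.h>
    #include <stdio.h>

    /* Simplified stand-in for the per-direction fields in ceph_client_metric. */
    struct io_metric {
        uint64_t total;     /* number of completed requests */
        uint64_t size_sum;  /* total bytes transferred */
        uint64_t size_min;  /* smallest request; starts at UINT64_MAX */
        uint64_t size_max;  /* largest request */
    };

    static void io_metric_update(struct io_metric *m, uint64_t size)
    {
        m->total++;
        m->size_sum += size;
        if (size < m->size_min)
            m->size_min = size;
        if (size > m->size_max)
            m->size_max = size;
    }

    int main(void)
    {
        struct io_metric rd = { .size_min = UINT64_MAX };
        uint64_t sizes[] = { 4096, 65536, 8192 };

        for (int i = 0; i < 3; i++)
            io_metric_update(&rd, sizes[i]);

        /* the average is derived at report time, as metric_show() does */
        printf("reads=%llu sum=%llu min=%llu max=%llu avg=%llu\n",
               (unsigned long long)rd.total,
               (unsigned long long)rd.size_sum,
               (unsigned long long)rd.size_min,
               (unsigned long long)rd.size_max,
               (unsigned long long)((rd.size_sum + rd.total / 2) / rd.total));
        return 0;
    }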

* tag 'ceph-for-5.14-rc1' of git://github.com/ceph/ceph-client:
  ceph: take reference to req->r_parent at point of assignment
  ceph: eliminate ceph_async_iput()
  ceph: don't take s_mutex in ceph_flush_snaps
  ceph: don't take s_mutex in try_flush_caps
  ceph: don't take s_mutex or snap_rwsem in ceph_check_caps
  ceph: eliminate session->s_gen_ttl_lock
  ceph: allow ceph_put_mds_session to take NULL or ERR_PTR
  ceph: clean up locking annotation for ceph_get_snap_realm and __lookup_snap_realm
  ceph: add some lockdep assertions around snaprealm handling
  ceph: decoding error in ceph_update_snap_realm should return -EIO
  ceph: add IO size metrics support
  ceph: update and rename __update_latency helper to __update_stdev
  ceph: simplify the metrics struct
  libceph: fix doc warnings in cls_lock_client.c
  libceph: remove unnecessary ret variable in ceph_auth_init()
  libceph: fix some spelling mistakes
  libceph: kill ceph_none_authorizer::reply_buf
  ceph: make ceph_queue_cap_snap static
  ceph: make ceph_netfs_read_ops static
  ceph: remove bogus checks and WARN_ONs from ceph_set_page_dirty
 fs/ceph/addr.c             |  26
 fs/ceph/caps.c             | 125
 fs/ceph/debugfs.c          |  37
 fs/ceph/dir.c              |  16
 fs/ceph/export.c           |   1
 fs/ceph/file.c             |  24
 fs/ceph/inode.c            |  38
 fs/ceph/mds_client.c       |  54
 fs/ceph/mds_client.h       |   6
 fs/ceph/metric.c           | 167
 fs/ceph/metric.h           |  89
 fs/ceph/quota.c            |   9
 fs/ceph/snap.c             |  45
 fs/ceph/super.h            |   3
 net/ceph/auth.c            |   7
 net/ceph/auth_none.c       |   4
 net/ceph/auth_none.h       |   1
 net/ceph/cls_lock_client.c |  12
 18 files changed, 312 insertions(+), 352 deletions(-)
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index c1570fada3d8..a1e2813731d1 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -82,10 +82,6 @@ static int ceph_set_page_dirty(struct page *page)
 	struct inode *inode;
 	struct ceph_inode_info *ci;
 	struct ceph_snap_context *snapc;
-	int ret;
-
-	if (unlikely(!mapping))
-		return !TestSetPageDirty(page);
 
 	if (PageDirty(page)) {
 		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
@@ -130,11 +126,7 @@ static int ceph_set_page_dirty(struct page *page)
 	BUG_ON(PagePrivate(page));
 	attach_page_private(page, snapc);
 
-	ret = __set_page_dirty_nobuffers(page);
-	WARN_ON(!PageLocked(page));
-	WARN_ON(!page->mapping);
-
-	return ret;
+	return __set_page_dirty_nobuffers(page);
 }
 
 /*
@@ -226,7 +218,7 @@ static void finish_netfs_read(struct ceph_osd_request *req)
 	int err = req->r_result;
 
 	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
-				 req->r_end_latency, err);
+				 req->r_end_latency, osd_data->length, err);
 
 	dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
 	     subreq->len, i_size_read(req->r_inode));
@@ -313,7 +305,7 @@ static void ceph_readahead_cleanup(struct address_space *mapping, void *priv)
 		ceph_put_cap_refs(ci, got);
 }
 
-const struct netfs_read_request_ops ceph_netfs_read_ops = {
+static const struct netfs_read_request_ops ceph_netfs_read_ops = {
 	.init_rreq		= ceph_init_rreq,
 	.is_cache_enabled	= ceph_is_cache_enabled,
 	.begin_cache_operation	= ceph_begin_cache_operation,
@@ -560,7 +552,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 		err = ceph_osdc_wait_request(osdc, req);
 
 	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-				  req->r_end_latency, err);
+				  req->r_end_latency, len, err);
 
 	ceph_osdc_put_request(req);
 	if (err == 0)
@@ -635,6 +627,7 @@ static void writepages_finish(struct ceph_osd_request *req)
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	unsigned int len = 0;
 	bool remove_page;
 
 	dout("writepages_finish %p rc %d\n", inode, rc);
@@ -647,9 +640,6 @@ static void writepages_finish(struct ceph_osd_request *req)
 		ceph_clear_error_write(ci);
 	}
 
-	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-				  req->r_end_latency, rc);
-
 	/*
 	 * We lost the cache cap, need to truncate the page before
 	 * it is unlocked, otherwise we'd truncate it later in the
@@ -666,6 +656,7 @@ static void writepages_finish(struct ceph_osd_request *req)
 
 		osd_data = osd_req_op_extent_osd_data(req, i);
 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+		len += osd_data->length;
 		num_pages = calc_pages_for((u64)osd_data->alignment,
 					   (u64)osd_data->length);
 		total_pages += num_pages;
@@ -696,6 +687,9 @@ static void writepages_finish(struct ceph_osd_request *req)
 		release_pages(osd_data->pages, num_pages);
 	}
 
+	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
+				  req->r_end_latency, len, rc);
+
 	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
 
 	osd_data = osd_req_op_extent_osd_data(req, 0);
@@ -1711,7 +1705,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
 	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-				  req->r_end_latency, err);
+				  req->r_end_latency, len, err);
 
 out_put:
 	ceph_osdc_put_request(req);
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index a5e93b185515..7bdefd0c789a 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -645,9 +645,7 @@ void ceph_add_cap(struct inode *inode,
 	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
 	     session->s_mds, cap_id, ceph_cap_string(issued), seq);
 
-	spin_lock(&session->s_gen_ttl_lock);
-	gen = session->s_cap_gen;
-	spin_unlock(&session->s_gen_ttl_lock);
+	gen = atomic_read(&session->s_cap_gen);
 
 	cap = __get_cap_for_mds(ci, mds);
 	if (!cap) {
@@ -785,10 +783,8 @@ static int __cap_is_valid(struct ceph_cap *cap)
 	unsigned long ttl;
 	u32 gen;
 
-	spin_lock(&cap->session->s_gen_ttl_lock);
-	gen = cap->session->s_cap_gen;
+	gen = atomic_read(&cap->session->s_cap_gen);
 	ttl = cap->session->s_cap_ttl;
-	spin_unlock(&cap->session->s_gen_ttl_lock);
 
 	if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
 		dout("__cap_is_valid %p cap %p issued %s "
@@ -1182,7 +1178,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 	 * s_cap_gen while session is in the reconnect state.
 	 */
 	if (queue_release &&
-	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
+	    (!session->s_cap_reconnect ||
+	     cap->cap_gen == atomic_read(&session->s_cap_gen))) {
 		cap->queue_release = 1;
 		if (removed) {
 			__ceph_queue_cap_release(session, cap);
@@ -1534,7 +1531,7 @@ static inline int __send_flush_snap(struct inode *inode,
  * asynchronously back to the MDS once sync writes complete and dirty
  * data is written out.
  *
- * Called under i_ceph_lock.  Takes s_mutex as needed.
+ * Called under i_ceph_lock.
  */
 static void __ceph_flush_snaps(struct ceph_inode_info *ci,
 			       struct ceph_mds_session *session)
@@ -1656,7 +1653,6 @@ retry:
 	mds = ci->i_auth_cap->session->s_mds;
 	if (session && session->s_mds != mds) {
 		dout(" oops, wrong session %p mutex\n", session);
-		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
 		session = NULL;
 	}
@@ -1665,10 +1661,6 @@ retry:
 		mutex_lock(&mdsc->mutex);
 		session = __ceph_lookup_mds_session(mdsc, mds);
 		mutex_unlock(&mdsc->mutex);
-		if (session) {
-			dout(" inverting session/ino locks on %p\n", session);
-			mutex_lock(&session->s_mutex);
-		}
 		goto retry;
 	}
 
@@ -1680,12 +1672,10 @@ retry:
 out:
 	spin_unlock(&ci->i_ceph_lock);
 
-	if (psession) {
+	if (psession)
 		*psession = session;
-	} else if (session) {
-		mutex_unlock(&session->s_mutex);
+	else
 		ceph_put_mds_session(session);
-	}
 	/* we flushed them all; remove this inode from the queue */
 	spin_lock(&mdsc->snap_flush_lock);
 	list_del_init(&ci->i_snap_flush_item);
@@ -1915,7 +1905,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 	struct ceph_cap *cap;
 	u64 flush_tid, oldest_flush_tid;
 	int file_wanted, used, cap_used;
-	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
 	int issued, implemented, want, retain, revoking, flushing = 0;
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
 			   to avoid an infinite loop on retry */
@@ -1923,14 +1912,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 	bool queue_invalidate = false;
 	bool tried_invalidate = false;
 
+	if (session)
+		ceph_get_mds_session(session);
+
 	spin_lock(&ci->i_ceph_lock);
 	if (ci->i_ceph_flags & CEPH_I_FLUSH)
 		flags |= CHECK_CAPS_FLUSH;
-
-	goto retry_locked;
 retry:
-	spin_lock(&ci->i_ceph_lock);
-retry_locked:
 	/* Caps wanted by virtue of active open files. */
 	file_wanted = __ceph_caps_file_wanted(ci);
 
@@ -2010,7 +1998,7 @@ retry_locked:
 			ci->i_rdcache_revoking = ci->i_rdcache_gen;
 		}
 		tried_invalidate = true;
-		goto retry_locked;
+		goto retry;
 	}
 
 	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
@@ -2024,8 +2012,6 @@ retry_locked:
 		    ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
 			continue;
 
-		/* NOTE: no side-effects allowed, until we take s_mutex */
-
 		/*
 		 * If we have an auth cap, we don't need to consider any
 		 * overlapping caps as used.
@@ -2088,37 +2074,8 @@ retry_locked:
 			continue;     /* nope, all good */
 
 ack:
-		if (session && session != cap->session) {
-			dout("oops, wrong session %p mutex\n", session);
-			mutex_unlock(&session->s_mutex);
-			session = NULL;
-		}
-		if (!session) {
-			session = cap->session;
-			if (mutex_trylock(&session->s_mutex) == 0) {
-				dout("inverting session/ino locks on %p\n",
-				     session);
-				session = ceph_get_mds_session(session);
-				spin_unlock(&ci->i_ceph_lock);
-				if (took_snap_rwsem) {
-					up_read(&mdsc->snap_rwsem);
-					took_snap_rwsem = 0;
-				}
-				if (session) {
-					mutex_lock(&session->s_mutex);
-					ceph_put_mds_session(session);
-				} else {
-					/*
-					 * Because we take the reference while
-					 * holding the i_ceph_lock, it should
-					 * never be NULL. Throw a warning if it
-					 * ever is.
-					 */
-					WARN_ON_ONCE(true);
-				}
-				goto retry;
-			}
-		}
+		ceph_put_mds_session(session);
+		session = ceph_get_mds_session(cap->session);
 
 		/* kick flushing and flush snaps before sending normal
 		 * cap message */
@@ -2130,20 +2087,7 @@ ack:
 			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
 				__ceph_flush_snaps(ci, session);
 
-			goto retry_locked;
-		}
-
-		/* take snap_rwsem after session mutex */
-		if (!took_snap_rwsem) {
-			if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
-				dout("inverting snap/in locks on %p\n",
-				     inode);
-				spin_unlock(&ci->i_ceph_lock);
-				down_read(&mdsc->snap_rwsem);
-				took_snap_rwsem = 1;
-				goto retry;
-			}
-			took_snap_rwsem = 1;
+			goto retry;
 		}
 
 		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
@@ -2165,9 +2109,10 @@ ack:
 
 		__prep_cap(&arg, cap, CEPH_CAP_OP_UPDATE, mflags, cap_used,
 			   want, retain, flushing, flush_tid, oldest_flush_tid);
-		spin_unlock(&ci->i_ceph_lock);
 
+		spin_unlock(&ci->i_ceph_lock);
 		__send_cap(&arg, ci);
+		spin_lock(&ci->i_ceph_lock);
 
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
@@ -2182,13 +2127,9 @@ ack:
 
 	spin_unlock(&ci->i_ceph_lock);
 
+	ceph_put_mds_session(session);
 	if (queue_invalidate)
 		ceph_queue_invalidate(inode);
-
-	if (session)
-		mutex_unlock(&session->s_mutex);
-	if (took_snap_rwsem)
-		up_read(&mdsc->snap_rwsem);
 }
 
 /*
@@ -2198,26 +2139,17 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_mds_session *session = NULL;
 	int flushing = 0;
 	u64 flush_tid = 0, oldest_flush_tid = 0;
 
-retry:
 	spin_lock(&ci->i_ceph_lock);
 retry_locked:
 	if (ci->i_dirty_caps && ci->i_auth_cap) {
 		struct ceph_cap *cap = ci->i_auth_cap;
 		struct cap_msg_args arg;
+		struct ceph_mds_session *session = cap->session;
 
-		if (session != cap->session) {
-			spin_unlock(&ci->i_ceph_lock);
-			if (session)
-				mutex_unlock(&session->s_mutex);
-			session = cap->session;
-			mutex_lock(&session->s_mutex);
-			goto retry;
-		}
-		if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) {
+		if (session->s_state < CEPH_MDS_SESSION_OPEN) {
 			spin_unlock(&ci->i_ceph_lock);
 			goto out;
 		}
@@ -2254,9 +2186,6 @@ retry_locked:
 		spin_unlock(&ci->i_ceph_lock);
 	}
 out:
-	if (session)
-		mutex_unlock(&session->s_mutex);
-
 	*ptid = flush_tid;
 	return flushing;
 }
@@ -3213,8 +3142,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 	if (complete_capsnap)
 		wake_up_all(&ci->i_cap_wq);
 	while (put-- > 0) {
-		/* avoid calling iput_final() in osd dispatch threads */
-		ceph_async_iput(inode);
+		iput(inode);
 	}
 }
 
@@ -3288,7 +3216,7 @@ static void handle_cap_grant(struct inode *inode,
 	u64 size = le64_to_cpu(grant->size);
 	u64 max_size = le64_to_cpu(grant->max_size);
 	unsigned char check_caps = 0;
-	bool was_stale = cap->cap_gen < session->s_cap_gen;
+	bool was_stale = cap->cap_gen < atomic_read(&session->s_cap_gen);
 	bool wake = false;
 	bool writeback = false;
 	bool queue_trunc = false;
@@ -3340,7 +3268,7 @@ static void handle_cap_grant(struct inode *inode,
 	}
 
 	/* side effects now are allowed */
-	cap->cap_gen = session->s_cap_gen;
+	cap->cap_gen = atomic_read(&session->s_cap_gen);
 	cap->seq = seq;
 
 	__check_cap_issue(ci, cap, newcaps);
@@ -3553,13 +3481,12 @@ static void handle_cap_grant(struct inode *inode,
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 
+	mutex_unlock(&session->s_mutex);
 	if (check_caps == 1)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL,
 				session);
 	else if (check_caps == 2)
 		ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session);
-	else
-		mutex_unlock(&session->s_mutex);
 }
 
 /*
@@ -4203,8 +4130,7 @@ done:
 	mutex_unlock(&session->s_mutex);
 done_unlocked:
 	ceph_put_string(extra_info.pool_ns);
-	/* avoid calling iput_final() in mds dispatch threads */
-	ceph_async_iput(inode);
+	iput(inode);
 	return;
 
 flush_cap_releases:
@@ -4246,8 +4172,7 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 			spin_unlock(&mdsc->cap_delay_lock);
 			dout("check_delayed_caps on %p\n", inode);
 			ceph_check_caps(ci, 0, NULL);
-			/* avoid calling iput_final() in tick thread */
-			ceph_async_iput(inode);
+			iput(inode);
 			spin_lock(&mdsc->cap_delay_lock);
 		}
 	}
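
These caps.c hunks ride on the s_cap_gen conversion (see the
fs/ceph/mds_client.h hunk below): a u32 guarded by s_gen_ttl_lock becomes
an atomic_t, so readers sample the generation with a single atomic load
instead of taking a spinlock. A userspace sketch of the pattern using C11
atomics; the kernel uses atomic_t with atomic_read()/atomic_inc(), and the
names here are illustrative:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdio.h>

    /* Illustrative model of the session cap generation counter. */
    struct session {
        atomic_uint cap_gen;    /* bumped whenever the MDS declares caps stale */
    };

    struct cap {
        unsigned int cap_gen;   /* generation this cap was issued under */
    };

    /* Writer side: session went stale, invalidate all caps issued so far. */
    static void session_stale(struct session *s)
    {
        atomic_fetch_add(&s->cap_gen, 1);
    }

    /* Reader side: one atomic load replaces lock/read/unlock. */
    static bool cap_is_current(const struct cap *c, struct session *s)
    {
        return c->cap_gen == atomic_load(&s->cap_gen);
    }

    int main(void)
    {
        struct session s;
        struct cap c = { .cap_gen = 0 };

        atomic_init(&s.cap_gen, 0);
        printf("%d\n", cap_is_current(&c, &s));   /* 1: still valid */
        session_stale(&s);
        printf("%d\n", cap_is_current(&c, &s));   /* 0: now stale */
        return 0;
    }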
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 425f3356332a..38b78b45811f 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -127,7 +127,7 @@ static int mdsc_show(struct seq_file *s, void *p)
 	return 0;
 }
 
-#define CEPH_METRIC_SHOW(name, total, avg, min, max, sq) {		\
+#define CEPH_LAT_METRIC_SHOW(name, total, avg, min, max, sq) {		\
 	s64 _total, _avg, _min, _max, _sq, _st;				\
 	_avg = ktime_to_us(avg);					\
 	_min = ktime_to_us(min == KTIME_MAX ? 0 : min);			\
@@ -140,6 +140,12 @@ static int mdsc_show(struct seq_file *s, void *p)
 		   name, total, _avg, _min, _max, _st);			\
 }
 
+#define CEPH_SZ_METRIC_SHOW(name, total, avg, min, max, sum) {		\
+	u64 _min = min == U64_MAX ? 0 : min;				\
+	seq_printf(s, "%-14s%-12lld%-16llu%-16llu%-16llu%llu\n",	\
+		   name, total, avg, _min, max, sum);			\
+}
+
 static int metric_show(struct seq_file *s, void *p)
 {
 	struct ceph_fs_client *fsc = s->private;
@@ -147,6 +153,7 @@ static int metric_show(struct seq_file *s, void *p)
 	struct ceph_client_metric *m = &mdsc->metric;
 	int nr_caps = 0;
 	s64 total, sum, avg, min, max, sq;
+	u64 sum_sz, avg_sz, min_sz, max_sz;
 
 	sum = percpu_counter_sum(&m->total_inodes);
 	seq_printf(s, "item                               total\n");
@@ -170,7 +177,7 @@ static int metric_show(struct seq_file *s, void *p)
 	max = m->read_latency_max;
 	sq = m->read_latency_sq_sum;
 	spin_unlock(&m->read_metric_lock);
-	CEPH_METRIC_SHOW("read", total, avg, min, max, sq);
+	CEPH_LAT_METRIC_SHOW("read", total, avg, min, max, sq);
 
 	spin_lock(&m->write_metric_lock);
 	total = m->total_writes;
@@ -180,7 +187,7 @@ static int metric_show(struct seq_file *s, void *p)
 	max = m->write_latency_max;
 	sq = m->write_latency_sq_sum;
 	spin_unlock(&m->write_metric_lock);
-	CEPH_METRIC_SHOW("write", total, avg, min, max, sq);
+	CEPH_LAT_METRIC_SHOW("write", total, avg, min, max, sq);
 
 	spin_lock(&m->metadata_metric_lock);
 	total = m->total_metadatas;
@@ -190,7 +197,29 @@ static int metric_show(struct seq_file *s, void *p)
 	max = m->metadata_latency_max;
 	sq = m->metadata_latency_sq_sum;
 	spin_unlock(&m->metadata_metric_lock);
-	CEPH_METRIC_SHOW("metadata", total, avg, min, max, sq);
+	CEPH_LAT_METRIC_SHOW("metadata", total, avg, min, max, sq);
+
+	seq_printf(s, "\n");
+	seq_printf(s, "item          total       avg_sz(bytes)   min_sz(bytes)   max_sz(bytes)  total_sz(bytes)\n");
+	seq_printf(s, "----------------------------------------------------------------------------------------\n");
+
+	spin_lock(&m->read_metric_lock);
+	total = m->total_reads;
+	sum_sz = m->read_size_sum;
+	avg_sz = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum_sz, total) : 0;
+	min_sz = m->read_size_min;
+	max_sz = m->read_size_max;
+	spin_unlock(&m->read_metric_lock);
+	CEPH_SZ_METRIC_SHOW("read", total, avg_sz, min_sz, max_sz, sum_sz);
+
+	spin_lock(&m->write_metric_lock);
+	total = m->total_writes;
+	sum_sz = m->write_size_sum;
+	avg_sz = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum_sz, total) : 0;
+	min_sz = m->write_size_min;
+	max_sz = m->write_size_max;
+	spin_unlock(&m->write_metric_lock);
+	CEPH_SZ_METRIC_SHOW("write", total, avg_sz, min_sz, max_sz, sum_sz);
 
 	seq_printf(s, "\n");
 	seq_printf(s, "item          total           miss            hit\n");
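
metric_show() derives the average I/O size at display time with
DIV64_U64_ROUND_CLOSEST(), i.e. rounded rather than truncated integer
division; in the kernel the macro boils down to roughly
div64_u64(sum + total / 2, total). A quick userspace model with a worked
example:

    #include <stdint.h>
    #include <stdio.h>

    /* Userspace stand-in for DIV64_U64_ROUND_CLOSEST(sum, total). */
    static uint64_t div_round_closest(uint64_t sum, uint64_t total)
    {
        return (sum + total / 2) / total;
    }

    int main(void)
    {
        /* 100000 bytes over 3 reads: 33333.3... rounds down to 33333 */
        printf("%llu\n", (unsigned long long)div_round_closest(100000, 3));
        /* 100001 bytes over 2 reads: 50000.5 rounds up to 50001 */
        printf("%llu\n", (unsigned long long)div_round_closest(100001, 2));
        return 0;
    }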
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 9ba79b6531fb..133dbd9338e7 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -788,6 +788,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 		mask |= CEPH_CAP_XATTR_SHARED;
 	req->r_args.getattr.mask = cpu_to_le32(mask);
 
+	ihold(dir);
 	req->r_parent = dir;
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -868,6 +869,7 @@ static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir,
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
 	req->r_parent = dir;
+	ihold(dir);
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_args.mknod.mode = cpu_to_le32(mode);
 	req->r_args.mknod.rdev = cpu_to_le32(rdev);
@@ -929,6 +931,8 @@ static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir,
 		goto out;
 	}
 	req->r_parent = dir;
+	ihold(dir);
+
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
@@ -993,6 +997,7 @@ static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir,
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
 	req->r_parent = dir;
+	ihold(dir);
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_args.mkdir.mode = cpu_to_le32(mode);
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
@@ -1037,6 +1042,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
 	req->r_num_caps = 2;
 	req->r_old_dentry = dget(old_dentry);
 	req->r_parent = dir;
+	ihold(dir);
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -1158,6 +1164,7 @@ retry:
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
 	req->r_parent = dir;
+	ihold(dir);
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 	req->r_inode_drop = ceph_drop_caps_for_unlink(inode);
@@ -1232,6 +1239,7 @@ static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir,
 	req->r_old_dentry = dget(old_dentry);
 	req->r_old_dentry_dir = old_dir;
 	req->r_parent = new_dir;
+	ihold(new_dir);
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -1548,10 +1556,8 @@ static bool __dentry_lease_is_valid(struct ceph_dentry_info *di)
 		u32 gen;
 		unsigned long ttl;
 
-		spin_lock(&session->s_gen_ttl_lock);
-		gen = session->s_cap_gen;
+		gen = atomic_read(&session->s_cap_gen);
 		ttl = session->s_cap_ttl;
-		spin_unlock(&session->s_gen_ttl_lock);
 
 		if (di->lease_gen == gen &&
 		    time_before(jiffies, ttl) &&
@@ -1730,6 +1736,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 			req->r_dentry = dget(dentry);
 			req->r_num_caps = 2;
 			req->r_parent = dir;
+			ihold(dir);
 
 			mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
 			if (ceph_security_xattr_wanted(dir))
@@ -1809,8 +1816,7 @@ static void ceph_d_release(struct dentry *dentry)
 	dentry->d_fsdata = NULL;
 	spin_unlock(&dentry->d_lock);
 
-	if (di->lease_session)
-		ceph_put_mds_session(di->lease_session);
+	ceph_put_mds_session(di->lease_session);
 	kmem_cache_free(ceph_dentry_cachep, di);
 }
 
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 65540a4429b2..1d65934c1262 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -542,6 +542,7 @@ static int ceph_get_name(struct dentry *parent, char *name,
 	ihold(inode);
 	req->r_ino2 = ceph_vino(d_inode(parent));
 	req->r_parent = d_inode(parent);
+	ihold(req->r_parent);
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_num_caps = 2;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
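
The r_parent hunks in dir.c and export.c above (and in file.c below) take
the inode reference at the point where req->r_parent is assigned, via
ihold(), rather than in ceph_mdsc_submit_request(); teardown then drops it
unconditionally in ceph_mdsc_release_request(). A minimal model of the
"take the reference where the pointer is stored" idiom, with hypothetical
names:

    #include <stdio.h>

    struct inode { int i_count; };

    static void ihold(struct inode *in) { in->i_count++; }

    static void iput(struct inode *in)
    {
        if (in && --in->i_count == 0)
            printf("inode freed\n");
    }

    struct request { struct inode *r_parent; };

    /* The owner of the pointer takes the reference at assignment time... */
    static void request_set_parent(struct request *req, struct inode *dir)
    {
        ihold(dir);
        req->r_parent = dir;
    }

    /* ...so teardown can drop it unconditionally, with no per-caller rules. */
    static void request_release(struct request *req)
    {
        iput(req->r_parent);
    }

    int main(void)
    {
        struct inode dir = { .i_count = 1 };
        struct request req = { 0 };

        request_set_parent(&req, &dir);
        request_release(&req);   /* drops the request's reference */
        iput(&dir);              /* drops the original reference: frees */
        return 0;
    }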
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index d51af3698032..d1755ac1d964 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -706,6 +706,7 @@ retry:
 		mask |= CEPH_CAP_XATTR_SHARED;
 	req->r_args.open.mask = cpu_to_le32(mask);
 	req->r_parent = dir;
+	ihold(dir);
 
 	if (flags & O_CREAT) {
 		struct ceph_file_layout lo;
@@ -903,7 +904,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 		ceph_update_read_metrics(&fsc->mdsc->metric,
 					 req->r_start_latency,
 					 req->r_end_latency,
-					 ret);
+					 len, ret);
 
 		ceph_osdc_put_request(req);
 
@@ -1035,12 +1036,12 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	struct ceph_aio_request *aio_req = req->r_priv;
 	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
 	struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
+	unsigned int len = osd_data->bvec_pos.iter.bi_size;
 
 	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
 	BUG_ON(!osd_data->num_bvecs);
 
-	dout("ceph_aio_complete_req %p rc %d bytes %u\n",
-	     inode, rc, osd_data->bvec_pos.iter.bi_size);
+	dout("ceph_aio_complete_req %p rc %d bytes %u\n", inode, rc, len);
 
 	if (rc == -EOLDSNAPC) {
 		struct ceph_aio_work *aio_work;
@@ -1058,9 +1059,9 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	} else if (!aio_req->write) {
 		if (rc == -ENOENT)
 			rc = 0;
-		if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) {
+		if (rc >= 0 && len > rc) {
 			struct iov_iter i;
-			int zlen = osd_data->bvec_pos.iter.bi_size - rc;
+			int zlen = len - rc;
 
 			/*
 			 * If read is satisfied by single OSD request,
@@ -1077,8 +1078,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 			}
 
 			iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs,
-				      osd_data->num_bvecs,
-				      osd_data->bvec_pos.iter.bi_size);
+				      osd_data->num_bvecs, len);
 			iov_iter_advance(&i, rc);
 			iov_iter_zero(zlen, &i);
 		}
@@ -1088,10 +1088,10 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	if (req->r_start_latency) {
 		if (aio_req->write)
 			ceph_update_write_metrics(metric, req->r_start_latency,
-						  req->r_end_latency, rc);
+						  req->r_end_latency, len, rc);
 		else
 			ceph_update_read_metrics(metric, req->r_start_latency,
-						 req->r_end_latency, rc);
+						 req->r_end_latency, len, rc);
 	}
 
 	put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
@@ -1299,10 +1299,10 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 
 		if (write)
 			ceph_update_write_metrics(metric, req->r_start_latency,
-						  req->r_end_latency, ret);
+						  req->r_end_latency, len, ret);
 		else
 			ceph_update_read_metrics(metric, req->r_start_latency,
-						 req->r_end_latency, ret);
+						 req->r_end_latency, len, ret);
 
 		size = i_size_read(inode);
 		if (!write) {
@@ -1476,7 +1476,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
 		ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-					  req->r_end_latency, ret);
+					  req->r_end_latency, len, ret);
 out:
 		ceph_osdc_put_request(req);
 		if (ret != 0) {
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index df0c8a724609..1bd2cc015913 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1124,7 +1124,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
 		return;
 	}
 
-	if (di->lease_gen == session->s_cap_gen &&
+	if (di->lease_gen == atomic_read(&session->s_cap_gen) &&
 	    time_before(ttl, di->time))
 		return;  /* we already have a newer lease. */
 
@@ -1135,7 +1135,7 @@ static void __update_dentry_lease(struct inode *dir, struct dentry *dentry,
 
 	if (!di->lease_session)
 		di->lease_session = ceph_get_mds_session(session);
-	di->lease_gen = session->s_cap_gen;
+	di->lease_gen = atomic_read(&session->s_cap_gen);
 	di->lease_seq = le32_to_cpu(lease->seq);
 	di->lease_renew_after = half_ttl;
 	di->lease_renew_from = 0;
@@ -1154,8 +1154,7 @@ static inline void update_dentry_lease(struct inode *dir, struct dentry *dentry,
 	__update_dentry_lease(dir, dentry, lease, session, from_time,
 			      &old_lease_session);
 	spin_unlock(&dentry->d_lock);
-	if (old_lease_session)
-		ceph_put_mds_session(old_lease_session);
+	ceph_put_mds_session(old_lease_session);
 }
 
 /*
@@ -1200,8 +1199,7 @@ static void update_dentry_lease_careful(struct dentry *dentry,
 			      from_time, &old_lease_session);
 out_unlock:
 	spin_unlock(&dentry->d_lock);
-	if (old_lease_session)
-		ceph_put_mds_session(old_lease_session);
+	ceph_put_mds_session(old_lease_session);
 }
 
 /*
@@ -1568,8 +1566,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
 			unlock_new_inode(in);
 		}
 
-		/* avoid calling iput_final() in mds dispatch threads */
-		ceph_async_iput(in);
+		iput(in);
 	}
 
 	return err;
@@ -1766,13 +1763,11 @@ retry_lookup:
 		if (ret < 0) {
 			pr_err("ceph_fill_inode badness on %p\n", in);
 			if (d_really_is_negative(dn)) {
-				/* avoid calling iput_final() in mds
-				 * dispatch threads */
 				if (in->i_state & I_NEW) {
 					ihold(in);
 					discard_new_inode(in);
 				}
-				ceph_async_iput(in);
+				iput(in);
 			}
 			d_drop(dn);
 			err = ret;
@@ -1785,7 +1780,7 @@ retry_lookup:
 			if (ceph_security_xattr_deadlock(in)) {
 				dout(" skip splicing dn %p to inode %p"
 				     " (security xattr deadlock)\n", dn, in);
-				ceph_async_iput(in);
+				iput(in);
 				skipped++;
 				goto next_item;
 			}
@@ -1836,25 +1831,6 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
 	return ret;
 }
 
-/*
- * Put reference to inode, but avoid calling iput_final() in current thread.
- * iput_final() may wait for reahahead pages. The wait can cause deadlock in
- * some contexts.
- */
-void ceph_async_iput(struct inode *inode)
-{
-	if (!inode)
-		return;
-	for (;;) {
-		if (atomic_add_unless(&inode->i_count, -1, 1))
-			break;
-		if (queue_work(ceph_inode_to_client(inode)->inode_wq,
-			       &ceph_inode(inode)->i_work))
-			break;
-		/* queue work failed, i_count must be at least 2 */
-	}
-}
-
 void ceph_queue_inode_work(struct inode *inode, int work_bit)
 {
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index e5af591d3bd4..a818213c972f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -664,6 +664,9 @@ struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
 
 void ceph_put_mds_session(struct ceph_mds_session *s)
 {
+	if (IS_ERR_OR_NULL(s))
+		return;
+
 	dout("mdsc put_session %p %d -> %d\n", s,
 	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
 	if (refcount_dec_and_test(&s->s_ref)) {
@@ -746,8 +749,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 
 	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 
-	spin_lock_init(&s->s_gen_ttl_lock);
-	s->s_cap_gen = 1;
+	atomic_set(&s->s_cap_gen, 1);
 	s->s_cap_ttl = jiffies - 1;
 
 	spin_lock_init(&s->s_cap_lock);
@@ -822,14 +824,13 @@ void ceph_mdsc_release_request(struct kref *kref)
 		ceph_msg_put(req->r_reply);
 	if (req->r_inode) {
 		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
-		/* avoid calling iput_final() in mds dispatch threads */
-		ceph_async_iput(req->r_inode);
+		iput(req->r_inode);
 	}
 	if (req->r_parent) {
 		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
-		ceph_async_iput(req->r_parent);
+		iput(req->r_parent);
 	}
-	ceph_async_iput(req->r_target_inode);
+	iput(req->r_target_inode);
 	if (req->r_dentry)
 		dput(req->r_dentry);
 	if (req->r_old_dentry)
@@ -843,7 +844,7 @@ void ceph_mdsc_release_request(struct kref *kref)
 		 */
 		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
 				  CEPH_CAP_PIN);
-		ceph_async_iput(req->r_old_dentry_dir);
+		iput(req->r_old_dentry_dir);
 	}
 	kfree(req->r_path1);
 	kfree(req->r_path2);
@@ -958,8 +959,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
 	}
 
 	if (req->r_unsafe_dir) {
-		/* avoid calling iput_final() in mds dispatch threads */
-		ceph_async_iput(req->r_unsafe_dir);
+		iput(req->r_unsafe_dir);
 		req->r_unsafe_dir = NULL;
 	}
 
@@ -1130,7 +1130,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 	if (!cap) {
 		spin_unlock(&ci->i_ceph_lock);
-		ceph_async_iput(inode);
+		iput(inode);
 		goto random;
 	}
 	mds = cap->session->s_mds;
@@ -1139,9 +1139,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 	     cap == ci->i_auth_cap ? "auth " : "", cap);
 	spin_unlock(&ci->i_ceph_lock);
 out:
-	/* avoid calling iput_final() while holding mdsc->mutex or
-	 * in mds dispatch threads */
-	ceph_async_iput(inode);
+	iput(inode);
 	return mds;
 
 random:
@@ -1438,8 +1436,7 @@ static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
 
 	for (i = 0; i < mi->num_export_targets; i++) {
 		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
-		if (!IS_ERR(ts))
-			ceph_put_mds_session(ts);
+		ceph_put_mds_session(ts);
 	}
 }
 
@@ -1545,9 +1542,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
 		spin_unlock(&session->s_cap_lock);
 
 		if (last_inode) {
-			/* avoid calling iput_final() while holding
-			 * s_mutex or in mds dispatch threads */
-			ceph_async_iput(last_inode);
+			iput(last_inode);
 			last_inode = NULL;
 		}
 		if (old_cap) {
@@ -1581,7 +1576,7 @@ out:
 	session->s_cap_iterator = NULL;
 	spin_unlock(&session->s_cap_lock);
 
-	ceph_async_iput(last_inode);
+	iput(last_inode);
 	if (old_cap)
 		ceph_put_cap(session->s_mdsc, old_cap);
 
@@ -1721,8 +1716,7 @@ static void remove_session_caps(struct ceph_mds_session *session)
 			spin_unlock(&session->s_cap_lock);
 
 			inode = ceph_find_inode(sb, vino);
-			 /* avoid calling iput_final() while holding s_mutex */
-			ceph_async_iput(inode);
+			iput(inode);
 
 			spin_lock(&session->s_cap_lock);
 		}
@@ -1761,7 +1755,7 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
 		ci->i_requested_max_size = 0;
 		spin_unlock(&ci->i_ceph_lock);
 	} else if (ev == RENEWCAPS) {
-		if (cap->cap_gen < cap->session->s_cap_gen) {
+		if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) {
 			/* mds did not re-issue stale cap */
 			spin_lock(&ci->i_ceph_lock);
 			cap->issued = cap->implemented = CEPH_CAP_PIN;
@@ -2988,7 +2982,6 @@ int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
 		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
 		__ceph_touch_fmode(ci, mdsc, fmode);
 		spin_unlock(&ci->i_ceph_lock);
-		ihold(req->r_parent);
 	}
 	if (req->r_old_dentry_dir)
 		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
@@ -3499,10 +3492,8 @@ static void handle_session(struct ceph_mds_session *session,
 	case CEPH_SESSION_STALE:
 		pr_info("mds%d caps went stale, renewing\n",
 			session->s_mds);
-		spin_lock(&session->s_gen_ttl_lock);
-		session->s_cap_gen++;
+		atomic_inc(&session->s_cap_gen);
 		session->s_cap_ttl = jiffies - 1;
-		spin_unlock(&session->s_gen_ttl_lock);
 		send_renew_caps(mdsc, session);
 		break;
 
@@ -3771,7 +3762,7 @@ static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	cap->seq = 0;        /* reset cap seq */
 	cap->issue_seq = 0;  /* and issue_seq */
 	cap->mseq = 0;       /* and migrate_seq */
-	cap->cap_gen = cap->session->s_cap_gen;
+	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
 
 	/* These are lost when the session goes away */
 	if (S_ISDIR(inode->i_mode)) {
@@ -4011,9 +4002,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	dout("session %p state %s\n", session,
 	     ceph_session_state_name(session->s_state));
 
-	spin_lock(&session->s_gen_ttl_lock);
-	session->s_cap_gen++;
-	spin_unlock(&session->s_gen_ttl_lock);
+	atomic_inc(&session->s_cap_gen);
 
 	spin_lock(&session->s_cap_lock);
 	/* don't know if session is readonly */
@@ -4344,7 +4333,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 
 	case CEPH_MDS_LEASE_RENEW:
 		if (di->lease_session == session &&
-		    di->lease_gen == session->s_cap_gen &&
+		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
 		    di->lease_renew_from &&
 		    di->lease_renew_after == 0) {
 			unsigned long duration =
@@ -4372,8 +4361,7 @@ release:
 
 out:
 	mutex_unlock(&session->s_mutex);
-	/* avoid calling iput_final() in mds dispatch threads */
-	ceph_async_iput(inode);
+	iput(inode);
 	return;
 
 bad:
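
Teaching ceph_put_mds_session() to accept NULL or an ERR_PTR follows the
kfree(NULL) convention and is what lets guards such as
"if (di->lease_session)" and "if (!IS_ERR(ts))" disappear from the
callers. A simplified model of the idiom; the error-pointer test here is
modeled on the kernel's IS_ERR_OR_NULL(), which treats the top 4095 values
of the address space as encoded errnos:

    #include <stdio.h>

    #define MAX_ERRNO 4095

    /* Simplified model of the kernel's IS_ERR_OR_NULL() test. */
    static int is_err_or_null(const void *ptr)
    {
        return !ptr || (unsigned long)ptr >= (unsigned long)-MAX_ERRNO;
    }

    struct session { int refs; };

    static void put_session(struct session *s)
    {
        if (is_err_or_null(s))   /* tolerate NULL and ERR_PTR, like kfree(NULL) */
            return;
        if (--s->refs == 0)
            printf("last ref dropped, freeing\n");
    }

    int main(void)
    {
        struct session s = { .refs = 1 };

        put_session(NULL);                    /* no-op */
        put_session((struct session *)-2L);   /* models ERR_PTR(-ENOENT): no-op */
        put_session(&s);                      /* real put: frees */
        return 0;
    }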
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 15c11a0f2caf..20e42d8b66c6 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -186,10 +186,8 @@ struct ceph_mds_session {
 
 	struct ceph_auth_handshake s_auth;
 
-	/* protected by s_gen_ttl_lock */
-	spinlock_t        s_gen_ttl_lock;
-	u32               s_cap_gen;  /* inc each time we get mds stale msg */
-	unsigned long     s_cap_ttl;  /* when session caps expire */
+	atomic_t          s_cap_gen;  /* inc each time we get mds stale msg */
+	unsigned long     s_cap_ttl;  /* when session caps expire. protected by s_mutex */
 
 	/* protected by s_cap_lock */
 	spinlock_t        s_cap_lock;
diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c
index 28b6b42ad677..5ac151eb0d49 100644
--- a/fs/ceph/metric.c
+++ b/fs/ceph/metric.c
@@ -20,8 +20,11 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 	struct ceph_opened_files *files;
 	struct ceph_pinned_icaps *icaps;
 	struct ceph_opened_inodes *inodes;
+	struct ceph_read_io_size *rsize;
+	struct ceph_write_io_size *wsize;
 	struct ceph_client_metric *m = &mdsc->metric;
 	u64 nr_caps = atomic64_read(&m->total_caps);
+	u32 header_len = sizeof(struct ceph_metric_header);
 	struct ceph_msg *msg;
 	struct timespec64 ts;
 	s64 sum;
@@ -30,7 +33,8 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
 	len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write)
 	      + sizeof(*meta) + sizeof(*dlease) + sizeof(*files)
-	      + sizeof(*icaps) + sizeof(*inodes);
+	      + sizeof(*icaps) + sizeof(*inodes) + sizeof(*rsize)
+	      + sizeof(*wsize);
 
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true);
 	if (!msg) {
@@ -43,10 +47,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
 	/* encode the cap metric */
 	cap = (struct ceph_metric_cap *)(head + 1);
-	cap->type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO);
-	cap->ver = 1;
-	cap->compat = 1;
-	cap->data_len = cpu_to_le32(sizeof(*cap) - 10);
+	cap->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_CAP_INFO);
+	cap->header.ver = 1;
+	cap->header.compat = 1;
+	cap->header.data_len = cpu_to_le32(sizeof(*cap) - header_len);
 	cap->hit = cpu_to_le64(percpu_counter_sum(&m->i_caps_hit));
 	cap->mis = cpu_to_le64(percpu_counter_sum(&m->i_caps_mis));
 	cap->total = cpu_to_le64(nr_caps);
@@ -54,10 +58,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
 	/* encode the read latency metric */
 	read = (struct ceph_metric_read_latency *)(cap + 1);
-	read->type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
-	read->ver = 1;
-	read->compat = 1;
-	read->data_len = cpu_to_le32(sizeof(*read) - 10);
+	read->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_LATENCY);
+	read->header.ver = 1;
+	read->header.compat = 1;
+	read->header.data_len = cpu_to_le32(sizeof(*read) - header_len);
 	sum = m->read_latency_sum;
 	jiffies_to_timespec64(sum, &ts);
 	read->sec = cpu_to_le32(ts.tv_sec);
@@ -66,10 +70,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
 	/* encode the write latency metric */
 	write = (struct ceph_metric_write_latency *)(read + 1);
-	write->type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
-	write->ver = 1;
-	write->compat = 1;
-	write->data_len = cpu_to_le32(sizeof(*write) - 10);
+	write->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_LATENCY);
+	write->header.ver = 1;
+	write->header.compat = 1;
+	write->header.data_len = cpu_to_le32(sizeof(*write) - header_len);
 	sum = m->write_latency_sum;
 	jiffies_to_timespec64(sum, &ts);
 	write->sec = cpu_to_le32(ts.tv_sec);
@@ -78,10 +82,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
 	/* encode the metadata latency metric */
 	meta = (struct ceph_metric_metadata_latency *)(write + 1);
-	meta->type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
-	meta->ver = 1;
-	meta->compat = 1;
-	meta->data_len = cpu_to_le32(sizeof(*meta) - 10);
+	meta->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_METADATA_LATENCY);
+	meta->header.ver = 1;
+	meta->header.compat = 1;
+	meta->header.data_len = cpu_to_le32(sizeof(*meta) - header_len);
 	sum = m->metadata_latency_sum;
 	jiffies_to_timespec64(sum, &ts);
 	meta->sec = cpu_to_le32(ts.tv_sec);
@@ -90,10 +94,10 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
 	/* encode the dentry lease metric */
 	dlease = (struct ceph_metric_dlease *)(meta + 1);
-	dlease->type = cpu_to_le32(CLIENT_METRIC_TYPE_DENTRY_LEASE);
-	dlease->ver = 1;
-	dlease->compat = 1;
-	dlease->data_len = cpu_to_le32(sizeof(*dlease) - 10);
+	dlease->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_DENTRY_LEASE);
+	dlease->header.ver = 1;
+	dlease->header.compat = 1;
+	dlease->header.data_len = cpu_to_le32(sizeof(*dlease) - header_len);
 	dlease->hit = cpu_to_le64(percpu_counter_sum(&m->d_lease_hit));
 	dlease->mis = cpu_to_le64(percpu_counter_sum(&m->d_lease_mis));
 	dlease->total = cpu_to_le64(atomic64_read(&m->total_dentries));
@@ -103,34 +107,54 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 
 	/* encode the opened files metric */
 	files = (struct ceph_opened_files *)(dlease + 1);
-	files->type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES);
-	files->ver = 1;
-	files->compat = 1;
-	files->data_len = cpu_to_le32(sizeof(*files) - 10);
+	files->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES);
+	files->header.ver = 1;
+	files->header.compat = 1;
+	files->header.data_len = cpu_to_le32(sizeof(*files) - header_len);
 	files->opened_files = cpu_to_le64(atomic64_read(&m->opened_files));
 	files->total = cpu_to_le64(sum);
 	items++;
 
 	/* encode the pinned icaps metric */
 	icaps = (struct ceph_pinned_icaps *)(files + 1);
-	icaps->type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS);
-	icaps->ver = 1;
-	icaps->compat = 1;
-	icaps->data_len = cpu_to_le32(sizeof(*icaps) - 10);
+	icaps->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS);
+	icaps->header.ver = 1;
+	icaps->header.compat = 1;
+	icaps->header.data_len = cpu_to_le32(sizeof(*icaps) - header_len);
 	icaps->pinned_icaps = cpu_to_le64(nr_caps);
 	icaps->total = cpu_to_le64(sum);
 	items++;
 
 	/* encode the opened inodes metric */
 	inodes = (struct ceph_opened_inodes *)(icaps + 1);
-	inodes->type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES);
-	inodes->ver = 1;
-	inodes->compat = 1;
-	inodes->data_len = cpu_to_le32(sizeof(*inodes) - 10);
+	inodes->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES);
+	inodes->header.ver = 1;
+	inodes->header.compat = 1;
+	inodes->header.data_len = cpu_to_le32(sizeof(*inodes) - header_len);
 	inodes->opened_inodes = cpu_to_le64(percpu_counter_sum(&m->opened_inodes));
 	inodes->total = cpu_to_le64(sum);
 	items++;
 
+	/* encode the read io size metric */
+	rsize = (struct ceph_read_io_size *)(inodes + 1);
+	rsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_READ_IO_SIZES);
+	rsize->header.ver = 1;
+	rsize->header.compat = 1;
+	rsize->header.data_len = cpu_to_le32(sizeof(*rsize) - header_len);
+	rsize->total_ops = cpu_to_le64(m->total_reads);
+	rsize->total_size = cpu_to_le64(m->read_size_sum);
+	items++;
+
+	/* encode the write io size metric */
+	wsize = (struct ceph_write_io_size *)(rsize + 1);
+	wsize->header.type = cpu_to_le32(CLIENT_METRIC_TYPE_WRITE_IO_SIZES);
+	wsize->header.ver = 1;
+	wsize->header.compat = 1;
+	wsize->header.data_len = cpu_to_le32(sizeof(*wsize) - header_len);
+	wsize->total_ops = cpu_to_le64(m->total_writes);
+	wsize->total_size = cpu_to_le64(m->write_size_sum);
+	items++;
+
 	put_unaligned_le32(items, &head->num);
 	msg->front.iov_len = len;
 	msg->hdr.version = cpu_to_le16(1);
@@ -225,6 +249,9 @@ int ceph_metric_init(struct ceph_client_metric *m)
 	m->read_latency_max = 0;
 	m->total_reads = 0;
 	m->read_latency_sum = 0;
+	m->read_size_min = U64_MAX;
+	m->read_size_max = 0;
+	m->read_size_sum = 0;
 
 	spin_lock_init(&m->write_metric_lock);
 	m->write_latency_sq_sum = 0;
@@ -232,6 +259,9 @@ int ceph_metric_init(struct ceph_client_metric *m)
 	m->write_latency_max = 0;
 	m->total_writes = 0;
 	m->write_latency_sum = 0;
+	m->write_size_min = U64_MAX;
+	m->write_size_max = 0;
+	m->write_size_sum = 0;
 
 	spin_lock_init(&m->metadata_metric_lock);
 	m->metadata_latency_sq_sum = 0;
@@ -281,23 +311,21 @@ void ceph_metric_destroy(struct ceph_client_metric *m)
 
 	cancel_delayed_work_sync(&m->delayed_work);
 
-	if (m->session)
-		ceph_put_mds_session(m->session);
+	ceph_put_mds_session(m->session);
 }
 
-static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
-				    ktime_t *min, ktime_t *max,
-				    ktime_t *sq_sump, ktime_t lat)
-{
-	ktime_t total, avg, sq, lsum;
-
-	total = ++(*totalp);
-	lsum = (*lsump += lat);
+#define METRIC_UPDATE_MIN_MAX(min, max, new)	\
+{						\
+	if (unlikely(new < min))		\
+		min = new;			\
+	if (unlikely(new > max))		\
+		max = new;			\
+}
 
-	if (unlikely(lat < *min))
-		*min = lat;
-	if (unlikely(lat > *max))
-		*max = lat;
+static inline void __update_stdev(ktime_t total, ktime_t lsum,
+				  ktime_t *sq_sump, ktime_t lat)
+{
+	ktime_t avg, sq;
 
 	if (unlikely(total == 1))
 		return;
@@ -312,33 +340,51 @@ static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
 
 void ceph_update_read_metrics(struct ceph_client_metric *m,
 			      ktime_t r_start, ktime_t r_end,
-			      int rc)
+			      unsigned int size, int rc)
 {
 	ktime_t lat = ktime_sub(r_end, r_start);
+	ktime_t total;
 
 	if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
 		return;
 
 	spin_lock(&m->read_metric_lock);
-	__update_latency(&m->total_reads, &m->read_latency_sum,
-			 &m->read_latency_min, &m->read_latency_max,
-			 &m->read_latency_sq_sum, lat);
+	total = ++m->total_reads;
+	m->read_size_sum += size;
+	m->read_latency_sum += lat;
+	METRIC_UPDATE_MIN_MAX(m->read_size_min,
+			      m->read_size_max,
+			      size);
+	METRIC_UPDATE_MIN_MAX(m->read_latency_min,
+			      m->read_latency_max,
+			      lat);
+	__update_stdev(total, m->read_latency_sum,
+		       &m->read_latency_sq_sum, lat);
 	spin_unlock(&m->read_metric_lock);
 }
 
 void ceph_update_write_metrics(struct ceph_client_metric *m,
 			       ktime_t r_start, ktime_t r_end,
-			       int rc)
+			       unsigned int size, int rc)
 {
 	ktime_t lat = ktime_sub(r_end, r_start);
+	ktime_t total;
 
 	if (unlikely(rc && rc != -ETIMEDOUT))
 		return;
 
 	spin_lock(&m->write_metric_lock);
-	__update_latency(&m->total_writes, &m->write_latency_sum,
-			 &m->write_latency_min, &m->write_latency_max,
-			 &m->write_latency_sq_sum, lat);
+	total = ++m->total_writes;
+	m->write_size_sum += size;
+	m->write_latency_sum += lat;
+	METRIC_UPDATE_MIN_MAX(m->write_size_min,
+			      m->write_size_max,
+			      size);
+	METRIC_UPDATE_MIN_MAX(m->write_latency_min,
+			      m->write_latency_max,
+			      lat);
+	__update_stdev(total, m->write_latency_sum,
+		       &m->write_latency_sq_sum, lat);
 	spin_unlock(&m->write_metric_lock);
 }
 
@@ -347,13 +393,18 @@ void ceph_update_metadata_metrics(struct ceph_client_metric *m,
 				  int rc)
 {
 	ktime_t lat = ktime_sub(r_end, r_start);
+	ktime_t total;
 
 	if (unlikely(rc && rc != -ENOENT))
 		return;
 
 	spin_lock(&m->metadata_metric_lock);
-	__update_latency(&m->total_metadatas, &m->metadata_latency_sum,
-			 &m->metadata_latency_min, &m->metadata_latency_max,
-			 &m->metadata_latency_sq_sum, lat);
+	total = ++m->total_metadatas;
+	m->metadata_latency_sum += lat;
+	METRIC_UPDATE_MIN_MAX(m->metadata_latency_min,
+			      m->metadata_latency_max,
+			      lat);
+	__update_stdev(total, m->metadata_latency_sum,
+		       &m->metadata_latency_sq_sum, lat);
 	spin_unlock(&m->metadata_metric_lock);
 }
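
__update_latency() is split here: callers now fold count, sum, and min/max
in line (the METRIC_UPDATE_MIN_MAX macro), and the renamed __update_stdev()
only accumulates the squared deviation of each sample from the running
mean. The debugfs code then reports a standard deviation derived from that
sum, roughly sqrt(sq_sum / (total - 1)). A simplified userspace model,
using truncating division where the kernel rounds (build with -lm):

    #include <math.h>
    #include <stdio.h>

    /* Illustrative model: callers accumulate count/sum/min/max inline;
     * this helper only folds one sample into the squared-deviation sum. */
    static void update_stdev(long long total, long long lsum,
                             long long *sq_sum, long long lat)
    {
        long long avg, dev;

        if (total == 1)         /* first sample: no deviation yet */
            return;
        avg = lsum / total;     /* running mean including this sample */
        dev = lat - avg;
        *sq_sum += dev * dev;
    }

    int main(void)
    {
        long long total = 0, sum = 0, sq_sum = 0;
        long long samples[] = { 100, 120, 80, 110 };

        for (int i = 0; i < 4; i++) {
            total++;
            sum += samples[i];
            update_stdev(total, sum, &sq_sum, samples[i]);
        }
        /* the debugfs file reports roughly sqrt(sq_sum / (total - 1)) */
        printf("stdev ~ %.1f\n", sqrt((double)sq_sum / (double)(total - 1)));
        return 0;
    }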
diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h
index e984eb2bb14b..0133955a3c6a 100644
--- a/fs/ceph/metric.h
+++ b/fs/ceph/metric.h
@@ -17,8 +17,10 @@ enum ceph_metric_type {
 	CLIENT_METRIC_TYPE_OPENED_FILES,
 	CLIENT_METRIC_TYPE_PINNED_ICAPS,
 	CLIENT_METRIC_TYPE_OPENED_INODES,
+	CLIENT_METRIC_TYPE_READ_IO_SIZES,
+	CLIENT_METRIC_TYPE_WRITE_IO_SIZES,
 
-	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_OPENED_INODES,
+	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_WRITE_IO_SIZES,
 };
 
 /*
@@ -34,18 +36,22 @@ enum ceph_metric_type {
 	CLIENT_METRIC_TYPE_OPENED_FILES,	\
 	CLIENT_METRIC_TYPE_PINNED_ICAPS,	\
 	CLIENT_METRIC_TYPE_OPENED_INODES,	\
+	CLIENT_METRIC_TYPE_READ_IO_SIZES,	\
+	CLIENT_METRIC_TYPE_WRITE_IO_SIZES,	\
 						\
 	CLIENT_METRIC_TYPE_MAX,			\
 }
 
-/* metric caps header */
-struct ceph_metric_cap {
+struct ceph_metric_header {
 	__le32 type;     /* ceph metric type */
-
 	__u8  ver;
 	__u8  compat;
-
 	__le32 data_len; /* length of sizeof(hit + mis + total) */
+} __packed;
+
+/* metric caps header */
+struct ceph_metric_cap {
+	struct ceph_metric_header header;
 	__le64 hit;
 	__le64 mis;
 	__le64 total;
@@ -53,48 +59,28 @@ struct ceph_metric_cap {
 
 /* metric read latency header */
 struct ceph_metric_read_latency {
-	__le32 type;     /* ceph metric type */
-
-	__u8  ver;
-	__u8  compat;
-
-	__le32 data_len; /* length of sizeof(sec + nsec) */
+	struct ceph_metric_header header;
 	__le32 sec;
 	__le32 nsec;
 } __packed;
 
 /* metric write latency header */
 struct ceph_metric_write_latency {
-	__le32 type;     /* ceph metric type */
-
-	__u8  ver;
-	__u8  compat;
-
-	__le32 data_len; /* length of sizeof(sec + nsec) */
+	struct ceph_metric_header header;
 	__le32 sec;
 	__le32 nsec;
 } __packed;
 
 /* metric metadata latency header */
 struct ceph_metric_metadata_latency {
-	__le32 type;     /* ceph metric type */
-
-	__u8  ver;
-	__u8  compat;
-
-	__le32 data_len; /* length of sizeof(sec + nsec) */
+	struct ceph_metric_header header;
 	__le32 sec;
 	__le32 nsec;
 } __packed;
 
 /* metric dentry lease header */
 struct ceph_metric_dlease {
-	__le32 type;     /* ceph metric type */
-
-	__u8  ver;
-	__u8  compat;
-
-	__le32 data_len; /* length of sizeof(hit + mis + total) */
+	struct ceph_metric_header header;
 	__le64 hit;
 	__le64 mis;
 	__le64 total;
@@ -102,40 +88,39 @@ struct ceph_metric_dlease {
 
 /* metric opened files header */
 struct ceph_opened_files {
-	__le32 type;     /* ceph metric type */
-
-	__u8  ver;
-	__u8  compat;
-
-	__le32 data_len; /* length of sizeof(opened_files + total) */
+	struct ceph_metric_header header;
 	__le64 opened_files;
 	__le64 total;
 } __packed;
 
 /* metric pinned i_caps header */
 struct ceph_pinned_icaps {
-	__le32 type;     /* ceph metric type */
-
-	__u8  ver;
-	__u8  compat;
-
-	__le32 data_len; /* length of sizeof(pinned_icaps + total) */
+	struct ceph_metric_header header;
 	__le64 pinned_icaps;
 	__le64 total;
 } __packed;
 
 /* metric opened inodes header */
 struct ceph_opened_inodes {
-	__le32 type;     /* ceph metric type */
-
-	__u8  ver;
-	__u8  compat;
-
-	__le32 data_len; /* length of sizeof(opened_inodes + total) */
+	struct ceph_metric_header header;
 	__le64 opened_inodes;
 	__le64 total;
 } __packed;
 
+/* metric read io size header */
+struct ceph_read_io_size {
+	struct ceph_metric_header header;
+	__le64 total_ops;
+	__le64 total_size;
+} __packed;
+
+/* metric write io size header */
+struct ceph_write_io_size {
+	struct ceph_metric_header header;
+	__le64 total_ops;
+	__le64 total_size;
+} __packed;
+
 struct ceph_metric_head {
 	__le32 num;	/* the number of metrics that will be sent */
 } __packed;
@@ -152,6 +137,9 @@ struct ceph_client_metric {
 
 	spinlock_t read_metric_lock;
 	u64 total_reads;
+	u64 read_size_sum;
+	u64 read_size_min;
+	u64 read_size_max;
 	ktime_t read_latency_sum;
 	ktime_t read_latency_sq_sum;
 	ktime_t read_latency_min;
@@ -159,6 +147,9 @@ struct ceph_client_metric {
 
 	spinlock_t write_metric_lock;
 	u64 total_writes;
+	u64 write_size_sum;
+	u64 write_size_min;
+	u64 write_size_max;
 	ktime_t write_latency_sum;
 	ktime_t write_latency_sq_sum;
 	ktime_t write_latency_min;
@@ -206,10 +197,10 @@ static inline void ceph_update_cap_mis(struct ceph_client_metric *m)
 
 extern void ceph_update_read_metrics(struct ceph_client_metric *m,
 				     ktime_t r_start, ktime_t r_end,
-				     int rc);
+				     unsigned int size, int rc);
 extern void ceph_update_write_metrics(struct ceph_client_metric *m,
 				      ktime_t r_start, ktime_t r_end,
-				      int rc);
+				      unsigned int size, int rc);
 extern void ceph_update_metadata_metrics(struct ceph_client_metric *m,
 				         ktime_t r_start, ktime_t r_end,
 					 int rc);
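
metric.h factors the repeated type/ver/compat/data_len quadruple into
struct ceph_metric_header, so every wire struct embeds it and metric.c can
compute data_len as the struct size minus the header size instead of the
old magic "- 10". That literal was exactly the packed header size, as a
quick userspace check of the layout shows:

    #include <assert.h>
    #include <stdint.h>

    /* Userspace model of the packed wire header. */
    struct metric_header {
        uint32_t type;
        uint8_t  ver;
        uint8_t  compat;
        uint32_t data_len;
    } __attribute__((packed));

    struct metric_cap {
        struct metric_header header;
        uint64_t hit, mis, total;
    } __attribute__((packed));

    int main(void)
    {
        /* the old code's magic "- 10" was exactly the packed header size */
        static_assert(sizeof(struct metric_header) == 10, "wire header");
        /* data_len now derives from the header size instead of a literal */
        assert(sizeof(struct metric_cap) - sizeof(struct metric_header)
               == 3 * sizeof(uint64_t));
        return 0;
    }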
diff --git a/fs/ceph/quota.c b/fs/ceph/quota.c
index 4e32c9600ecc..620c691af40e 100644
--- a/fs/ceph/quota.c
+++ b/fs/ceph/quota.c
@@ -74,8 +74,7 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
 		            le64_to_cpu(h->max_files));
 	spin_unlock(&ci->i_ceph_lock);
 
-	/* avoid calling iput_final() in dispatch thread */
-	ceph_async_iput(inode);
+	iput(inode);
 }
 
 static struct ceph_quotarealm_inode *
@@ -247,8 +246,7 @@ restart:
 
 		ci = ceph_inode(in);
 		has_quota = __ceph_has_any_quota(ci);
-		/* avoid calling iput_final() while holding mdsc->snap_rwsem */
-		ceph_async_iput(in);
+		iput(in);
 
 		next = realm->parent;
 		if (has_quota || !next)
@@ -383,8 +381,7 @@ restart:
 			pr_warn("Invalid quota check op (%d)\n", op);
 			exceeded = true; /* Just break the loop */
 		}
-		/* avoid calling iput_final() while holding mdsc->snap_rwsem */
-		ceph_async_iput(in);
+		iput(in);
 
 		next = realm->parent;
 		if (exceeded || !next)
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4ce18055d931..4ac0606dcbd4 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -60,11 +60,13 @@
 /*
  * increase ref count for the realm
  *
- * caller must hold snap_rwsem for write.
+ * caller must hold snap_rwsem.
  */
 void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
 			 struct ceph_snap_realm *realm)
 {
+	lockdep_assert_held(&mdsc->snap_rwsem);
+
 	dout("get_realm %p %d -> %d\n", realm,
 	     atomic_read(&realm->nref), atomic_read(&realm->nref)+1);
 	/*
@@ -113,6 +115,8 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
 {
 	struct ceph_snap_realm *realm;
 
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	realm = kzalloc(sizeof(*realm), GFP_NOFS);
 	if (!realm)
 		return ERR_PTR(-ENOMEM);
@@ -135,7 +139,7 @@ static struct ceph_snap_realm *ceph_create_snap_realm(
 /*
  * lookup the realm rooted at @ino.
  *
- * caller must hold snap_rwsem for write.
+ * caller must hold snap_rwsem.
  */
 static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
 						   u64 ino)
@@ -143,6 +147,8 @@ static struct ceph_snap_realm *__lookup_snap_realm(struct ceph_mds_client *mdsc,
 	struct rb_node *n = mdsc->snap_realms.rb_node;
 	struct ceph_snap_realm *r;
 
+	lockdep_assert_held(&mdsc->snap_rwsem);
+
 	while (n) {
 		r = rb_entry(n, struct ceph_snap_realm, node);
 		if (ino < r->ino)
@@ -176,6 +182,8 @@ static void __put_snap_realm(struct ceph_mds_client *mdsc,
 static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 				 struct ceph_snap_realm *realm)
 {
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	dout("__destroy_snap_realm %p %llx\n", realm, realm->ino);
 
 	rb_erase(&realm->node, &mdsc->snap_realms);
@@ -198,6 +206,8 @@ static void __destroy_snap_realm(struct ceph_mds_client *mdsc,
 static void __put_snap_realm(struct ceph_mds_client *mdsc,
 			     struct ceph_snap_realm *realm)
 {
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	dout("__put_snap_realm %llx %p %d -> %d\n", realm->ino, realm,
 	     atomic_read(&realm->nref), atomic_read(&realm->nref)-1);
 	if (atomic_dec_and_test(&realm->nref))
@@ -236,6 +246,8 @@ static void __cleanup_empty_realms(struct ceph_mds_client *mdsc)
 {
 	struct ceph_snap_realm *realm;
 
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	spin_lock(&mdsc->snap_empty_lock);
 	while (!list_empty(&mdsc->snap_empty)) {
 		realm = list_first_entry(&mdsc->snap_empty,
@@ -269,6 +281,8 @@ static int adjust_snap_realm_parent(struct ceph_mds_client *mdsc,
 {
 	struct ceph_snap_realm *parent;
 
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	if (realm->parent_ino == parentino)
 		return 0;
 
@@ -460,7 +474,7 @@ static bool has_new_snaps(struct ceph_snap_context *o,
  * Caller must hold snap_rwsem for read (i.e., the realm topology won't
  * change).
  */
-void ceph_queue_cap_snap(struct ceph_inode_info *ci)
+static void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 {
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap_snap *capsnap;
@@ -663,15 +677,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
 		if (!inode)
 			continue;
 		spin_unlock(&realm->inodes_with_caps_lock);
-		/* avoid calling iput_final() while holding
-		 * mdsc->snap_rwsem or in mds dispatch threads */
-		ceph_async_iput(lastinode);
+		iput(lastinode);
 		lastinode = inode;
 		ceph_queue_cap_snap(ci);
 		spin_lock(&realm->inodes_with_caps_lock);
 	}
 	spin_unlock(&realm->inodes_with_caps_lock);
-	ceph_async_iput(lastinode);
+	iput(lastinode);
 
 	dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
 }
@@ -696,6 +708,8 @@ int ceph_update_snap_trace(struct ceph_mds_client *mdsc,
 	int err = -ENOMEM;
 	LIST_HEAD(dirty_realms);
 
+	lockdep_assert_held_write(&mdsc->snap_rwsem);
+
 	dout("update_snap_trace deletion=%d\n", deletion);
 more:
 	ceph_decode_need(&p, e, sizeof(*ri), bad);
@@ -791,7 +805,7 @@ more:
 	return 0;
 
 bad:
-	err = -EINVAL;
+	err = -EIO;
 fail:
 	if (realm && !IS_ERR(realm))
 		ceph_put_snap_realm(mdsc, realm);
@@ -823,17 +837,12 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
 		ihold(inode);
 		spin_unlock(&mdsc->snap_flush_lock);
 		ceph_flush_snaps(ci, &session);
-		/* avoid calling iput_final() while holding
-		 * session->s_mutex or in mds dispatch threads */
-		ceph_async_iput(inode);
+		iput(inode);
 		spin_lock(&mdsc->snap_flush_lock);
 	}
 	spin_unlock(&mdsc->snap_flush_lock);
 
-	if (session) {
-		mutex_unlock(&session->s_mutex);
-		ceph_put_mds_session(session);
-	}
+	ceph_put_mds_session(session);
 	dout("flush_snaps done\n");
 }
 
@@ -969,14 +978,12 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
 			ceph_get_snap_realm(mdsc, realm);
 			ceph_put_snap_realm(mdsc, oldrealm);
 
-			/* avoid calling iput_final() while holding
-			 * mdsc->snap_rwsem or mds in dispatch threads */
-			ceph_async_iput(inode);
+			iput(inode);
 			continue;
 
 skip_inode:
 			spin_unlock(&ci->i_ceph_lock);
-			ceph_async_iput(inode);
+			iput(inode);
 		}
 
 		/* we may have taken some of the old realm's children. */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 839e6b0239ee..6b6332a5c113 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -931,7 +931,6 @@ extern int ceph_update_snap_trace(struct ceph_mds_client *m,
 extern void ceph_handle_snap(struct ceph_mds_client *mdsc,
 			     struct ceph_mds_session *session,
 			     struct ceph_msg *msg);
-extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
 extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
 				  struct ceph_cap_snap *capsnap);
 extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
@@ -989,8 +988,6 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask);
 extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
 extern void __ceph_do_pending_vmtruncate(struct inode *inode);
 
-extern void ceph_async_iput(struct inode *inode);
-
 void ceph_queue_inode_work(struct inode *inode, int work_bit);
 
 static inline void ceph_queue_vmtruncate(struct inode *inode)
diff --git a/net/ceph/auth.c b/net/ceph/auth.c
index d2b268a1838e..d38c9eadbe2f 100644
--- a/net/ceph/auth.c
+++ b/net/ceph/auth.c
@@ -58,12 +58,10 @@ struct ceph_auth_client *ceph_auth_init(const char *name,
 					const int *con_modes)
 {
 	struct ceph_auth_client *ac;
-	int ret;
 
-	ret = -ENOMEM;
 	ac = kzalloc(sizeof(*ac), GFP_NOFS);
 	if (!ac)
-		goto out;
+		return ERR_PTR(-ENOMEM);
 
 	mutex_init(&ac->mutex);
 	ac->negotiating = true;
@@ -78,9 +76,6 @@ struct ceph_auth_client *ceph_auth_init(const char *name,
 	dout("%s name '%s' preferred_mode %d fallback_mode %d\n", __func__,
 	     ac->name, ac->preferred_mode, ac->fallback_mode);
 	return ac;
-
-out:
-	return ERR_PTR(ret);
 }
 
 void ceph_auth_destroy(struct ceph_auth_client *ac)
diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
index 097e9f8d87a7..77b5519bc45f 100644
--- a/net/ceph/auth_none.c
+++ b/net/ceph/auth_none.c
@@ -112,8 +112,8 @@ static int ceph_auth_none_create_authorizer(
 	auth->authorizer = (struct ceph_authorizer *) au;
 	auth->authorizer_buf = au->buf;
 	auth->authorizer_buf_len = au->buf_len;
-	auth->authorizer_reply_buf = au->reply_buf;
-	auth->authorizer_reply_buf_len = sizeof (au->reply_buf);
+	auth->authorizer_reply_buf = NULL;
+	auth->authorizer_reply_buf_len = 0;
 
 	return 0;
 }
diff --git a/net/ceph/auth_none.h b/net/ceph/auth_none.h
index 4158f064302e..bb121539e796 100644
--- a/net/ceph/auth_none.h
+++ b/net/ceph/auth_none.h
@@ -16,7 +16,6 @@ struct ceph_none_authorizer {
 	struct ceph_authorizer base;
 	char buf[128];
 	int buf_len;
-	char reply_buf[0];
 };
 
 struct ceph_auth_none_info {
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
index 17447c19d937..66136a4c1ce7 100644
--- a/net/ceph/cls_lock_client.c
+++ b/net/ceph/cls_lock_client.c
@@ -10,7 +10,9 @@
 
 /**
  * ceph_cls_lock - grab rados lock for object
- * @oid, @oloc: object to lock
+ * @osdc: OSD client instance
+ * @oid: object to lock
+ * @oloc: object to lock
  * @lock_name: the name of the lock
  * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
  * @cookie: user-defined identifier for this instance of the lock
@@ -82,7 +84,9 @@ EXPORT_SYMBOL(ceph_cls_lock);
 
 /**
  * ceph_cls_unlock - release rados lock for object
- * @oid, @oloc: object to lock
+ * @osdc: OSD client instance
+ * @oid: object to lock
+ * @oloc: object to lock
  * @lock_name: the name of the lock
  * @cookie: user-defined identifier for this instance of the lock
  */
@@ -130,7 +134,9 @@ EXPORT_SYMBOL(ceph_cls_unlock);
 
 /**
  * ceph_cls_break_lock - release rados lock for object for specified client
- * @oid, @oloc: object to lock
+ * @osdc: OSD client instance
+ * @oid: object to lock
+ * @oloc: object to lock
  * @lock_name: the name of the lock
  * @cookie: user-defined identifier for this instance of the lock
  * @locker: current lock owner