summary refs log tree commit diff
path: root/fs/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2019-09-25 10:21:13 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2019-09-25 10:21:13 -0700
commitf41def397161053eb0d3ed6861ef65985efbf293 (patch)
tree28c03e8f26fc975ab059ff407b0c3d9165bc489f /fs/ceph
parent7b1373dd6e86f3a222590ae404a400e699b32884 (diff)
parent3ee5a7015c8b7cb4de21f7345f8381946f2fce55 (diff)
downloadlinux-f41def397161053eb0d3ed6861ef65985efbf293.tar.gz
Merge tag 'ceph-for-5.4-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - automatic recovery of a blacklisted filesystem session (Zheng Yan).
     This is disabled by default and can be enabled by mounting with the
     new "recover_session=clean" option.

   - serialize buffered reads and O_DIRECT writes (Jeff Layton). Care is
     taken to avoid serializing O_DIRECT reads and writes with each
     other, this is based on the exclusion scheme from NFS.

   - handle large osdmaps better in the face of fragmented memory
     (myself)

   - don't limit what security.* xattrs can be get or set (Jeff Layton).
     We were overly restrictive here, unnecessarily preventing things
     like file capability sets stored in security.capability from
     working.

   - allow copy_file_range() within the same inode and across different
     filesystems within the same cluster (Luis Henriques)"

* tag 'ceph-for-5.4-rc1' of git://github.com/ceph/ceph-client: (41 commits)
  ceph: call ceph_mdsc_destroy from destroy_fs_client
  libceph: use ceph_kvmalloc() for osdmap arrays
  libceph: avoid a __vmalloc() deadlock in ceph_kvmalloc()
  ceph: allow object copies across different filesystems in the same cluster
  ceph: include ceph_debug.h in cache.c
  ceph: move static keyword to the front of declarations
  rbd: pull rbd_img_request_create() dout out into the callers
  ceph: reconnect connection if session hang in opening state
  libceph: drop unused con parameter of calc_target()
  ceph: use release_pages() directly
  rbd: fix response length parameter for encoded strings
  ceph: allow arbitrary security.* xattrs
  ceph: only set CEPH_I_SEC_INITED if we got a MAC label
  ceph: turn ceph_security_invalidate_secctx into static inline
  ceph: add buffered/direct exclusionary locking for reads and writes
  libceph: handle OSD op ceph_pagelist_append() errors
  ceph: don't return a value from void function
  ceph: don't freeze during write page faults
  ceph: update the mtime when truncating up
  ceph: fix indentation in __get_snap_name()
  ...
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/Makefile2
-rw-r--r--fs/ceph/addr.c61
-rw-r--r--fs/ceph/cache.c2
-rw-r--r--fs/ceph/caps.c173
-rw-r--r--fs/ceph/debugfs.c1
-rw-r--r--fs/ceph/export.c60
-rw-r--r--fs/ceph/file.c104
-rw-r--r--fs/ceph/inode.c50
-rw-r--r--fs/ceph/io.c163
-rw-r--r--fs/ceph/io.h12
-rw-r--r--fs/ceph/locks.c8
-rw-r--r--fs/ceph/mds_client.c110
-rw-r--r--fs/ceph/mds_client.h8
-rw-r--r--fs/ceph/super.c52
-rw-r--r--fs/ceph/super.h49
-rw-r--r--fs/ceph/xattr.c76
16 files changed, 596 insertions, 335 deletions
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index a699e320393f..c1da294418d1 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -6,7 +6,7 @@
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
 ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
-	export.o caps.o snap.o xattr.o quota.o \
+	export.o caps.o snap.o xattr.o quota.o io.o \
 	mds_client.o mdsmap.o strings.o ceph_frag.o \
 	debugfs.o
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index b3c8b886bf64..7ab616601141 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -189,8 +189,7 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
 {
 	struct inode *inode = file_inode(filp);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_osd_client *osdc =
-		&ceph_inode_to_client(inode)->client->osdc;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	int err = 0;
 	u64 off = page_offset(page);
 	u64 len = PAGE_SIZE;
@@ -219,8 +218,8 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
 
 	dout("readpage inode %p file %p page %p index %lu\n",
 	     inode, filp, page, page->index);
-	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-				  off, &len,
+	err = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
+				  &ci->i_layout, off, &len,
 				  ci->i_truncate_seq, ci->i_truncate_size,
 				  &page, 1, 0);
 	if (err == -ENOENT)
@@ -228,6 +227,8 @@ static int ceph_do_readpage(struct file *filp, struct page *page)
 	if (err < 0) {
 		SetPageError(page);
 		ceph_fscache_readpage_cancel(inode, page);
+		if (err == -EBLACKLISTED)
+			fsc->blacklisted = true;
 		goto out;
 	}
 	if (err < PAGE_SIZE)
@@ -266,6 +267,8 @@ static void finish_read(struct ceph_osd_request *req)
 	int i;
 
 	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
+	if (rc == -EBLACKLISTED)
+		ceph_inode_to_client(inode)->blacklisted = true;
 
 	/* unlock all pages, zeroing any data we didn't read */
 	osd_data = osd_req_op_extent_osd_data(req, 0);
@@ -323,7 +326,8 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
 		/* caller of readpages does not hold buffer and read caps
 		 * (fadvise, madvise and readahead cases) */
 		int want = CEPH_CAP_FILE_CACHE;
-		ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, true, &got);
+		ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want,
+					true, &got);
 		if (ret < 0) {
 			dout("start_read %p, error getting cap\n", inode);
 		} else if (!(got & want)) {
@@ -569,7 +573,7 @@ static u64 get_writepages_data_length(struct inode *inode,
 /*
  * Write a single page, but leave the page locked.
  *
- * If we get a write error, set the page error bit, but still adjust the
+ * If we get a write error, mark the mapping for error, but still adjust the
  * dirty page accounting (i.e., page is no longer dirty).
  */
 static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
@@ -640,9 +644,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 			end_page_writeback(page);
 			return err;
 		}
+		if (err == -EBLACKLISTED)
+			fsc->blacklisted = true;
 		dout("writepage setting page/mapping error %d %p\n",
 		     err, page);
-		SetPageError(page);
 		mapping_set_error(&inode->i_data, err);
 		wbc->pages_skipped++;
 	} else {
@@ -680,23 +685,6 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
 }
 
 /*
- * lame release_pages helper.  release_pages() isn't exported to
- * modules.
- */
-static void ceph_release_pages(struct page **pages, int num)
-{
-	struct pagevec pvec;
-	int i;
-
-	pagevec_init(&pvec);
-	for (i = 0; i < num; i++) {
-		if (pagevec_add(&pvec, pages[i]) == 0)
-			pagevec_release(&pvec);
-	}
-	pagevec_release(&pvec);
-}
-
-/*
  * async writeback completion handler.
  *
  * If we get an error, set the mapping error bit, but not the individual
@@ -720,6 +708,8 @@ static void writepages_finish(struct ceph_osd_request *req)
 	if (rc < 0) {
 		mapping_set_error(mapping, rc);
 		ceph_set_error_write(ci);
+		if (rc == -EBLACKLISTED)
+			fsc->blacklisted = true;
 	} else {
 		ceph_clear_error_write(ci);
 	}
@@ -769,7 +759,7 @@ static void writepages_finish(struct ceph_osd_request *req)
 		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
 		     inode, osd_data->length, rc >= 0 ? num_pages : 0);
 
-		ceph_release_pages(osd_data->pages, num_pages);
+		release_pages(osd_data->pages, num_pages);
 	}
 
 	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
@@ -1452,7 +1442,8 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
 		want = CEPH_CAP_FILE_CACHE;
 
 	got = 0;
-	err = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
+	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1,
+			    &got, &pinned_page);
 	if (err < 0)
 		goto out_restore;
 
@@ -1540,6 +1531,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 	if (!prealloc_cf)
 		return VM_FAULT_OOM;
 
+	sb_start_pagefault(inode->i_sb);
 	ceph_block_sigs(&oldset);
 
 	if (ci->i_inline_version != CEPH_INLINE_NONE) {
@@ -1568,7 +1560,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 		want = CEPH_CAP_FILE_BUFFER;
 
 	got = 0;
-	err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
+	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len,
 			    &got, NULL);
 	if (err < 0)
 		goto out_free;
@@ -1614,6 +1606,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 	ceph_put_cap_refs(ci, got);
 out_free:
 	ceph_restore_sigs(&oldset);
+	sb_end_pagefault(inode->i_sb);
 	ceph_free_cap_flush(prealloc_cf);
 	if (err < 0)
 		ret = vmf_error(err);
@@ -1946,12 +1939,17 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
 
 	if (err >= 0 || err == -ENOENT)
 		have |= POOL_READ;
-	else if (err != -EPERM)
+	else if (err != -EPERM) {
+		if (err == -EBLACKLISTED)
+			fsc->blacklisted = true;
 		goto out_unlock;
+	}
 
 	if (err2 == 0 || err2 == -EEXIST)
 		have |= POOL_WRITE;
 	else if (err2 != -EPERM) {
+		if (err2 == -EBLACKLISTED)
+			fsc->blacklisted = true;
 		err = err2;
 		goto out_unlock;
 	}
@@ -1989,10 +1987,11 @@ out:
 	return err;
 }
 
-int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
+int ceph_pool_perm_check(struct inode *inode, int need)
 {
-	s64 pool;
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_string *pool_ns;
+	s64 pool;
 	int ret, flags;
 
 	if (ci->i_vino.snap != CEPH_NOSNAP) {
@@ -2004,7 +2003,7 @@ int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
 		return 0;
 	}
 
-	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
+	if (ceph_test_mount_opt(ceph_inode_to_client(inode),
 				NOPOOLPERM))
 		return 0;
 
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index bc90cf6ad7ed..b2ec29eeb4c4 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -6,6 +6,8 @@
  *  Written by Milosz Tanski (milosz@adfin.com)
  */
 
+#include <linux/ceph/ceph_debug.h>
+
 #include "super.h"
 #include "cache.h"
 
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index ce0f5658720a..d3b9c9d5c1bd 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -458,37 +458,6 @@ struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
 }
 
 /*
- * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
- */
-static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
-{
-	struct ceph_cap *cap;
-	int mds = -1;
-	struct rb_node *p;
-
-	/* prefer mds with WR|BUFFER|EXCL caps */
-	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
-		cap = rb_entry(p, struct ceph_cap, ci_node);
-		mds = cap->mds;
-		if (cap->issued & (CEPH_CAP_FILE_WR |
-				   CEPH_CAP_FILE_BUFFER |
-				   CEPH_CAP_FILE_EXCL))
-			break;
-	}
-	return mds;
-}
-
-int ceph_get_cap_mds(struct inode *inode)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int mds;
-	spin_lock(&ci->i_ceph_lock);
-	mds = __ceph_get_cap_mds(ceph_inode(inode));
-	spin_unlock(&ci->i_ceph_lock);
-	return mds;
-}
-
-/*
  * Called under i_ceph_lock.
  */
 static void __insert_cap_node(struct ceph_inode_info *ci,
@@ -628,7 +597,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 /*
  * Add a capability under the given MDS session.
  *
- * Caller should hold session snap_rwsem (read) and s_mutex.
+ * Caller should hold session snap_rwsem (read) and ci->i_ceph_lock
  *
  * @fmode is the open file mode, if we are opening a file, otherwise
  * it is < 0.  (This is so we can atomically add the cap and add an
@@ -645,6 +614,9 @@ void ceph_add_cap(struct inode *inode,
 	struct ceph_cap *cap;
 	int mds = session->s_mds;
 	int actual_wanted;
+	u32 gen;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
 
 	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
 	     session->s_mds, cap_id, ceph_cap_string(issued), seq);
@@ -656,6 +628,10 @@ void ceph_add_cap(struct inode *inode,
 	if (fmode >= 0)
 		wanted |= ceph_caps_for_mode(fmode);
 
+	spin_lock(&session->s_gen_ttl_lock);
+	gen = session->s_cap_gen;
+	spin_unlock(&session->s_gen_ttl_lock);
+
 	cap = __get_cap_for_mds(ci, mds);
 	if (!cap) {
 		cap = *new_cap;
@@ -681,7 +657,7 @@ void ceph_add_cap(struct inode *inode,
 		list_move_tail(&cap->session_caps, &session->s_caps);
 		spin_unlock(&session->s_cap_lock);
 
-		if (cap->cap_gen < session->s_cap_gen)
+		if (cap->cap_gen < gen)
 			cap->issued = cap->implemented = CEPH_CAP_PIN;
 
 		/*
@@ -775,7 +751,7 @@ void ceph_add_cap(struct inode *inode,
 	cap->seq = seq;
 	cap->issue_seq = seq;
 	cap->mseq = mseq;
-	cap->cap_gen = session->s_cap_gen;
+	cap->cap_gen = gen;
 
 	if (fmode >= 0)
 		__ceph_get_fmode(ci, fmode);
@@ -1284,10 +1260,6 @@ void __ceph_remove_caps(struct ceph_inode_info *ci)
  * Make note of max_size reported/requested from mds, revoked caps
  * that have now been implemented.
  *
- * Make half-hearted attempt ot to invalidate page cache if we are
- * dropping RDCACHE.  Note that this will leave behind locked pages
- * that we'll then need to deal with elsewhere.
- *
  * Return non-zero if delayed release, or we experienced an error
  * such that the caller should requeue + retry later.
  *
@@ -1746,11 +1718,11 @@ static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
  * Add dirty inode to the flushing list.  Assigned a seq number so we
  * can wait for caps to flush without starving.
  *
- * Called under i_ceph_lock.
+ * Called under i_ceph_lock. Returns the flush tid.
  */
-static int __mark_caps_flushing(struct inode *inode,
+static u64 __mark_caps_flushing(struct inode *inode,
 				struct ceph_mds_session *session, bool wake,
-				u64 *flush_tid, u64 *oldest_flush_tid)
+				u64 *oldest_flush_tid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
@@ -1789,8 +1761,7 @@ static int __mark_caps_flushing(struct inode *inode,
 
 	list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
 
-	*flush_tid = cf->tid;
-	return flushing;
+	return cf->tid;
 }
 
 /*
@@ -2028,11 +1999,6 @@ retry_locked:
 		}
 
 ack:
-		if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
-			dout(" skipping %p I_NOFLUSH set\n", inode);
-			continue;
-		}
-
 		if (session && session != cap->session) {
 			dout("oops, wrong session %p mutex\n", session);
 			mutex_unlock(&session->s_mutex);
@@ -2080,9 +2046,9 @@ ack:
 		}
 
 		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
-			flushing = __mark_caps_flushing(inode, session, false,
-							&flush_tid,
-							&oldest_flush_tid);
+			flushing = ci->i_dirty_caps;
+			flush_tid = __mark_caps_flushing(inode, session, false,
+							 &oldest_flush_tid);
 		} else {
 			flushing = 0;
 			flush_tid = 0;
@@ -2130,16 +2096,11 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
 retry:
 	spin_lock(&ci->i_ceph_lock);
 retry_locked:
-	if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
-		spin_unlock(&ci->i_ceph_lock);
-		dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
-		goto out;
-	}
 	if (ci->i_dirty_caps && ci->i_auth_cap) {
 		struct ceph_cap *cap = ci->i_auth_cap;
 		int delayed;
 
-		if (!session || session != cap->session) {
+		if (session != cap->session) {
 			spin_unlock(&ci->i_ceph_lock);
 			if (session)
 				mutex_unlock(&session->s_mutex);
@@ -2161,8 +2122,9 @@ retry_locked:
 			goto retry_locked;
 		}
 
-		flushing = __mark_caps_flushing(inode, session, true,
-						&flush_tid, &oldest_flush_tid);
+		flushing = ci->i_dirty_caps;
+		flush_tid = __mark_caps_flushing(inode, session, true,
+						 &oldest_flush_tid);
 
 		/* __send_cap drops i_ceph_lock */
 		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
@@ -2261,35 +2223,45 @@ static int unsafe_request_wait(struct inode *inode)
 
 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
+	struct ceph_file_info *fi = file->private_data;
 	struct inode *inode = file->f_mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	u64 flush_tid;
-	int ret;
+	int ret, err;
 	int dirty;
 
 	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
 
 	ret = file_write_and_wait_range(file, start, end);
-	if (ret < 0)
-		goto out;
-
 	if (datasync)
 		goto out;
 
 	dirty = try_flush_caps(inode, &flush_tid);
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
-	ret = unsafe_request_wait(inode);
+	err = unsafe_request_wait(inode);
 
 	/*
 	 * only wait on non-file metadata writeback (the mds
 	 * can recover size and mtime, so we don't need to
 	 * wait for that)
 	 */
-	if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
-		ret = wait_event_interruptible(ci->i_cap_wq,
+	if (!err && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
+		err = wait_event_interruptible(ci->i_cap_wq,
 					caps_are_flushed(inode, flush_tid));
 	}
+
+	if (err < 0)
+		ret = err;
+
+	if (errseq_check(&ci->i_meta_err, READ_ONCE(fi->meta_err))) {
+		spin_lock(&file->f_lock);
+		err = errseq_check_and_advance(&ci->i_meta_err,
+					       &fi->meta_err);
+		spin_unlock(&file->f_lock);
+		if (err < 0)
+			ret = err;
+	}
 out:
 	dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
 	return ret;
@@ -2560,10 +2532,15 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got,
  *
  * FIXME: how does a 0 return differ from -EAGAIN?
  */
-static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-			    loff_t endoff, bool nonblock, int *got)
+enum {
+	NON_BLOCKING	= 1,
+	CHECK_FILELOCK	= 2,
+};
+
+static int try_get_cap_refs(struct inode *inode, int need, int want,
+			    loff_t endoff, int flags, int *got)
 {
-	struct inode *inode = &ci->vfs_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	int ret = 0;
 	int have, implemented;
@@ -2576,6 +2553,13 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
 again:
 	spin_lock(&ci->i_ceph_lock);
 
+	if ((flags & CHECK_FILELOCK) &&
+	    (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK)) {
+		dout("try_get_cap_refs %p error filelock\n", inode);
+		ret = -EIO;
+		goto out_unlock;
+	}
+
 	/* make sure file is actually open */
 	file_wanted = __ceph_caps_file_wanted(ci);
 	if ((file_wanted & need) != need) {
@@ -2637,7 +2621,7 @@ again:
 					 * we can not call down_read() when
 					 * task isn't in TASK_RUNNING state
 					 */
-					if (nonblock) {
+					if (flags & NON_BLOCKING) {
 						ret = -EAGAIN;
 						goto out_unlock;
 					}
@@ -2731,18 +2715,19 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
-int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
+int ceph_try_get_caps(struct inode *inode, int need, int want,
 		      bool nonblock, int *got)
 {
 	int ret;
 
 	BUG_ON(need & ~CEPH_CAP_FILE_RD);
 	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
-	ret = ceph_pool_perm_check(ci, need);
+	ret = ceph_pool_perm_check(inode, need);
 	if (ret < 0)
 		return ret;
 
-	ret = try_get_cap_refs(ci, need, want, 0, nonblock, got);
+	ret = try_get_cap_refs(inode, need, want, 0,
+			       (nonblock ? NON_BLOCKING : 0), got);
 	return ret == -EAGAIN ? 0 : ret;
 }
 
@@ -2751,30 +2736,40 @@ int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want,
  * due to a small max_size, make sure we check_max_size (and possibly
  * ask the mds) so we don't get hung up indefinitely.
  */
-int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+int ceph_get_caps(struct file *filp, int need, int want,
 		  loff_t endoff, int *got, struct page **pinned_page)
 {
-	int _got, ret;
+	struct ceph_file_info *fi = filp->private_data;
+	struct inode *inode = file_inode(filp);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	int ret, _got, flags;
 
-	ret = ceph_pool_perm_check(ci, need);
+	ret = ceph_pool_perm_check(inode, need);
 	if (ret < 0)
 		return ret;
 
+	if ((fi->fmode & CEPH_FILE_MODE_WR) &&
+	    fi->filp_gen != READ_ONCE(fsc->filp_gen))
+		return -EBADF;
+
 	while (true) {
 		if (endoff > 0)
-			check_max_size(&ci->vfs_inode, endoff);
+			check_max_size(inode, endoff);
 
+		flags = atomic_read(&fi->num_locks) ? CHECK_FILELOCK : 0;
 		_got = 0;
-		ret = try_get_cap_refs(ci, need, want, endoff,
-				       false, &_got);
+		ret = try_get_cap_refs(inode, need, want, endoff,
+				       flags, &_got);
 		if (ret == -EAGAIN)
 			continue;
 		if (!ret) {
 			DEFINE_WAIT_FUNC(wait, woken_wake_function);
 			add_wait_queue(&ci->i_cap_wq, &wait);
 
-			while (!(ret = try_get_cap_refs(ci, need, want, endoff,
-							true, &_got))) {
+			flags |= NON_BLOCKING;
+			while (!(ret = try_get_cap_refs(inode, need, want,
+							endoff, flags, &_got))) {
 				if (signal_pending(current)) {
 					ret = -ERESTARTSYS;
 					break;
@@ -2786,10 +2781,18 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 			if (ret == -EAGAIN)
 				continue;
 		}
+
+		if ((fi->fmode & CEPH_FILE_MODE_WR) &&
+		    fi->filp_gen != READ_ONCE(fsc->filp_gen)) {
+			if (ret >= 0 && _got)
+				ceph_put_cap_refs(ci, _got);
+			return -EBADF;
+		}
+
 		if (ret < 0) {
 			if (ret == -ESTALE) {
 				/* session was killed, try renew caps */
-				ret = ceph_renew_caps(&ci->vfs_inode);
+				ret = ceph_renew_caps(inode);
 				if (ret == 0)
 					continue;
 			}
@@ -2798,9 +2801,9 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 
 		if (ci->i_inline_version != CEPH_INLINE_NONE &&
 		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-		    i_size_read(&ci->vfs_inode) > 0) {
+		    i_size_read(inode) > 0) {
 			struct page *page =
-				find_get_page(ci->vfs_inode.i_mapping, 0);
+				find_get_page(inode->i_mapping, 0);
 			if (page) {
 				if (PageUptodate(page)) {
 					*pinned_page = page;
@@ -2819,7 +2822,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 			 * getattr request will bring inline data into
 			 * page cache
 			 */
-			ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+			ret = __ceph_do_getattr(inode, NULL,
 						CEPH_STAT_CAP_INLINE_DATA,
 						true);
 			if (ret < 0)
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 2eb88ed22993..facb387c2735 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -294,7 +294,6 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
 
 void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
 {
-	return 0;
 }
 
 void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 15ff1b09cfa2..b6bfa94332c3 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -35,7 +35,7 @@ struct ceph_nfs_snapfh {
 static int ceph_encode_snapfh(struct inode *inode, u32 *rawfh, int *max_len,
 			      struct inode *parent_inode)
 {
-	const static int snap_handle_length =
+	static const int snap_handle_length =
 		sizeof(struct ceph_nfs_snapfh) >> 2;
 	struct ceph_nfs_snapfh *sfh = (void *)rawfh;
 	u64 snapid = ceph_snap(inode);
@@ -85,9 +85,9 @@ out:
 static int ceph_encode_fh(struct inode *inode, u32 *rawfh, int *max_len,
 			  struct inode *parent_inode)
 {
-	const static int handle_length =
+	static const int handle_length =
 		sizeof(struct ceph_nfs_fh) >> 2;
-	const static int connected_handle_length =
+	static const int connected_handle_length =
 		sizeof(struct ceph_nfs_confh) >> 2;
 	int type;
 
@@ -458,33 +458,33 @@ static int __get_snap_name(struct dentry *parent, char *name,
 		if (err < 0)
 			goto out;
 
-		 rinfo = &req->r_reply_info;
-		 for (i = 0; i < rinfo->dir_nr; i++) {
-			 rde = rinfo->dir_entries + i;
-			 BUG_ON(!rde->inode.in);
-			 if (ceph_snap(inode) ==
-			     le64_to_cpu(rde->inode.in->snapid)) {
-				 memcpy(name, rde->name, rde->name_len);
-				 name[rde->name_len] = '\0';
-				 err = 0;
-				 goto out;
-			 }
-		 }
-
-		 if (rinfo->dir_end)
-			 break;
-
-		 BUG_ON(rinfo->dir_nr <= 0);
-		 rde = rinfo->dir_entries + (rinfo->dir_nr - 1);
-		 next_offset += rinfo->dir_nr;
-		 last_name = kstrndup(rde->name, rde->name_len, GFP_KERNEL);
-		 if (!last_name) {
-			 err = -ENOMEM;
-			 goto out;
-		 }
-
-		 ceph_mdsc_put_request(req);
-		 req = NULL;
+		rinfo = &req->r_reply_info;
+		for (i = 0; i < rinfo->dir_nr; i++) {
+			rde = rinfo->dir_entries + i;
+			BUG_ON(!rde->inode.in);
+			if (ceph_snap(inode) ==
+			    le64_to_cpu(rde->inode.in->snapid)) {
+				memcpy(name, rde->name, rde->name_len);
+				name[rde->name_len] = '\0';
+				err = 0;
+				goto out;
+			}
+		}
+
+		if (rinfo->dir_end)
+			break;
+
+		BUG_ON(rinfo->dir_nr <= 0);
+		rde = rinfo->dir_entries + (rinfo->dir_nr - 1);
+		next_offset += rinfo->dir_nr;
+		last_name = kstrndup(rde->name, rde->name_len, GFP_KERNEL);
+		if (!last_name) {
+			err = -ENOMEM;
+			goto out;
+		}
+
+		ceph_mdsc_put_request(req);
+		req = NULL;
 	}
 	err = -ENOENT;
 out:
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 685a03cc4b77..d277f71abe0b 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -15,6 +15,7 @@
 #include "super.h"
 #include "mds_client.h"
 #include "cache.h"
+#include "io.h"
 
 static __le32 ceph_flags_sys2wire(u32 flags)
 {
@@ -201,6 +202,7 @@ out:
 static int ceph_init_file_info(struct inode *inode, struct file *file,
 					int fmode, bool isdir)
 {
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_file_info *fi;
 
 	dout("%s %p %p 0%o (%s)\n", __func__, inode, file,
@@ -211,7 +213,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
 		struct ceph_dir_file_info *dfi =
 			kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL);
 		if (!dfi) {
-			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+			ceph_put_fmode(ci, fmode); /* clean up */
 			return -ENOMEM;
 		}
 
@@ -222,7 +224,7 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
 	} else {
 		fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
 		if (!fi) {
-			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
+			ceph_put_fmode(ci, fmode); /* clean up */
 			return -ENOMEM;
 		}
 
@@ -232,6 +234,8 @@ static int ceph_init_file_info(struct inode *inode, struct file *file,
 	fi->fmode = fmode;
 	spin_lock_init(&fi->rw_contexts_lock);
 	INIT_LIST_HEAD(&fi->rw_contexts);
+	fi->meta_err = errseq_sample(&ci->i_meta_err);
+	fi->filp_gen = READ_ONCE(ceph_inode_to_client(inode)->filp_gen);
 
 	return 0;
 }
@@ -695,7 +699,13 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 			ceph_release_page_vector(pages, num_pages);
 		}
 
-		if (ret <= 0 || off >= i_size || !more)
+		if (ret < 0) {
+			if (ret == -EBLACKLISTED)
+				fsc->blacklisted = true;
+			break;
+		}
+
+		if (off >= i_size || !more)
 			break;
 	}
 
@@ -921,7 +931,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	struct ceph_aio_request *aio_req = NULL;
 	int num_pages = 0;
 	int flags;
-	int ret;
+	int ret = 0;
 	struct timespec64 mtime = current_time(inode);
 	size_t count = iov_iter_count(iter);
 	loff_t pos = iocb->ki_pos;
@@ -935,11 +945,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	     (write ? "write" : "read"), file, pos, (unsigned)count,
 	     snapc, snapc ? snapc->seq : 0);
 
-	ret = filemap_write_and_wait_range(inode->i_mapping,
-					   pos, pos + count - 1);
-	if (ret < 0)
-		return ret;
-
 	if (write) {
 		int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
 					pos >> PAGE_SHIFT,
@@ -1260,7 +1265,8 @@ again:
 		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 	else
 		want = CEPH_CAP_FILE_CACHE;
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
+	ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
+			    &got, &pinned_page);
 	if (ret < 0)
 		return ret;
 
@@ -1274,12 +1280,16 @@ again:
 
 		if (ci->i_inline_version == CEPH_INLINE_NONE) {
 			if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
+				ceph_start_io_direct(inode);
 				ret = ceph_direct_read_write(iocb, to,
 							     NULL, NULL);
+				ceph_end_io_direct(inode);
 				if (ret >= 0 && ret < len)
 					retry_op = CHECK_EOF;
 			} else {
+				ceph_start_io_read(inode);
 				ret = ceph_sync_read(iocb, to, &retry_op);
+				ceph_end_io_read(inode);
 			}
 		} else {
 			retry_op = READ_INLINE;
@@ -1290,7 +1300,9 @@ again:
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
 		     ceph_cap_string(got));
 		ceph_add_rw_context(fi, &rw_ctx);
+		ceph_start_io_read(inode);
 		ret = generic_file_read_iter(iocb, to);
+		ceph_end_io_read(inode);
 		ceph_del_rw_context(fi, &rw_ctx);
 	}
 	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
@@ -1399,7 +1411,10 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 		return -ENOMEM;
 
 retry_snap:
-	inode_lock(inode);
+	if (iocb->ki_flags & IOCB_DIRECT)
+		ceph_start_io_direct(inode);
+	else
+		ceph_start_io_write(inode);
 
 	/* We can write back this queue in page reclaim */
 	current->backing_dev_info = inode_to_bdi(inode);
@@ -1457,7 +1472,7 @@ retry_snap:
 	else
 		want = CEPH_CAP_FILE_BUFFER;
 	got = 0;
-	err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+	err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count,
 			    &got, NULL);
 	if (err < 0)
 		goto out;
@@ -1470,7 +1485,6 @@ retry_snap:
 	    (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
 		struct ceph_snap_context *snapc;
 		struct iov_iter data;
-		inode_unlock(inode);
 
 		spin_lock(&ci->i_ceph_lock);
 		if (__ceph_have_pending_cap_snap(ci)) {
@@ -1487,11 +1501,14 @@ retry_snap:
 
 		/* we might need to revert back to that point */
 		data = *from;
-		if (iocb->ki_flags & IOCB_DIRECT)
+		if (iocb->ki_flags & IOCB_DIRECT) {
 			written = ceph_direct_read_write(iocb, &data, snapc,
 							 &prealloc_cf);
-		else
+			ceph_end_io_direct(inode);
+		} else {
 			written = ceph_sync_write(iocb, &data, pos, snapc);
+			ceph_end_io_write(inode);
+		}
 		if (written > 0)
 			iov_iter_advance(from, written);
 		ceph_put_snap_context(snapc);
@@ -1506,7 +1523,7 @@ retry_snap:
 		written = generic_perform_write(file, from, pos);
 		if (likely(written >= 0))
 			iocb->ki_pos = pos + written;
-		inode_unlock(inode);
+		ceph_end_io_write(inode);
 	}
 
 	if (written >= 0) {
@@ -1541,9 +1558,11 @@ retry_snap:
 	}
 
 	goto out_unlocked;
-
 out:
-	inode_unlock(inode);
+	if (iocb->ki_flags & IOCB_DIRECT)
+		ceph_end_io_direct(inode);
+	else
+		ceph_end_io_write(inode);
 out_unlocked:
 	ceph_free_cap_flush(prealloc_cf);
 	current->backing_dev_info = NULL;
@@ -1781,7 +1800,7 @@ static long ceph_fallocate(struct file *file, int mode,
 	else
 		want = CEPH_CAP_FILE_BUFFER;
 
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
+	ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
 	if (ret < 0)
 		goto unlock;
 
@@ -1810,16 +1829,15 @@ unlock:
  * src_ci.  Two attempts are made to obtain both caps, and an error is return if
  * this fails; zero is returned on success.
  */
-static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
-			  loff_t src_endoff, int *src_got,
-			  struct ceph_inode_info *dst_ci,
+static int get_rd_wr_caps(struct file *src_filp, int *src_got,
+			  struct file *dst_filp,
 			  loff_t dst_endoff, int *dst_got)
 {
 	int ret = 0;
 	bool retrying = false;
 
 retry_caps:
-	ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
+	ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
 			    dst_endoff, dst_got, NULL);
 	if (ret < 0)
 		return ret;
@@ -1829,24 +1847,24 @@ retry_caps:
 	 * we would risk a deadlock by using ceph_get_caps.  Thus, we'll do some
 	 * retry dance instead to try to get both capabilities.
 	 */
-	ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
+	ret = ceph_try_get_caps(file_inode(src_filp),
+				CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
 				false, src_got);
 	if (ret <= 0) {
 		/* Start by dropping dst_ci caps and getting src_ci caps */
-		ceph_put_cap_refs(dst_ci, *dst_got);
+		ceph_put_cap_refs(ceph_inode(file_inode(dst_filp)), *dst_got);
 		if (retrying) {
 			if (!ret)
 				/* ceph_try_get_caps masks EAGAIN */
 				ret = -EAGAIN;
 			return ret;
 		}
-		ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
-				    CEPH_CAP_FILE_SHARED, src_endoff,
-				    src_got, NULL);
+		ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD,
+				    CEPH_CAP_FILE_SHARED, -1, src_got, NULL);
 		if (ret < 0)
 			return ret;
 		/*... drop src_ci caps too, and retry */
-		ceph_put_cap_refs(src_ci, *src_got);
+		ceph_put_cap_refs(ceph_inode(file_inode(src_filp)), *src_got);
 		retrying = true;
 		goto retry_caps;
 	}
@@ -1904,6 +1922,7 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
 	struct ceph_inode_info *src_ci = ceph_inode(src_inode);
 	struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
 	struct ceph_cap_flush *prealloc_cf;
+	struct ceph_fs_client *src_fsc = ceph_inode_to_client(src_inode);
 	struct ceph_object_locator src_oloc, dst_oloc;
 	struct ceph_object_id src_oid, dst_oid;
 	loff_t endoff = 0, size;
@@ -1913,10 +1932,16 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
 	int src_got = 0, dst_got = 0, err, dirty;
 	bool do_final_copy = false;
 
-	if (src_inode == dst_inode)
-		return -EINVAL;
-	if (src_inode->i_sb != dst_inode->i_sb)
-		return -EXDEV;
+	if (src_inode->i_sb != dst_inode->i_sb) {
+		struct ceph_fs_client *dst_fsc = ceph_inode_to_client(dst_inode);
+
+		if (ceph_fsid_compare(&src_fsc->client->fsid,
+				      &dst_fsc->client->fsid)) {
+			dout("Copying files across clusters: src: %pU dst: %pU\n",
+			     &src_fsc->client->fsid, &dst_fsc->client->fsid);
+			return -EXDEV;
+		}
+	}
 	if (ceph_snap(dst_inode) != CEPH_NOSNAP)
 		return -EROFS;
 
@@ -1928,7 +1953,7 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
 	 * efficient).
 	 */
 
-	if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM))
+	if (ceph_test_mount_opt(src_fsc, NOCOPYFROM))
 		return -EOPNOTSUPP;
 
 	if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
@@ -1960,8 +1985,8 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
 	 * clients may have dirty data in their caches.  And OSDs know nothing
 	 * about caps, so they can't safely do the remote object copies.
 	 */
-	err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
-			     dst_ci, (dst_off + len), &dst_got);
+	err = get_rd_wr_caps(src_file, &src_got,
+			     dst_file, (dst_off + len), &dst_got);
 	if (err < 0) {
 		dout("get_rd_wr_caps returned %d\n", err);
 		ret = -EOPNOTSUPP;
@@ -2018,9 +2043,8 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
 			goto out;
 		}
 		len -= ret;
-		err = get_rd_wr_caps(src_ci, (src_off + len),
-				     &src_got, dst_ci,
-				     (dst_off + len), &dst_got);
+		err = get_rd_wr_caps(src_file, &src_got,
+				     dst_file, (dst_off + len), &dst_got);
 		if (err < 0)
 			goto out;
 		err = is_file_size_ok(src_inode, dst_inode,
@@ -2044,7 +2068,7 @@ static ssize_t __ceph_copy_file_range(struct file *src_file, loff_t src_off,
 				dst_ci->i_vino.ino, dst_objnum);
 		/* Do an object remote copy */
 		err = ceph_osdc_copy_from(
-			&ceph_inode_to_client(src_inode)->client->osdc,
+			&src_fsc->client->osdc,
 			src_ci->i_vino.snap, 0,
 			&src_oid, &src_oloc,
 			CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 18500edefc56..9f135624ae47 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -515,6 +515,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 
 	ceph_fscache_inode_init(ci);
 
+	ci->i_meta_err = 0;
+
 	return &ci->vfs_inode;
 }
 
@@ -801,7 +803,12 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 
 	/* update inode */
 	inode->i_rdev = le32_to_cpu(info->rdev);
-	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+	/* directories have fl_stripe_unit set to zero */
+	if (le32_to_cpu(info->layout.fl_stripe_unit))
+		inode->i_blkbits =
+			fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+	else
+		inode->i_blkbits = CEPH_BLOCK_SHIFT;
 
 	__ceph_update_quota(ci, iinfo->max_bytes, iinfo->max_files);
 
@@ -1982,7 +1989,7 @@ static const struct inode_operations ceph_symlink_iops = {
 int __ceph_setattr(struct inode *inode, struct iattr *attr)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	const unsigned int ia_valid = attr->ia_valid;
+	unsigned int ia_valid = attr->ia_valid;
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_cap_flush *prealloc_cf;
@@ -2087,6 +2094,26 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
 				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
 		}
 	}
+	if (ia_valid & ATTR_SIZE) {
+		dout("setattr %p size %lld -> %lld\n", inode,
+		     inode->i_size, attr->ia_size);
+		if ((issued & CEPH_CAP_FILE_EXCL) &&
+		    attr->ia_size > inode->i_size) {
+			i_size_write(inode, attr->ia_size);
+			inode->i_blocks = calc_inode_blocks(attr->ia_size);
+			ci->i_reported_size = attr->ia_size;
+			dirtied |= CEPH_CAP_FILE_EXCL;
+			ia_valid |= ATTR_MTIME;
+		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
+			   attr->ia_size != inode->i_size) {
+			req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
+			req->r_args.setattr.old_size =
+				cpu_to_le64(inode->i_size);
+			mask |= CEPH_SETATTR_SIZE;
+			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
+		}
+	}
 	if (ia_valid & ATTR_MTIME) {
 		dout("setattr %p mtime %lld.%ld -> %lld.%ld\n", inode,
 		     inode->i_mtime.tv_sec, inode->i_mtime.tv_nsec,
@@ -2109,25 +2136,6 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
 				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
 		}
 	}
-	if (ia_valid & ATTR_SIZE) {
-		dout("setattr %p size %lld -> %lld\n", inode,
-		     inode->i_size, attr->ia_size);
-		if ((issued & CEPH_CAP_FILE_EXCL) &&
-		    attr->ia_size > inode->i_size) {
-			i_size_write(inode, attr->ia_size);
-			inode->i_blocks = calc_inode_blocks(attr->ia_size);
-			ci->i_reported_size = attr->ia_size;
-			dirtied |= CEPH_CAP_FILE_EXCL;
-		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
-			   attr->ia_size != inode->i_size) {
-			req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
-			req->r_args.setattr.old_size =
-				cpu_to_le64(inode->i_size);
-			mask |= CEPH_SETATTR_SIZE;
-			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
-				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
-		}
-	}
 
 	/* these do nothing */
 	if (ia_valid & ATTR_CTIME) {
diff --git a/fs/ceph/io.c b/fs/ceph/io.c
new file mode 100644
index 000000000000..97602ea92ff4
--- /dev/null
+++ b/fs/ceph/io.c
@@ -0,0 +1,163 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (c) 2016 Trond Myklebust
+ * Copyright (c) 2019 Jeff Layton
+ *
+ * I/O and data path helper functionality.
+ *
+ * Heavily borrowed from equivalent code in fs/nfs/io.c
+ */
+
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/rwsem.h>
+#include <linux/fs.h>
+
+#include "super.h"
+#include "io.h"
+
+/* Call with exclusively locked inode->i_rwsem */
+static void ceph_block_o_direct(struct ceph_inode_info *ci, struct inode *inode)
+{
+	lockdep_assert_held_write(&inode->i_rwsem);
+
+	if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags &= ~CEPH_I_ODIRECT;
+		spin_unlock(&ci->i_ceph_lock);
+		inode_dio_wait(inode);
+	}
+}
+
+/**
+ * ceph_start_io_read - declare the file is being used for buffered reads
+ * @inode: file inode
+ *
+ * Declare that a buffered read operation is about to start, and ensure
+ * that we block all direct I/O.
+ * On exit, the function ensures that the CEPH_I_ODIRECT flag is unset,
+ * and holds a shared lock on inode->i_rwsem to ensure that the flag
+ * cannot be changed.
+ * In practice, this means that buffered read operations are allowed to
+ * execute in parallel, thanks to the shared lock, whereas direct I/O
+ * operations need to wait to grab an exclusive lock in order to set
+ * CEPH_I_ODIRECT.
+ * Note that buffered writes and truncates both take a write lock on
+ * inode->i_rwsem, meaning that those are serialised w.r.t. the reads.
+ */
+void
+ceph_start_io_read(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+
+	/* Be an optimist! */
+	down_read(&inode->i_rwsem);
+	if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT))
+		return;
+	up_read(&inode->i_rwsem);
+	/* Slow path.... */
+	down_write(&inode->i_rwsem);
+	ceph_block_o_direct(ci, inode);
+	downgrade_write(&inode->i_rwsem);
+}
+
+/**
+ * ceph_end_io_read - declare that the buffered read operation is done
+ * @inode: file inode
+ *
+ * Declare that a buffered read operation is done, and release the shared
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_read(struct inode *inode)
+{
+	up_read(&inode->i_rwsem);
+}
+
+/**
+ * ceph_start_io_write - declare the file is being used for buffered writes
+ * @inode: file inode
+ *
+ * Declare that a buffered write operation is about to start, and ensure
+ * that we block all direct I/O.
+ */
+void
+ceph_start_io_write(struct inode *inode)
+{
+	down_write(&inode->i_rwsem);
+	ceph_block_o_direct(ceph_inode(inode), inode);
+}
+
+/**
+ * ceph_end_io_write - declare that the buffered write operation is done
+ * @inode: file inode
+ *
+ * Declare that a buffered write operation is done, and release the
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_write(struct inode *inode)
+{
+	up_write(&inode->i_rwsem);
+}
+
+/* Call with exclusively locked inode->i_rwsem */
+static void ceph_block_buffered(struct ceph_inode_info *ci, struct inode *inode)
+{
+	lockdep_assert_held_write(&inode->i_rwsem);
+
+	if (!(READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT)) {
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags |= CEPH_I_ODIRECT;
+		spin_unlock(&ci->i_ceph_lock);
+		/* FIXME: unmap_mapping_range? */
+		filemap_write_and_wait(inode->i_mapping);
+	}
+}
+
+/**
+ * ceph_end_io_direct - declare the file is being used for direct i/o
+ * @inode: file inode
+ *
+ * Declare that a direct I/O operation is about to start, and ensure
+ * that we block all buffered I/O.
+ * On exit, the function ensures that the CEPH_I_ODIRECT flag is set,
+ * and holds a shared lock on inode->i_rwsem to ensure that the flag
+ * cannot be changed.
+ * In practice, this means that direct I/O operations are allowed to
+ * execute in parallel, thanks to the shared lock, whereas buffered I/O
+ * operations need to wait to grab an exclusive lock in order to clear
+ * CEPH_I_ODIRECT.
+ * Note that buffered writes and truncates both take a write lock on
+ * inode->i_rwsem, meaning that those are serialised w.r.t. O_DIRECT.
+ */
+void
+ceph_start_io_direct(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+
+	/* Be an optimist! */
+	down_read(&inode->i_rwsem);
+	if (READ_ONCE(ci->i_ceph_flags) & CEPH_I_ODIRECT)
+		return;
+	up_read(&inode->i_rwsem);
+	/* Slow path.... */
+	down_write(&inode->i_rwsem);
+	ceph_block_buffered(ci, inode);
+	downgrade_write(&inode->i_rwsem);
+}
+
+/**
+ * ceph_end_io_direct - declare that the direct i/o operation is done
+ * @inode: file inode
+ *
+ * Declare that a direct I/O operation is done, and release the shared
+ * lock on inode->i_rwsem.
+ */
+void
+ceph_end_io_direct(struct inode *inode)
+{
+	up_read(&inode->i_rwsem);
+}
diff --git a/fs/ceph/io.h b/fs/ceph/io.h
new file mode 100644
index 000000000000..fa594cd77348
--- /dev/null
+++ b/fs/ceph/io.h
@@ -0,0 +1,12 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef _FS_CEPH_IO_H
+#define _FS_CEPH_IO_H
+
+void ceph_start_io_read(struct inode *inode);
+void ceph_end_io_read(struct inode *inode);
+void ceph_start_io_write(struct inode *inode);
+void ceph_end_io_write(struct inode *inode);
+void ceph_start_io_direct(struct inode *inode);
+void ceph_end_io_direct(struct inode *inode);
+
+#endif /* FS_CEPH_IO_H */
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index 5083e238ad15..544e9e85b120 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -32,14 +32,18 @@ void __init ceph_flock_init(void)
 
 static void ceph_fl_copy_lock(struct file_lock *dst, struct file_lock *src)
 {
-	struct inode *inode = file_inode(src->fl_file);
+	struct ceph_file_info *fi = dst->fl_file->private_data;
+	struct inode *inode = file_inode(dst->fl_file);
 	atomic_inc(&ceph_inode(inode)->i_filelock_ref);
+	atomic_inc(&fi->num_locks);
 }
 
 static void ceph_fl_release_lock(struct file_lock *fl)
 {
+	struct ceph_file_info *fi = fl->fl_file->private_data;
 	struct inode *inode = file_inode(fl->fl_file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	atomic_dec(&fi->num_locks);
 	if (atomic_dec_and_test(&ci->i_filelock_ref)) {
 		/* clear error when all locks are released */
 		spin_lock(&ci->i_ceph_lock);
@@ -73,7 +77,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
 		 * window. Caller function will decrease the counter.
 		 */
 		fl->fl_ops = &ceph_fl_lock_ops;
-		atomic_inc(&ceph_inode(inode)->i_filelock_ref);
+		fl->fl_ops->fl_copy_lock(fl, NULL);
 	}
 
 	if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 920e9f048bd8..a8a8f84f3bbf 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -639,7 +639,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	s->s_renew_seq = 0;
 	INIT_LIST_HEAD(&s->s_caps);
 	s->s_nr_caps = 0;
-	s->s_trim_caps = 0;
 	refcount_set(&s->s_ref, 1);
 	INIT_LIST_HEAD(&s->s_waiting);
 	INIT_LIST_HEAD(&s->s_unsafe);
@@ -1270,6 +1269,7 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
 {
 	struct ceph_mds_request *req;
 	struct rb_node *p;
+	struct ceph_inode_info *ci;
 
 	dout("cleanup_session_requests mds%d\n", session->s_mds);
 	mutex_lock(&mdsc->mutex);
@@ -1278,6 +1278,16 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
 				       struct ceph_mds_request, r_unsafe_item);
 		pr_warn_ratelimited(" dropping unsafe request %llu\n",
 				    req->r_tid);
+		if (req->r_target_inode) {
+			/* dropping unsafe change of inode's attributes */
+			ci = ceph_inode(req->r_target_inode);
+			errseq_set(&ci->i_meta_err, -EIO);
+		}
+		if (req->r_unsafe_dir) {
+			/* dropping unsafe directory operation */
+			ci = ceph_inode(req->r_unsafe_dir);
+			errseq_set(&ci->i_meta_err, -EIO);
+		}
 		__unregister_request(mdsc, req);
 	}
 	/* zero r_attempts, so kick_requests() will re-send requests */
@@ -1370,7 +1380,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	LIST_HEAD(to_remove);
-	bool drop = false;
+	bool dirty_dropped = false;
 	bool invalidate = false;
 
 	dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1383,9 +1393,12 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 		struct ceph_cap_flush *cf;
 		struct ceph_mds_client *mdsc = fsc->mdsc;
 
-		if (ci->i_wrbuffer_ref > 0 &&
-		    READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
-			invalidate = true;
+		if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+			if (inode->i_data.nrpages > 0)
+				invalidate = true;
+			if (ci->i_wrbuffer_ref > 0)
+				mapping_set_error(&inode->i_data, -EIO);
+		}
 
 		while (!list_empty(&ci->i_cap_flush_list)) {
 			cf = list_first_entry(&ci->i_cap_flush_list,
@@ -1405,7 +1418,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 				inode, ceph_ino(inode));
 			ci->i_dirty_caps = 0;
 			list_del_init(&ci->i_dirty_item);
-			drop = true;
+			dirty_dropped = true;
 		}
 		if (!list_empty(&ci->i_flushing_item)) {
 			pr_warn_ratelimited(
@@ -1415,10 +1428,22 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 			ci->i_flushing_caps = 0;
 			list_del_init(&ci->i_flushing_item);
 			mdsc->num_cap_flushing--;
-			drop = true;
+			dirty_dropped = true;
 		}
 		spin_unlock(&mdsc->cap_dirty_lock);
 
+		if (dirty_dropped) {
+			errseq_set(&ci->i_meta_err, -EIO);
+
+			if (ci->i_wrbuffer_ref_head == 0 &&
+			    ci->i_wr_ref == 0 &&
+			    ci->i_dirty_caps == 0 &&
+			    ci->i_flushing_caps == 0) {
+				ceph_put_snap_context(ci->i_head_snapc);
+				ci->i_head_snapc = NULL;
+			}
+		}
+
 		if (atomic_read(&ci->i_filelock_ref) > 0) {
 			/* make further file lock syscall return -EIO */
 			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
@@ -1430,15 +1455,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
 			ci->i_prealloc_cap_flush = NULL;
 		}
-
-               if (drop &&
-                  ci->i_wrbuffer_ref_head == 0 &&
-                  ci->i_wr_ref == 0 &&
-                  ci->i_dirty_caps == 0 &&
-                  ci->i_flushing_caps == 0) {
-                      ceph_put_snap_context(ci->i_head_snapc);
-                      ci->i_head_snapc = NULL;
-               }
 	}
 	spin_unlock(&ci->i_ceph_lock);
 	while (!list_empty(&to_remove)) {
@@ -1452,7 +1468,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	wake_up_all(&ci->i_cap_wq);
 	if (invalidate)
 		ceph_queue_invalidate(inode);
-	if (drop)
+	if (dirty_dropped)
 		iput(inode);
 	return 0;
 }
@@ -1705,11 +1721,11 @@ out:
  */
 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 {
-	struct ceph_mds_session *session = arg;
+	int *remaining = arg;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int used, wanted, oissued, mine;
 
-	if (session->s_trim_caps <= 0)
+	if (*remaining <= 0)
 		return -1;
 
 	spin_lock(&ci->i_ceph_lock);
@@ -1746,7 +1762,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 	if (oissued) {
 		/* we aren't the only cap.. just remove us */
 		__ceph_remove_cap(cap, true);
-		session->s_trim_caps--;
+		(*remaining)--;
 	} else {
 		struct dentry *dentry;
 		/* try dropping referring dentries */
@@ -1758,7 +1774,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 			d_prune_aliases(inode);
 			count = atomic_read(&inode->i_count);
 			if (count == 1)
-				session->s_trim_caps--;
+				(*remaining)--;
 			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
 			     inode, cap, count);
 		} else {
@@ -1784,12 +1800,12 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
 	dout("trim_caps mds%d start: %d / %d, trim %d\n",
 	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
 	if (trim_caps > 0) {
-		session->s_trim_caps = trim_caps;
-		ceph_iterate_session_caps(session, trim_caps_cb, session);
+		int remaining = trim_caps;
+
+		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
 		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
 		     session->s_mds, session->s_nr_caps, max_caps,
-			trim_caps - session->s_trim_caps);
-		session->s_trim_caps = 0;
+			trim_caps - remaining);
 	}
 
 	ceph_flush_cap_releases(mdsc, session);
@@ -3015,18 +3031,23 @@ bad:
 	pr_err("mdsc_handle_forward decode error err=%d\n", err);
 }
 
-static int __decode_and_drop_session_metadata(void **p, void *end)
+static int __decode_session_metadata(void **p, void *end,
+				     bool *blacklisted)
 {
 	/* map<string,string> */
 	u32 n;
+	bool err_str;
 	ceph_decode_32_safe(p, end, n, bad);
 	while (n-- > 0) {
 		u32 len;
 		ceph_decode_32_safe(p, end, len, bad);
 		ceph_decode_need(p, end, len, bad);
+		err_str = !strncmp(*p, "error_string", len);
 		*p += len;
 		ceph_decode_32_safe(p, end, len, bad);
 		ceph_decode_need(p, end, len, bad);
+		if (err_str && strnstr(*p, "blacklisted", len))
+			*blacklisted = true;
 		*p += len;
 	}
 	return 0;
@@ -3050,6 +3071,7 @@ static void handle_session(struct ceph_mds_session *session,
 	u64 seq;
 	unsigned long features = 0;
 	int wake = 0;
+	bool blacklisted = false;
 
 	/* decode */
 	ceph_decode_need(&p, end, sizeof(*h), bad);
@@ -3062,7 +3084,7 @@ static void handle_session(struct ceph_mds_session *session,
 	if (msg_version >= 3) {
 		u32 len;
 		/* version >= 2, metadata */
-		if (__decode_and_drop_session_metadata(&p, end) < 0)
+		if (__decode_session_metadata(&p, end, &blacklisted) < 0)
 			goto bad;
 		/* version >= 3, feature bits */
 		ceph_decode_32_safe(&p, end, len, bad);
@@ -3149,6 +3171,8 @@ static void handle_session(struct ceph_mds_session *session,
 		session->s_state = CEPH_MDS_SESSION_REJECTED;
 		cleanup_session_requests(mdsc, session);
 		remove_session_caps(session);
+		if (blacklisted)
+			mdsc->fsc->blacklisted = true;
 		wake = 2; /* for good measure */
 		break;
 
@@ -3998,7 +4022,27 @@ static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
 	mutex_unlock(&mdsc->mutex);
 }
 
+static void maybe_recover_session(struct ceph_mds_client *mdsc)
+{
+	struct ceph_fs_client *fsc = mdsc->fsc;
+
+	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
+		return;
+
+	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
+		return;
+
+	if (!READ_ONCE(fsc->blacklisted))
+		return;
+
+	if (fsc->last_auto_reconnect &&
+	    time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
+		return;
 
+	pr_info("auto reconnect after blacklisted\n");
+	fsc->last_auto_reconnect = jiffies;
+	ceph_force_reconnect(fsc->sb);
+}
 
 /*
  * delayed work -- periodically trim expired leases, renew caps with mds
@@ -4044,7 +4088,9 @@ static void delayed_work(struct work_struct *work)
 				pr_info("mds%d hung\n", s->s_mds);
 			}
 		}
-		if (s->s_state < CEPH_MDS_SESSION_OPEN) {
+		if (s->s_state == CEPH_MDS_SESSION_NEW ||
+		    s->s_state == CEPH_MDS_SESSION_RESTARTING ||
+		    s->s_state == CEPH_MDS_SESSION_REJECTED) {
 			/* this mds is failed or recovering, just wait */
 			ceph_put_mds_session(s);
 			continue;
@@ -4072,6 +4118,8 @@ static void delayed_work(struct work_struct *work)
 
 	ceph_trim_snapid_map(mdsc);
 
+	maybe_recover_session(mdsc);
+
 	schedule_delayed(mdsc);
 }
 
@@ -4355,7 +4403,12 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
 		session = __ceph_lookup_mds_session(mdsc, mds);
 		if (!session)
 			continue;
+
+		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
+			__unregister_session(mdsc, session);
+		__wake_requests(mdsc, &session->s_waiting);
 		mutex_unlock(&mdsc->mutex);
+
 		mutex_lock(&session->s_mutex);
 		__close_session(mdsc, session);
 		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
@@ -4364,6 +4417,7 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
 		}
 		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
+
 		mutex_lock(&mdsc->mutex);
 		kick_requests(mdsc, mds);
 	}
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index f7c8603484fe..5cd131b41d84 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -148,9 +148,9 @@ enum {
 	CEPH_MDS_SESSION_OPENING = 2,
 	CEPH_MDS_SESSION_OPEN = 3,
 	CEPH_MDS_SESSION_HUNG = 4,
-	CEPH_MDS_SESSION_CLOSING = 5,
-	CEPH_MDS_SESSION_RESTARTING = 6,
-	CEPH_MDS_SESSION_RECONNECTING = 7,
+	CEPH_MDS_SESSION_RESTARTING = 5,
+	CEPH_MDS_SESSION_RECONNECTING = 6,
+	CEPH_MDS_SESSION_CLOSING = 7,
 	CEPH_MDS_SESSION_REJECTED = 8,
 };
 
@@ -176,7 +176,7 @@ struct ceph_mds_session {
 	spinlock_t        s_cap_lock;
 	struct list_head  s_caps;     /* all caps issued by this session */
 	struct ceph_cap  *s_cap_iterator;
-	int               s_nr_caps, s_trim_caps;
+	int               s_nr_caps;
 	int               s_num_cap_releases;
 	int		  s_cap_reconnect;
 	int		  s_readonly;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 377fafc76f20..edfd643a8205 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -143,6 +143,7 @@ enum {
 	Opt_snapdirname,
 	Opt_mds_namespace,
 	Opt_fscache_uniq,
+	Opt_recover_session,
 	Opt_last_string,
 	/* string args above */
 	Opt_dirstat,
@@ -184,6 +185,7 @@ static match_table_t fsopt_tokens = {
 	/* int args above */
 	{Opt_snapdirname, "snapdirname=%s"},
 	{Opt_mds_namespace, "mds_namespace=%s"},
+	{Opt_recover_session, "recover_session=%s"},
 	{Opt_fscache_uniq, "fsc=%s"},
 	/* string args above */
 	{Opt_dirstat, "dirstat"},
@@ -254,6 +256,17 @@ static int parse_fsopt_token(char *c, void *private)
 		if (!fsopt->mds_namespace)
 			return -ENOMEM;
 		break;
+	case Opt_recover_session:
+		if (!strncmp(argstr[0].from, "no",
+			     argstr[0].to - argstr[0].from)) {
+			fsopt->flags &= ~CEPH_MOUNT_OPT_CLEANRECOVER;
+		} else if (!strncmp(argstr[0].from, "clean",
+				    argstr[0].to - argstr[0].from)) {
+			fsopt->flags |= CEPH_MOUNT_OPT_CLEANRECOVER;
+		} else {
+			return -EINVAL;
+		}
+		break;
 	case Opt_fscache_uniq:
 		kfree(fsopt->fscache_uniq);
 		fsopt->fscache_uniq = kstrndup(argstr[0].from,
@@ -576,6 +589,10 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 
 	if (fsopt->mds_namespace)
 		seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
+
+	if (fsopt->flags & CEPH_MOUNT_OPT_CLEANRECOVER)
+		seq_show_option(m, "recover_session", "clean");
+
 	if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
 	if (fsopt->rsize != CEPH_MAX_READ_SIZE)
@@ -664,6 +681,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 
 	fsc->sb = NULL;
 	fsc->mount_state = CEPH_MOUNT_MOUNTING;
+	fsc->filp_gen = 1;
 
 	atomic_long_set(&fsc->writeback_count, 0);
 
@@ -713,6 +731,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
 {
 	dout("destroy_fs_client %p\n", fsc);
 
+	ceph_mdsc_destroy(fsc);
 	destroy_workqueue(fsc->inode_wq);
 	destroy_workqueue(fsc->cap_wq);
 
@@ -829,7 +848,7 @@ static void ceph_umount_begin(struct super_block *sb)
 	fsc->mount_state = CEPH_MOUNT_SHUTDOWN;
 	ceph_osdc_abort_requests(&fsc->client->osdc, -EIO);
 	ceph_mdsc_force_umount(fsc->mdsc);
-	return;
+	fsc->filp_gen++; // invalidate open files
 }
 
 static int ceph_remount(struct super_block *sb, int *flags, char *data)
@@ -1089,7 +1108,6 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,
 	}
 
 	if (ceph_sb_to_client(sb) != fsc) {
-		ceph_mdsc_destroy(fsc);
 		destroy_fs_client(fsc);
 		fsc = ceph_sb_to_client(sb);
 		dout("get_sb got existing client %p\n", fsc);
@@ -1115,7 +1133,6 @@ out_splat:
 	goto out_final;
 
 out:
-	ceph_mdsc_destroy(fsc);
 	destroy_fs_client(fsc);
 out_final:
 	dout("ceph_mount fail %ld\n", PTR_ERR(res));
@@ -1139,8 +1156,6 @@ static void ceph_kill_sb(struct super_block *s)
 
 	ceph_fscache_unregister_fs(fsc);
 
-	ceph_mdsc_destroy(fsc);
-
 	destroy_fs_client(fsc);
 	free_anon_bdev(dev);
 }
@@ -1154,6 +1169,33 @@ static struct file_system_type ceph_fs_type = {
 };
 MODULE_ALIAS_FS("ceph");
 
+int ceph_force_reconnect(struct super_block *sb)
+{
+	struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
+	int err = 0;
+
+	ceph_umount_begin(sb);
+
+	/* Make sure all page caches get invalidated.
+	 * see remove_session_caps_cb() */
+	flush_workqueue(fsc->inode_wq);
+
+	/* In case that we were blacklisted. This also reset
+	 * all mon/osd connections */
+	ceph_reset_client_addr(fsc->client);
+
+	ceph_osdc_clear_abort_err(&fsc->client->osdc);
+
+	fsc->blacklisted = false;
+	fsc->mount_state = CEPH_MOUNT_MOUNTED;
+
+	if (sb->s_root) {
+		err = __ceph_do_getattr(d_inode(sb->s_root), NULL,
+					CEPH_STAT_CAP_INODE, true);
+	}
+	return err;
+}
+
 static int __init init_ceph(void)
 {
 	int ret = init_caches();
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 6b9f1ee7de85..f98d9247f9cb 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -16,6 +16,7 @@
 #include <linux/slab.h>
 #include <linux/posix_acl.h>
 #include <linux/refcount.h>
+#include <linux/security.h>
 
 #include <linux/ceph/libceph.h>
 
@@ -31,6 +32,7 @@
 #define CEPH_BLOCK_SHIFT   22  /* 4 MB */
 #define CEPH_BLOCK         (1 << CEPH_BLOCK_SHIFT)
 
+#define CEPH_MOUNT_OPT_CLEANRECOVER    (1<<1) /* auto reonnect (clean mode) after blacklisted */
 #define CEPH_MOUNT_OPT_DIRSTAT         (1<<4) /* `cat dirname` for stats */
 #define CEPH_MOUNT_OPT_RBYTES          (1<<5) /* dir st_bytes = rbytes */
 #define CEPH_MOUNT_OPT_NOASYNCREADDIR  (1<<7) /* no dcache readdir */
@@ -101,6 +103,11 @@ struct ceph_fs_client {
 	struct ceph_client *client;
 
 	unsigned long mount_state;
+
+	unsigned long last_auto_reconnect;
+	bool blacklisted;
+
+	u32 filp_gen;
 	loff_t max_file_size;
 
 	struct ceph_mds_client *mdsc;
@@ -395,6 +402,8 @@ struct ceph_inode_info {
 	struct fscache_cookie *fscache;
 	u32 i_fscache_gen;
 #endif
+	errseq_t i_meta_err;
+
 	struct inode vfs_inode; /* at end */
 };
 
@@ -499,17 +508,16 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_DIR_ORDERED	(1 << 0)  /* dentries in dir are ordered */
 #define CEPH_I_NODELAY		(1 << 1)  /* do not delay cap release */
 #define CEPH_I_FLUSH		(1 << 2)  /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH		(1 << 3)  /* do not flush dirty caps */
-#define CEPH_I_POOL_PERM	(1 << 4)  /* pool rd/wr bits are valid */
-#define CEPH_I_POOL_RD		(1 << 5)  /* can read from pool */
-#define CEPH_I_POOL_WR		(1 << 6)  /* can write to pool */
-#define CEPH_I_SEC_INITED	(1 << 7)  /* security initialized */
-#define CEPH_I_CAP_DROPPED	(1 << 8)  /* caps were forcibly dropped */
-#define CEPH_I_KICK_FLUSH	(1 << 9)  /* kick flushing caps */
-#define CEPH_I_FLUSH_SNAPS	(1 << 10) /* need flush snapss */
-#define CEPH_I_ERROR_WRITE	(1 << 11) /* have seen write errors */
-#define CEPH_I_ERROR_FILELOCK	(1 << 12) /* have seen file lock errors */
-
+#define CEPH_I_POOL_PERM	(1 << 3)  /* pool rd/wr bits are valid */
+#define CEPH_I_POOL_RD		(1 << 4)  /* can read from pool */
+#define CEPH_I_POOL_WR		(1 << 5)  /* can write to pool */
+#define CEPH_I_SEC_INITED	(1 << 6)  /* security initialized */
+#define CEPH_I_CAP_DROPPED	(1 << 7)  /* caps were forcibly dropped */
+#define CEPH_I_KICK_FLUSH	(1 << 8)  /* kick flushing caps */
+#define CEPH_I_FLUSH_SNAPS	(1 << 9)  /* need flush snapss */
+#define CEPH_I_ERROR_WRITE	(1 << 10) /* have seen write errors */
+#define CEPH_I_ERROR_FILELOCK	(1 << 11) /* have seen file lock errors */
+#define CEPH_I_ODIRECT		(1 << 12) /* inode in direct I/O mode */
 
 /*
  * Masks of ceph inode work.
@@ -703,6 +711,10 @@ struct ceph_file_info {
 
 	spinlock_t rw_contexts_lock;
 	struct list_head rw_contexts;
+
+	errseq_t meta_err;
+	u32 filp_gen;
+	atomic_t num_locks;
 };
 
 struct ceph_dir_file_info {
@@ -842,7 +854,8 @@ static inline int default_congestion_kb(void)
 }
 
 
-
+/* super.c */
+extern int ceph_force_reconnect(struct super_block *sb);
 /* snap.c */
 struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
 					       u64 ino);
@@ -959,7 +972,10 @@ static inline bool ceph_security_xattr_wanted(struct inode *in)
 #ifdef CONFIG_CEPH_FS_SECURITY_LABEL
 extern int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
 				     struct ceph_acl_sec_ctx *ctx);
-extern void ceph_security_invalidate_secctx(struct inode *inode);
+static inline void ceph_security_invalidate_secctx(struct inode *inode)
+{
+	security_inode_invalidate_secctx(inode);
+}
 #else
 static inline int ceph_security_init_secctx(struct dentry *dentry, umode_t mode,
 					    struct ceph_acl_sec_ctx *ctx)
@@ -1039,7 +1055,6 @@ extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 				    struct ceph_mds_session *session);
 extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
 					     int mds);
-extern int ceph_get_cap_mds(struct inode *inode);
 extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
@@ -1058,9 +1073,9 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
 				      struct inode *dir,
 				      int mds, int drop, int unless);
 
-extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+extern int ceph_get_caps(struct file *filp, int need, int want,
 			 loff_t endoff, int *got, struct page **pinned_page);
-extern int ceph_try_get_caps(struct ceph_inode_info *ci,
+extern int ceph_try_get_caps(struct inode *inode,
 			     int need, int want, bool nonblock, int *got);
 
 /* for counting open files by mode */
@@ -1071,7 +1086,7 @@ extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode);
 extern const struct address_space_operations ceph_aops;
 extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
 extern int ceph_uninline_data(struct file *filp, struct page *locked_page);
-extern int ceph_pool_perm_check(struct ceph_inode_info *ci, int need);
+extern int ceph_pool_perm_check(struct inode *inode, int need);
 extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
 
 /* file.c */
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 939eab7aa219..cb18ee637cb7 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -20,7 +20,8 @@ static int __remove_xattr(struct ceph_inode_info *ci,
 
 static bool ceph_is_valid_xattr(const char *name)
 {
-	return !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
+	return !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN) ||
+	       !strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN) ||
 	       !strncmp(name, XATTR_TRUSTED_PREFIX, XATTR_TRUSTED_PREFIX_LEN) ||
 	       !strncmp(name, XATTR_USER_PREFIX, XATTR_USER_PREFIX_LEN);
 }
@@ -892,7 +893,8 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 	memcpy(value, xattr->val, xattr->val_len);
 
 	if (current->journal_info &&
-	    !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
+	    !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN) &&
+	    security_ismaclabel(name + XATTR_SECURITY_PREFIX_LEN))
 		ci->i_ceph_flags |= CEPH_I_SEC_INITED;
 out:
 	spin_unlock(&ci->i_ceph_lock);
@@ -903,11 +905,9 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
 {
 	struct inode *inode = d_inode(dentry);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_vxattr *vxattrs = ceph_inode_vxattrs(inode);
 	bool len_only = (size == 0);
 	u32 namelen;
 	int err;
-	int i;
 
 	spin_lock(&ci->i_ceph_lock);
 	dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
@@ -936,33 +936,6 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
 		names = __copy_xattr_names(ci, names);
 		size -= namelen;
 	}
-
-
-	/* virtual xattr names, too */
-	if (vxattrs) {
-		for (i = 0; vxattrs[i].name; i++) {
-			size_t this_len;
-
-			if (vxattrs[i].flags & VXATTR_FLAG_HIDDEN)
-				continue;
-			if (vxattrs[i].exists_cb && !vxattrs[i].exists_cb(ci))
-				continue;
-
-			this_len = strlen(vxattrs[i].name) + 1;
-			namelen += this_len;
-			if (len_only)
-				continue;
-
-			if (this_len > size) {
-				err = -ERANGE;
-				goto out;
-			}
-
-			memcpy(names, vxattrs[i].name, this_len);
-			names += this_len;
-			size -= this_len;
-		}
-	}
 	err = namelen;
 out:
 	spin_unlock(&ci->i_ceph_lock);
@@ -1293,42 +1266,8 @@ out:
 		ceph_pagelist_release(pagelist);
 	return err;
 }
-
-void ceph_security_invalidate_secctx(struct inode *inode)
-{
-	security_inode_invalidate_secctx(inode);
-}
-
-static int ceph_xattr_set_security_label(const struct xattr_handler *handler,
-				    struct dentry *unused, struct inode *inode,
-				    const char *key, const void *buf,
-				    size_t buflen, int flags)
-{
-	if (security_ismaclabel(key)) {
-		const char *name = xattr_full_name(handler, key);
-		return __ceph_setxattr(inode, name, buf, buflen, flags);
-	}
-	return  -EOPNOTSUPP;
-}
-
-static int ceph_xattr_get_security_label(const struct xattr_handler *handler,
-				    struct dentry *unused, struct inode *inode,
-				    const char *key, void *buf, size_t buflen)
-{
-	if (security_ismaclabel(key)) {
-		const char *name = xattr_full_name(handler, key);
-		return __ceph_getxattr(inode, name, buf, buflen);
-	}
-	return  -EOPNOTSUPP;
-}
-
-static const struct xattr_handler ceph_security_label_handler = {
-	.prefix = XATTR_SECURITY_PREFIX,
-	.get    = ceph_xattr_get_security_label,
-	.set    = ceph_xattr_set_security_label,
-};
-#endif
-#endif
+#endif /* CONFIG_CEPH_FS_SECURITY_LABEL */
+#endif /* CONFIG_SECURITY */
 
 void ceph_release_acl_sec_ctx(struct ceph_acl_sec_ctx *as_ctx)
 {
@@ -1352,9 +1291,6 @@ const struct xattr_handler *ceph_xattr_handlers[] = {
 	&posix_acl_access_xattr_handler,
 	&posix_acl_default_xattr_handler,
 #endif
-#ifdef CONFIG_CEPH_FS_SECURITY_LABEL
-	&ceph_security_label_handler,
-#endif
 	&ceph_other_xattr_handler,
 	NULL,
 };