summary refs log tree commit diff
path: root/fs/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2021-05-06 10:27:02 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2021-05-06 10:27:02 -0700
commit7ac86b3dca1b00f5391d346fdea3ac010d230667 (patch)
treee4d5af14f741ab75bd1b52b5245331b31cbd2ff2 /fs/ceph
parent682a8e2b41effcaf2e80697e395d47f77c91273f (diff)
parent3f1c6f2122fc780560f09735b6d1dbf39b44eb0f (diff)
downloadlinux-7ac86b3dca1b00f5391d346fdea3ac010d230667.tar.gz
Merge tag 'ceph-for-5.13-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "Notable items here are

   - a series to take advantage of David Howells' netfs helper library
     from Jeff

   - three new filesystem client metrics from Xiubo

   - ceph.dir.rsnaps vxattr from Yanhu

   - two auth-related fixes from myself, marked for stable.

  Interspersed is a smattering of assorted fixes and cleanups across the
  filesystem"

* tag 'ceph-for-5.13-rc1' of git://github.com/ceph/ceph-client: (24 commits)
  libceph: allow addrvecs with a single NONE/blank address
  libceph: don't set global_id until we get an auth ticket
  libceph: bump CephXAuthenticate encoding version
  ceph: don't allow access to MDS-private inodes
  ceph: fix up some bare fetches of i_size
  ceph: convert some PAGE_SIZE invocations to thp_size()
  ceph: support getting ceph.dir.rsnaps vxattr
  ceph: drop pinned_page parameter from ceph_get_caps
  ceph: fix inode leak on getattr error in __fh_to_dentry
  ceph: only check pool permissions for regular files
  ceph: send opened files/pinned caps/opened inodes metrics to MDS daemon
  ceph: avoid counting the same request twice or more
  ceph: rename the metric helpers
  ceph: fix kerneldoc copypasta over ceph_start_io_direct
  ceph: use attach/detach_page_private for tracking snap context
  ceph: don't use d_add in ceph_handle_snapdir
  ceph: don't clobber i_snap_caps on non-I_NEW inode
  ceph: fix fall-through warnings for Clang
  ceph: convert ceph_readpages to ceph_readahead
  ceph: convert ceph_write_begin to netfs_write_begin
  ...
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/Kconfig1
-rw-r--r--fs/ceph/addr.c626
-rw-r--r--fs/ceph/cache.c125
-rw-r--r--fs/ceph/cache.h101
-rw-r--r--fs/ceph/caps.c27
-rw-r--r--fs/ceph/debugfs.c12
-rw-r--r--fs/ceph/dir.c34
-rw-r--r--fs/ceph/export.c12
-rw-r--r--fs/ceph/file.c52
-rw-r--r--fs/ceph/inode.c36
-rw-r--r--fs/ceph/io.c2
-rw-r--r--fs/ceph/mds_client.c20
-rw-r--r--fs/ceph/mds_client.h1
-rw-r--r--fs/ceph/metric.c62
-rw-r--r--fs/ceph/metric.h56
-rw-r--r--fs/ceph/snap.c2
-rw-r--r--fs/ceph/super.h32
-rw-r--r--fs/ceph/xattr.c7
18 files changed, 524 insertions, 684 deletions
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 471e40156065..94df854147d3 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -6,6 +6,7 @@ config CEPH_FS
 	select LIBCRC32C
 	select CRYPTO_AES
 	select CRYPTO
+	select NETFS_SUPPORT
 	default n
 	help
 	  Choose Y or M here to include support for mounting the
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 26e66436f005..c1570fada3d8 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -12,6 +12,7 @@
 #include <linux/signal.h>
 #include <linux/iversion.h>
 #include <linux/ktime.h>
+#include <linux/netfs.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -61,6 +62,9 @@
 	(CONGESTION_ON_THRESH(congestion_kb) -				\
 	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
 
+static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
+					struct page *page, void **_fsdata);
+
 static inline struct ceph_snap_context *page_snap_context(struct page *page)
 {
 	if (PagePrivate(page))
@@ -124,8 +128,7 @@ static int ceph_set_page_dirty(struct page *page)
 	 * PagePrivate so that we get invalidatepage callback.
 	 */
 	BUG_ON(PagePrivate(page));
-	page->private = (unsigned long)snapc;
-	SetPagePrivate(page);
+	attach_page_private(page, snapc);
 
 	ret = __set_page_dirty_nobuffers(page);
 	WARN_ON(!PageLocked(page));
@@ -144,19 +147,19 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
 {
 	struct inode *inode;
 	struct ceph_inode_info *ci;
-	struct ceph_snap_context *snapc = page_snap_context(page);
+	struct ceph_snap_context *snapc;
+
+	wait_on_page_fscache(page);
 
 	inode = page->mapping->host;
 	ci = ceph_inode(inode);
 
-	if (offset != 0 || length != PAGE_SIZE) {
+	if (offset != 0 || length != thp_size(page)) {
 		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
 		     inode, page, page->index, offset, length);
 		return;
 	}
 
-	ceph_invalidate_fscache_page(inode, page);
-
 	WARN_ON(!PageLocked(page));
 	if (!PagePrivate(page))
 		return;
@@ -164,333 +167,222 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
 	dout("%p invalidatepage %p idx %lu full dirty page\n",
 	     inode, page, page->index);
 
+	snapc = detach_page_private(page);
 	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
 	ceph_put_snap_context(snapc);
-	page->private = 0;
-	ClearPagePrivate(page);
 }
 
-static int ceph_releasepage(struct page *page, gfp_t g)
+static int ceph_releasepage(struct page *page, gfp_t gfp)
 {
 	dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
 	     page, page->index, PageDirty(page) ? "" : "not ");
 
-	/* Can we release the page from the cache? */
-	if (!ceph_release_fscache_page(page, g))
-		return 0;
-
+	if (PageFsCache(page)) {
+		if (!(gfp & __GFP_DIRECT_RECLAIM) || !(gfp & __GFP_FS))
+			return 0;
+		wait_on_page_fscache(page);
+	}
 	return !PagePrivate(page);
 }
 
-/* read a single page, without unlocking it. */
-static int ceph_do_readpage(struct file *filp, struct page *page)
+static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
 {
-	struct inode *inode = file_inode(filp);
+	struct inode *inode = rreq->mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_osd_client *osdc = &fsc->client->osdc;
-	struct ceph_osd_request *req;
-	struct ceph_vino vino = ceph_vino(inode);
-	int err = 0;
-	u64 off = page_offset(page);
-	u64 len = PAGE_SIZE;
-
-	if (off >= i_size_read(inode)) {
-		zero_user_segment(page, 0, PAGE_SIZE);
-		SetPageUptodate(page);
-		return 0;
-	}
-
-	if (ci->i_inline_version != CEPH_INLINE_NONE) {
-		/*
-		 * Uptodate inline data should have been added
-		 * into page cache while getting Fcr caps.
-		 */
-		if (off == 0)
-			return -EINVAL;
-		zero_user_segment(page, 0, PAGE_SIZE);
-		SetPageUptodate(page);
-		return 0;
-	}
-
-	err = ceph_readpage_from_fscache(inode, page);
-	if (err == 0)
-		return -EINPROGRESS;
-
-	dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
-	     vino.ino, vino.snap, filp, off, len, page, page->index);
-	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, 0, 1,
-				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL,
-				    ci->i_truncate_seq, ci->i_truncate_size,
-				    false);
-	if (IS_ERR(req))
-		return PTR_ERR(req);
+	struct ceph_file_layout *lo = &ci->i_layout;
+	u32 blockoff;
+	u64 blockno;
 
-	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
+	/* Expand the start downward */
+	blockno = div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
+	rreq->start = blockno * lo->stripe_unit;
+	rreq->len += blockoff;
 
-	err = ceph_osdc_start_request(osdc, req, false);
-	if (!err)
-		err = ceph_osdc_wait_request(osdc, req);
-
-	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
-				 req->r_end_latency, err);
-
-	ceph_osdc_put_request(req);
-	dout("readpage result %d\n", err);
-
-	if (err == -ENOENT)
-		err = 0;
-	if (err < 0) {
-		ceph_fscache_readpage_cancel(inode, page);
-		if (err == -EBLOCKLISTED)
-			fsc->blocklisted = true;
-		goto out;
-	}
-	if (err < PAGE_SIZE)
-		/* zero fill remainder of page */
-		zero_user_segment(page, err, PAGE_SIZE);
-	else
-		flush_dcache_page(page);
-
-	SetPageUptodate(page);
-	ceph_readpage_to_fscache(inode, page);
-
-out:
-	return err < 0 ? err : 0;
+	/* Now, round up the length to the next block */
+	rreq->len = roundup(rreq->len, lo->stripe_unit);
 }
 
-static int ceph_readpage(struct file *filp, struct page *page)
+static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq)
 {
-	int r = ceph_do_readpage(filp, page);
-	if (r != -EINPROGRESS)
-		unlock_page(page);
-	else
-		r = 0;
-	return r;
+	struct inode *inode = subreq->rreq->mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	u64 objno, objoff;
+	u32 xlen;
+
+	/* Truncate the extent at the end of the current block */
+	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
+				      &objno, &objoff, &xlen);
+	subreq->len = min(xlen, fsc->mount_options->rsize);
+	return true;
 }
 
-/*
- * Finish an async read(ahead) op.
- */
-static void finish_read(struct ceph_osd_request *req)
+static void finish_netfs_read(struct ceph_osd_request *req)
 {
-	struct inode *inode = req->r_inode;
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_osd_data *osd_data;
-	int rc = req->r_result <= 0 ? req->r_result : 0;
-	int bytes = req->r_result >= 0 ? req->r_result : 0;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(req->r_inode);
+	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+	struct netfs_read_subrequest *subreq = req->r_priv;
 	int num_pages;
-	int i;
+	int err = req->r_result;
 
-	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
-	if (rc == -EBLOCKLISTED)
-		ceph_inode_to_client(inode)->blocklisted = true;
+	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
+				 req->r_end_latency, err);
 
-	/* unlock all pages, zeroing any data we didn't read */
-	osd_data = osd_req_op_extent_osd_data(req, 0);
-	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
-	num_pages = calc_pages_for((u64)osd_data->alignment,
-					(u64)osd_data->length);
-	for (i = 0; i < num_pages; i++) {
-		struct page *page = osd_data->pages[i];
-
-		if (rc < 0 && rc != -ENOENT) {
-			ceph_fscache_readpage_cancel(inode, page);
-			goto unlock;
-		}
-		if (bytes < (int)PAGE_SIZE) {
-			/* zero (remainder of) page */
-			int s = bytes < 0 ? 0 : bytes;
-			zero_user_segment(page, s, PAGE_SIZE);
-		}
- 		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
-		     page->index);
-		flush_dcache_page(page);
-		SetPageUptodate(page);
-		ceph_readpage_to_fscache(inode, page);
-unlock:
-		unlock_page(page);
-		put_page(page);
-		bytes -= PAGE_SIZE;
-	}
+	dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
+	     subreq->len, i_size_read(req->r_inode));
 
-	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
-				 req->r_end_latency, rc);
+	/* no object means success but no data */
+	if (err == -ENOENT)
+		err = 0;
+	else if (err == -EBLOCKLISTED)
+		fsc->blocklisted = true;
+
+	if (err >= 0 && err < subreq->len)
+		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
+
+	netfs_subreq_terminated(subreq, err, true);
 
-	kfree(osd_data->pages);
+	num_pages = calc_pages_for(osd_data->alignment, osd_data->length);
+	ceph_put_page_vector(osd_data->pages, num_pages, false);
+	iput(req->r_inode);
 }
 
-/*
- * start an async read(ahead) operation.  return nr_pages we submitted
- * a read for on success, or negative error code.
- */
-static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
-		      struct list_head *page_list, int max)
+static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
 {
-	struct ceph_osd_client *osdc =
-		&ceph_inode_to_client(inode)->client->osdc;
+	struct netfs_read_request *rreq = subreq->rreq;
+	struct inode *inode = rreq->mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct page *page = lru_to_page(page_list);
-	struct ceph_vino vino;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_osd_request *req;
-	u64 off;
-	u64 len;
-	int i;
+	struct ceph_vino vino = ceph_vino(inode);
+	struct iov_iter iter;
 	struct page **pages;
-	pgoff_t next_index;
-	int nr_pages = 0;
-	int got = 0;
-	int ret = 0;
-
-	if (!rw_ctx) {
-		/* caller of readpages does not hold buffer and read caps
-		 * (fadvise, madvise and readahead cases) */
-		int want = CEPH_CAP_FILE_CACHE;
-		ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want,
-					true, &got);
-		if (ret < 0) {
-			dout("start_read %p, error getting cap\n", inode);
-		} else if (!(got & want)) {
-			dout("start_read %p, no cache cap\n", inode);
-			ret = 0;
-		}
-		if (ret <= 0) {
-			if (got)
-				ceph_put_cap_refs(ci, got);
-			while (!list_empty(page_list)) {
-				page = lru_to_page(page_list);
-				list_del(&page->lru);
-				put_page(page);
-			}
-			return ret;
-		}
-	}
-
-	off = (u64) page_offset(page);
+	size_t page_off;
+	int err = 0;
+	u64 len = subreq->len;
 
-	/* count pages */
-	next_index = page->index;
-	list_for_each_entry_reverse(page, page_list, lru) {
-		if (page->index != next_index)
-			break;
-		nr_pages++;
-		next_index++;
-		if (max && nr_pages == max)
-			break;
-	}
-	len = nr_pages << PAGE_SHIFT;
-	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
-	     off, len);
-	vino = ceph_vino(inode);
-	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
-				    0, 1, CEPH_OSD_OP_READ,
-				    CEPH_OSD_FLAG_READ, NULL,
-				    ci->i_truncate_seq, ci->i_truncate_size,
-				    false);
+	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
+			0, 1, CEPH_OSD_OP_READ,
+			CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
+			NULL, ci->i_truncate_seq, ci->i_truncate_size, false);
 	if (IS_ERR(req)) {
-		ret = PTR_ERR(req);
+		err = PTR_ERR(req);
+		req = NULL;
 		goto out;
 	}
 
-	/* build page vector */
-	nr_pages = calc_pages_for(0, len);
-	pages = kmalloc_array(nr_pages, sizeof(*pages), GFP_KERNEL);
-	if (!pages) {
-		ret = -ENOMEM;
-		goto out_put;
-	}
-	for (i = 0; i < nr_pages; ++i) {
-		page = list_entry(page_list->prev, struct page, lru);
-		BUG_ON(PageLocked(page));
-		list_del(&page->lru);
-
- 		dout("start_read %p adding %p idx %lu\n", inode, page,
-		     page->index);
-		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
-					  GFP_KERNEL)) {
-			ceph_fscache_uncache_page(inode, page);
-			put_page(page);
-			dout("start_read %p add_to_page_cache failed %p\n",
-			     inode, page);
-			nr_pages = i;
-			if (nr_pages > 0) {
-				len = nr_pages << PAGE_SHIFT;
-				osd_req_op_extent_update(req, 0, len);
-				break;
-			}
-			goto out_pages;
-		}
-		pages[i] = page;
+	dout("%s: pos=%llu orig_len=%zu len=%llu\n", __func__, subreq->start, subreq->len, len);
+	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
+	err = iov_iter_get_pages_alloc(&iter, &pages, len, &page_off);
+	if (err < 0) {
+		dout("%s: iov_ter_get_pages_alloc returned %d\n", __func__, err);
+		goto out;
 	}
+
+	/* should always give us a page-aligned read */
+	WARN_ON_ONCE(page_off);
+	len = err;
+
 	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
-	req->r_callback = finish_read;
+	req->r_callback = finish_netfs_read;
+	req->r_priv = subreq;
 	req->r_inode = inode;
+	ihold(inode);
 
-	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
-	ret = ceph_osdc_start_request(osdc, req, false);
-	if (ret < 0)
-		goto out_pages;
+	err = ceph_osdc_start_request(req->r_osdc, req, false);
+	if (err)
+		iput(inode);
+out:
 	ceph_osdc_put_request(req);
+	if (err)
+		netfs_subreq_terminated(subreq, err, false);
+	dout("%s: result %d\n", __func__, err);
+}
 
-	/* After adding locked pages to page cache, the inode holds cache cap.
-	 * So we can drop our cap refs. */
-	if (got)
-		ceph_put_cap_refs(ci, got);
+static void ceph_init_rreq(struct netfs_read_request *rreq, struct file *file)
+{
+}
 
-	return nr_pages;
+static void ceph_readahead_cleanup(struct address_space *mapping, void *priv)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int got = (uintptr_t)priv;
 
-out_pages:
-	for (i = 0; i < nr_pages; ++i) {
-		ceph_fscache_readpage_cancel(inode, pages[i]);
-		unlock_page(pages[i]);
-	}
-	ceph_put_page_vector(pages, nr_pages, false);
-out_put:
-	ceph_osdc_put_request(req);
-out:
 	if (got)
 		ceph_put_cap_refs(ci, got);
-	return ret;
 }
 
+const struct netfs_read_request_ops ceph_netfs_read_ops = {
+	.init_rreq		= ceph_init_rreq,
+	.is_cache_enabled	= ceph_is_cache_enabled,
+	.begin_cache_operation	= ceph_begin_cache_operation,
+	.issue_op		= ceph_netfs_issue_op,
+	.expand_readahead	= ceph_netfs_expand_readahead,
+	.clamp_length		= ceph_netfs_clamp_length,
+	.check_write_begin	= ceph_netfs_check_write_begin,
+	.cleanup		= ceph_readahead_cleanup,
+};
 
-/*
- * Read multiple pages.  Leave pages we don't read + unlock in page_list;
- * the caller (VM) cleans them up.
- */
-static int ceph_readpages(struct file *file, struct address_space *mapping,
-			  struct list_head *page_list, unsigned nr_pages)
+/* read a single page, without unlocking it. */
+static int ceph_readpage(struct file *file, struct page *page)
 {
 	struct inode *inode = file_inode(file);
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_file_info *fi = file->private_data;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_vino vino = ceph_vino(inode);
+	u64 off = page_offset(page);
+	u64 len = thp_size(page);
+
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
+		/*
+		 * Uptodate inline data should have been added
+		 * into page cache while getting Fcr caps.
+		 */
+		if (off == 0) {
+			unlock_page(page);
+			return -EINVAL;
+		}
+		zero_user_segment(page, 0, thp_size(page));
+		SetPageUptodate(page);
+		unlock_page(page);
+		return 0;
+	}
+
+	dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
+	     vino.ino, vino.snap, file, off, len, page, page->index);
+
+	return netfs_readpage(file, page, &ceph_netfs_read_ops, NULL);
+}
+
+static void ceph_readahead(struct readahead_control *ractl)
+{
+	struct inode *inode = file_inode(ractl->file);
+	struct ceph_file_info *fi = ractl->file->private_data;
 	struct ceph_rw_context *rw_ctx;
-	int rc = 0;
-	int max = 0;
+	int got = 0;
+	int ret = 0;
 
 	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
-		return -EINVAL;
+		return;
 
-	rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
-					 &nr_pages);
+	rw_ctx = ceph_find_rw_context(fi);
+	if (!rw_ctx) {
+		/*
+		 * readahead callers do not necessarily hold Fcb caps
+		 * (e.g. fadvise, madvise).
+		 */
+		int want = CEPH_CAP_FILE_CACHE;
 
-	if (rc == 0)
-		goto out;
+		ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
+		if (ret < 0)
+			dout("start_read %p, error getting cap\n", inode);
+		else if (!(got & want))
+			dout("start_read %p, no cache cap\n", inode);
 
-	rw_ctx = ceph_find_rw_context(fi);
-	max = fsc->mount_options->rsize >> PAGE_SHIFT;
-	dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
-	     inode, file, rw_ctx, nr_pages, max);
-	while (!list_empty(page_list)) {
-		rc = start_read(inode, rw_ctx, page_list, max);
-		if (rc < 0)
-			goto out;
+		if (ret <= 0)
+			return;
 	}
-out:
-	ceph_fscache_readpages_cancel(inode, page_list);
-
-	dout("readpages %p file %p ret %d\n", inode, file, rc);
-	return rc;
+	netfs_readahead(ractl, &ceph_netfs_read_ops, (void *)(uintptr_t)got);
 }
 
 struct ceph_writeback_ctl
@@ -585,8 +477,8 @@ static u64 get_writepages_data_length(struct inode *inode,
 		spin_unlock(&ci->i_ceph_lock);
 		WARN_ON(!found);
 	}
-	if (end > page_offset(page) + PAGE_SIZE)
-		end = page_offset(page) + PAGE_SIZE;
+	if (end > page_offset(page) + thp_size(page))
+		end = page_offset(page) + thp_size(page);
 	return end > start ? end - start : 0;
 }
 
@@ -604,7 +496,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	struct ceph_snap_context *snapc, *oldest;
 	loff_t page_off = page_offset(page);
 	int err;
-	loff_t len = PAGE_SIZE;
+	loff_t len = thp_size(page);
 	struct ceph_writeback_ctl ceph_wbc;
 	struct ceph_osd_client *osdc = &fsc->client->osdc;
 	struct ceph_osd_request *req;
@@ -632,7 +524,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	/* is this a partial page at end of file? */
 	if (page_off >= ceph_wbc.i_size) {
 		dout("%p page eof %llu\n", page, ceph_wbc.i_size);
-		page->mapping->a_ops->invalidatepage(page, 0, PAGE_SIZE);
+		page->mapping->a_ops->invalidatepage(page, 0, thp_size(page));
 		return 0;
 	}
 
@@ -658,7 +550,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	}
 
 	/* it may be a short write due to an object boundary */
-	WARN_ON_ONCE(len > PAGE_SIZE);
+	WARN_ON_ONCE(len > thp_size(page));
 	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
 	dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);
 
@@ -667,7 +559,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	if (!err)
 		err = ceph_osdc_wait_request(osdc, req);
 
-	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
 				  req->r_end_latency, err);
 
 	ceph_osdc_put_request(req);
@@ -695,8 +587,8 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 		dout("writepage cleaned page %p\n", page);
 		err = 0;  /* vfs expects us to return 0 */
 	}
-	page->private = 0;
-	ClearPagePrivate(page);
+	oldest = detach_page_private(page);
+	WARN_ON_ONCE(oldest != snapc);
 	end_page_writeback(page);
 	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
 	ceph_put_snap_context(snapc);  /* page's reference */
@@ -755,7 +647,7 @@ static void writepages_finish(struct ceph_osd_request *req)
 		ceph_clear_error_write(ci);
 	}
 
-	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
 				  req->r_end_latency, rc);
 
 	/*
@@ -788,11 +680,9 @@ static void writepages_finish(struct ceph_osd_request *req)
 				clear_bdi_congested(inode_to_bdi(inode),
 						    BLK_RW_ASYNC);
 
-			ceph_put_snap_context(page_snap_context(page));
-			page->private = 0;
-			ClearPagePrivate(page);
-			dout("unlocking %p\n", page);
+			ceph_put_snap_context(detach_page_private(page));
 			end_page_writeback(page);
+			dout("unlocking %p\n", page);
 
 			if (remove_page)
 				generic_error_remove_page(inode->i_mapping,
@@ -949,7 +839,7 @@ get_more_pages:
 				    page_offset(page) >= i_size_read(inode)) &&
 				    clear_page_dirty_for_io(page))
 					mapping->a_ops->invalidatepage(page,
-								0, PAGE_SIZE);
+								0, thp_size(page));
 				unlock_page(page);
 				continue;
 			}
@@ -1038,7 +928,7 @@ get_more_pages:
 			pages[locked_pages++] = page;
 			pvec.pages[i] = NULL;
 
-			len += PAGE_SIZE;
+			len += thp_size(page);
 		}
 
 		/* did we get anything? */
@@ -1087,7 +977,7 @@ new_request:
 			BUG_ON(IS_ERR(req));
 		}
 		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
-			     PAGE_SIZE - offset);
+			     thp_size(page) - offset);
 
 		req->r_callback = writepages_finish;
 		req->r_inode = inode;
@@ -1117,7 +1007,7 @@ new_request:
 			}
 
 			set_page_writeback(pages[i]);
-			len += PAGE_SIZE;
+			len += thp_size(page);
 		}
 
 		if (ceph_wbc.size_stable) {
@@ -1126,7 +1016,7 @@ new_request:
 			/* writepages_finish() clears writeback pages
 			 * according to the data length, so make sure
 			 * data length covers all locked pages */
-			u64 min_len = len + 1 - PAGE_SIZE;
+			u64 min_len = len + 1 - thp_size(page);
 			len = get_writepages_data_length(inode, pages[i - 1],
 							 offset);
 			len = max(len, min_len);
@@ -1302,6 +1192,31 @@ ceph_find_incompatible(struct page *page)
 	return NULL;
 }
 
+static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
+					struct page *page, void **_fsdata)
+{
+	struct inode *inode = file_inode(file);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_snap_context *snapc;
+
+	snapc = ceph_find_incompatible(page);
+	if (snapc) {
+		int r;
+
+		unlock_page(page);
+		put_page(page);
+		if (IS_ERR(snapc))
+			return PTR_ERR(snapc);
+
+		ceph_queue_writeback(inode);
+		r = wait_event_killable(ci->i_cap_wq,
+					context_is_writeable_or_written(inode, snapc));
+		ceph_put_snap_context(snapc);
+		return r == 0 ? -EAGAIN : r;
+	}
+	return 0;
+}
+
 /*
  * We are only allowed to write into/dirty the page if the page is
  * clean, or already dirty within the same snap context.
@@ -1312,75 +1227,47 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
 {
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_snap_context *snapc;
 	struct page *page = NULL;
 	pgoff_t index = pos >> PAGE_SHIFT;
-	int pos_in_page = pos & ~PAGE_MASK;
-	int r = 0;
+	int r;
 
-	dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len);
-
-	for (;;) {
+	/*
+	 * Uninlining should have already been done and everything updated, EXCEPT
+	 * for inline_version sent to the MDS.
+	 */
+	if (ci->i_inline_version != CEPH_INLINE_NONE) {
 		page = grab_cache_page_write_begin(mapping, index, flags);
-		if (!page) {
-			r = -ENOMEM;
-			break;
-		}
-
-		snapc = ceph_find_incompatible(page);
-		if (snapc) {
-			if (IS_ERR(snapc)) {
-				r = PTR_ERR(snapc);
-				break;
-			}
-			unlock_page(page);
-			put_page(page);
-			page = NULL;
-			ceph_queue_writeback(inode);
-			r = wait_event_killable(ci->i_cap_wq,
-						context_is_writeable_or_written(inode, snapc));
-			ceph_put_snap_context(snapc);
-			if (r != 0)
-				break;
-			continue;
-		}
-
-		if (PageUptodate(page)) {
-			dout(" page %p already uptodate\n", page);
-			break;
-		}
+		if (!page)
+			return -ENOMEM;
 
 		/*
-		 * In some cases we don't need to read at all:
-		 * - full page write
-		 * - write that lies completely beyond EOF
-		 * - write that covers the the page from start to EOF or beyond it
+		 * The inline_version on a new inode is set to 1. If that's the
+		 * case, then the page is brand new and isn't yet Uptodate.
 		 */
-		if ((pos_in_page == 0 && len == PAGE_SIZE) ||
-		    (pos >= i_size_read(inode)) ||
-		    (pos_in_page == 0 && (pos + len) >= i_size_read(inode))) {
-			zero_user_segments(page, 0, pos_in_page,
-					   pos_in_page + len, PAGE_SIZE);
-			break;
+		r = 0;
+		if (index == 0 && ci->i_inline_version != 1) {
+			if (!PageUptodate(page)) {
+				WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
+					  ci->i_inline_version);
+				r = -EINVAL;
+			}
+			goto out;
 		}
-
-		/*
-		 * We need to read it. If we get back -EINPROGRESS, then the page was
-		 * handed off to fscache and it will be unlocked when the read completes.
-		 * Refind the page in that case so we can reacquire the page lock. Otherwise
-		 * we got a hard error or the read was completed synchronously.
-		 */
-		r = ceph_do_readpage(file, page);
-		if (r != -EINPROGRESS)
-			break;
+		zero_user_segment(page, 0, thp_size(page));
+		SetPageUptodate(page);
+		goto out;
 	}
 
+	r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &page, NULL,
+			      &ceph_netfs_read_ops, NULL);
+out:
+	if (r == 0)
+		wait_on_page_fscache(page);
 	if (r < 0) {
-		if (page) {
-			unlock_page(page);
+		if (page)
 			put_page(page);
-		}
 	} else {
+		WARN_ON_ONCE(!PageLocked(page));
 		*pagep = page;
 	}
 	return r;
@@ -1438,7 +1325,7 @@ static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
 
 const struct address_space_operations ceph_aops = {
 	.readpage = ceph_readpage,
-	.readpages = ceph_readpages,
+	.readahead = ceph_readahead,
 	.writepage = ceph_writepage,
 	.writepages = ceph_writepages_start,
 	.write_begin = ceph_write_begin,
@@ -1470,7 +1357,6 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
 	struct inode *inode = file_inode(vma->vm_file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_file_info *fi = vma->vm_file->private_data;
-	struct page *pinned_page = NULL;
 	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
 	int want, got, err;
 	sigset_t oldset;
@@ -1478,21 +1364,20 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
 
 	ceph_block_sigs(&oldset);
 
-	dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
-	     inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE);
+	dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
+	     inode, ceph_vinop(inode), off);
 	if (fi->fmode & CEPH_FILE_MODE_LAZY)
 		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 	else
 		want = CEPH_CAP_FILE_CACHE;
 
 	got = 0;
-	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1,
-			    &got, &pinned_page);
+	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got);
 	if (err < 0)
 		goto out_restore;
 
-	dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
-	     inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
+	dout("filemap_fault %p %llu got cap refs on %s\n",
+	     inode, off, ceph_cap_string(got));
 
 	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
 	    ci->i_inline_version == CEPH_INLINE_NONE) {
@@ -1500,14 +1385,11 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
 		ceph_add_rw_context(fi, &rw_ctx);
 		ret = filemap_fault(vmf);
 		ceph_del_rw_context(fi, &rw_ctx);
-		dout("filemap_fault %p %llu~%zd drop cap refs %s ret %x\n",
-			inode, off, (size_t)PAGE_SIZE,
-				ceph_cap_string(got), ret);
+		dout("filemap_fault %p %llu drop cap refs %s ret %x\n",
+		     inode, off, ceph_cap_string(got), ret);
 	} else
 		err = -EAGAIN;
 
-	if (pinned_page)
-		put_page(pinned_page);
 	ceph_put_cap_refs(ci, got);
 
 	if (err != -EAGAIN)
@@ -1542,8 +1424,8 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
 		vmf->page = page;
 		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
 out_inline:
-		dout("filemap_fault %p %llu~%zd read inline data ret %x\n",
-		     inode, off, (size_t)PAGE_SIZE, ret);
+		dout("filemap_fault %p %llu read inline data ret %x\n",
+		     inode, off, ret);
 	}
 out_restore:
 	ceph_restore_sigs(&oldset);
@@ -1553,9 +1435,6 @@ out_restore:
 	return ret;
 }
 
-/*
- * Reuse write_begin here for simplicity.
- */
 static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 {
 	struct vm_area_struct *vma = vmf->vma;
@@ -1591,10 +1470,10 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 			goto out_free;
 	}
 
-	if (off + PAGE_SIZE <= size)
-		len = PAGE_SIZE;
+	if (off + thp_size(page) <= size)
+		len = thp_size(page);
 	else
-		len = size & ~PAGE_MASK;
+		len = offset_in_thp(page, size);
 
 	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
 	     inode, ceph_vinop(inode), off, len, size);
@@ -1604,8 +1483,7 @@ static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
 		want = CEPH_CAP_FILE_BUFFER;
 
 	got = 0;
-	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len,
-			    &got, NULL);
+	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
 	if (err < 0)
 		goto out_free;
 
@@ -1832,7 +1710,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 	if (!err)
 		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
-	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
 				  req->r_end_latency, err);
 
 out_put:
@@ -2057,6 +1935,10 @@ int ceph_pool_perm_check(struct inode *inode, int need)
 	s64 pool;
 	int ret, flags;
 
+	/* Only need to do this for regular files */
+	if (!S_ISREG(inode->i_mode))
+		return 0;
+
 	if (ci->i_vino.snap != CEPH_NOSNAP) {
 		/*
 		 * Pool permission check needs to write to the first object.
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 2f5cb6bc78e1..9cfadbb86568 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -173,7 +173,6 @@ void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
 
 	ci->fscache = NULL;
 
-	fscache_uncache_all_inode_pages(cookie, &ci->vfs_inode);
 	fscache_relinquish_cookie(cookie, &ci->i_vino, false);
 }
 
@@ -194,7 +193,6 @@ void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp)
 		dout("fscache_file_set_cookie %p %p disabling cache\n",
 		     inode, filp);
 		fscache_disable_cookie(ci->fscache, &ci->i_vino, false);
-		fscache_uncache_all_inode_pages(ci->fscache, inode);
 	} else {
 		fscache_enable_cookie(ci->fscache, &ci->i_vino, i_size_read(inode),
 				      ceph_fscache_can_enable, inode);
@@ -205,108 +203,6 @@ void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp)
 	}
 }
 
-static void ceph_readpage_from_fscache_complete(struct page *page, void *data, int error)
-{
-	if (!error)
-		SetPageUptodate(page);
-
-	unlock_page(page);
-}
-
-static inline bool cache_valid(struct ceph_inode_info *ci)
-{
-	return ci->i_fscache_gen == ci->i_rdcache_gen;
-}
-
-
-/* Atempt to read from the fscache,
- *
- * This function is called from the readpage_nounlock context. DO NOT attempt to
- * unlock the page here (or in the callback).
- */
-int ceph_readpage_from_fscache(struct inode *inode, struct page *page)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int ret;
-
-	if (!cache_valid(ci))
-		return -ENOBUFS;
-
-	ret = fscache_read_or_alloc_page(ci->fscache, page,
-					 ceph_readpage_from_fscache_complete, NULL,
-					 GFP_KERNEL);
-
-	switch (ret) {
-		case 0: /* Page found */
-			dout("page read submitted\n");
-			return 0;
-		case -ENOBUFS: /* Pages were not found, and can't be */
-		case -ENODATA: /* Pages were not found */
-			dout("page/inode not in cache\n");
-			return ret;
-		default:
-			dout("%s: unknown error ret = %i\n", __func__, ret);
-			return ret;
-	}
-}
-
-int ceph_readpages_from_fscache(struct inode *inode,
-				  struct address_space *mapping,
-				  struct list_head *pages,
-				  unsigned *nr_pages)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int ret;
-
-	if (!cache_valid(ci))
-		return -ENOBUFS;
-
-	ret = fscache_read_or_alloc_pages(ci->fscache, mapping, pages, nr_pages,
-					  ceph_readpage_from_fscache_complete,
-					  NULL, mapping_gfp_mask(mapping));
-
-	switch (ret) {
-		case 0: /* All pages found */
-			dout("all-page read submitted\n");
-			return 0;
-		case -ENOBUFS: /* Some pages were not found, and can't be */
-		case -ENODATA: /* some pages were not found */
-			dout("page/inode not in cache\n");
-			return ret;
-		default:
-			dout("%s: unknown error ret = %i\n", __func__, ret);
-			return ret;
-	}
-}
-
-void ceph_readpage_to_fscache(struct inode *inode, struct page *page)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int ret;
-
-	if (!PageFsCache(page))
-		return;
-
-	if (!cache_valid(ci))
-		return;
-
-	ret = fscache_write_page(ci->fscache, page, i_size_read(inode),
-				 GFP_KERNEL);
-	if (ret)
-		 fscache_uncache_page(ci->fscache, page);
-}
-
-void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-
-	if (!PageFsCache(page))
-		return;
-
-	fscache_wait_on_page_write(ci->fscache, page);
-	fscache_uncache_page(ci->fscache, page);
-}
-
 void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
 {
 	if (fscache_cookie_valid(fsc->fscache)) {
@@ -329,24 +225,3 @@ void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
 	}
 	fsc->fscache = NULL;
 }
-
-/*
- * caller should hold CEPH_CAP_FILE_{RD,CACHE}
- */
-void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci)
-{
-	if (cache_valid(ci))
-		return;
-
-	/* resue i_truncate_mutex. There should be no pending
-	 * truncate while the caller holds CEPH_CAP_FILE_RD */
-	mutex_lock(&ci->i_truncate_mutex);
-	if (!cache_valid(ci)) {
-		if (fscache_check_consistency(ci->fscache, &ci->i_vino))
-			fscache_invalidate(ci->fscache);
-		spin_lock(&ci->i_ceph_lock);
-		ci->i_fscache_gen = ci->i_rdcache_gen;
-		spin_unlock(&ci->i_ceph_lock);
-	}
-	mutex_unlock(&ci->i_truncate_mutex);
-}
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
index 89dbdd1eb14a..1409d6149281 100644
--- a/fs/ceph/cache.h
+++ b/fs/ceph/cache.h
@@ -9,6 +9,8 @@
 #ifndef _CEPH_CACHE_H
 #define _CEPH_CACHE_H
 
+#include <linux/netfs.h>
+
 #ifdef CONFIG_CEPH_FSCACHE
 
 extern struct fscache_netfs ceph_cache_netfs;
@@ -29,54 +31,37 @@ int ceph_readpages_from_fscache(struct inode *inode,
 				struct address_space *mapping,
 				struct list_head *pages,
 				unsigned *nr_pages);
-void ceph_readpage_to_fscache(struct inode *inode, struct page *page);
-void ceph_invalidate_fscache_page(struct inode* inode, struct page *page);
 
 static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
 {
 	ci->fscache = NULL;
-	ci->i_fscache_gen = 0;
 }
 
-static inline void ceph_fscache_invalidate(struct inode *inode)
+static inline struct fscache_cookie *ceph_fscache_cookie(struct ceph_inode_info *ci)
 {
-	fscache_invalidate(ceph_inode(inode)->fscache);
+	return ci->fscache;
 }
 
-static inline void ceph_fscache_uncache_page(struct inode *inode,
-					     struct page *page)
+static inline void ceph_fscache_invalidate(struct inode *inode)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	return fscache_uncache_page(ci->fscache, page);
+	fscache_invalidate(ceph_inode(inode)->fscache);
 }
 
-static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
+static inline bool ceph_is_cache_enabled(struct inode *inode)
 {
-	struct inode* inode = page->mapping->host;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	return fscache_maybe_release_page(ci->fscache, page, gfp);
-}
+	struct fscache_cookie *cookie = ceph_fscache_cookie(ceph_inode(inode));
 
-static inline void ceph_fscache_readpage_cancel(struct inode *inode,
-						struct page *page)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	if (fscache_cookie_valid(ci->fscache) && PageFsCache(page))
-		__fscache_uncache_page(ci->fscache, page);
+	if (!cookie)
+		return false;
+	return fscache_cookie_enabled(cookie);
 }
 
-static inline void ceph_fscache_readpages_cancel(struct inode *inode,
-						 struct list_head *pages)
+static inline int ceph_begin_cache_operation(struct netfs_read_request *rreq)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	return fscache_readpages_cancel(ci->fscache, pages);
-}
+	struct fscache_cookie *cookie = ceph_fscache_cookie(ceph_inode(rreq->inode));
 
-static inline void ceph_disable_fscache_readpage(struct ceph_inode_info *ci)
-{
-	ci->i_fscache_gen = ci->i_rdcache_gen - 1;
+	return fscache_begin_read_operation(rreq, cookie);
 }
-
 #else
 
 static inline int ceph_fscache_register(void)
@@ -102,6 +87,11 @@ static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
 {
 }
 
+static inline struct fscache_cookie *ceph_fscache_cookie(struct ceph_inode_info *ci)
+{
+	return NULL;
+}
+
 static inline void ceph_fscache_register_inode_cookie(struct inode *inode)
 {
 }
@@ -115,62 +105,19 @@ static inline void ceph_fscache_file_set_cookie(struct inode *inode,
 {
 }
 
-static inline void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci)
-{
-}
-
-static inline void ceph_fscache_uncache_page(struct inode *inode,
-					     struct page *pages)
-{
-}
-
-static inline int ceph_readpage_from_fscache(struct inode* inode,
-					     struct page *page)
-{
-	return -ENOBUFS;
-}
-
-static inline int ceph_readpages_from_fscache(struct inode *inode,
-					      struct address_space *mapping,
-					      struct list_head *pages,
-					      unsigned *nr_pages)
-{
-	return -ENOBUFS;
-}
-
-static inline void ceph_readpage_to_fscache(struct inode *inode,
-					    struct page *page)
-{
-}
-
 static inline void ceph_fscache_invalidate(struct inode *inode)
 {
 }
 
-static inline void ceph_invalidate_fscache_page(struct inode *inode,
-						struct page *page)
+static inline bool ceph_is_cache_enabled(struct inode *inode)
 {
+	return false;
 }
 
-static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
-{
-	return 1;
-}
-
-static inline void ceph_fscache_readpage_cancel(struct inode *inode,
-						struct page *page)
-{
-}
-
-static inline void ceph_fscache_readpages_cancel(struct inode *inode,
-						 struct list_head *pages)
-{
-}
-
-static inline void ceph_disable_fscache_readpage(struct ceph_inode_info *ci)
+static inline int ceph_begin_cache_operation(struct netfs_read_request *rreq)
 {
+	return -ENOBUFS;
 }
-
 #endif
 
-#endif
+#endif /* _CEPH_CACHE_H */
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3c03fa37cac4..a5e93b185515 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1390,7 +1390,7 @@ static void __prep_cap(struct cap_msg_args *arg, struct ceph_cap *cap,
 	arg->flush_tid = flush_tid;
 	arg->oldest_flush_tid = oldest_flush_tid;
 
-	arg->size = inode->i_size;
+	arg->size = i_size_read(inode);
 	ci->i_reported_size = arg->size;
 	arg->max_size = ci->i_wanted_max_size;
 	if (cap == ci->i_auth_cap) {
@@ -1867,6 +1867,7 @@ static int try_nonblocking_invalidate(struct inode *inode)
 	u32 invalidating_gen = ci->i_rdcache_gen;
 
 	spin_unlock(&ci->i_ceph_lock);
+	ceph_fscache_invalidate(inode);
 	invalidate_mapping_pages(&inode->i_data, 0, -1);
 	spin_lock(&ci->i_ceph_lock);
 
@@ -1884,7 +1885,7 @@ static int try_nonblocking_invalidate(struct inode *inode)
 
 bool __ceph_should_report_size(struct ceph_inode_info *ci)
 {
-	loff_t size = ci->vfs_inode.i_size;
+	loff_t size = i_size_read(&ci->vfs_inode);
 	/* mds will adjust max size according to the reported size */
 	if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
 		return false;
@@ -2730,10 +2731,6 @@ again:
 				*got = need | want;
 			else
 				*got = need;
-			if (S_ISREG(inode->i_mode) &&
-			    (need & CEPH_CAP_FILE_RD) &&
-			    !(*got & CEPH_CAP_FILE_CACHE))
-				ceph_disable_fscache_readpage(ci);
 			ceph_take_cap_refs(ci, *got, true);
 			ret = 1;
 		}
@@ -2858,8 +2855,7 @@ int ceph_try_get_caps(struct inode *inode, int need, int want,
  * due to a small max_size, make sure we check_max_size (and possibly
  * ask the mds) so we don't get hung up indefinitely.
  */
-int ceph_get_caps(struct file *filp, int need, int want,
-		  loff_t endoff, int *got, struct page **pinned_page)
+int ceph_get_caps(struct file *filp, int need, int want, loff_t endoff, int *got)
 {
 	struct ceph_file_info *fi = filp->private_data;
 	struct inode *inode = file_inode(filp);
@@ -2957,11 +2953,11 @@ int ceph_get_caps(struct file *filp, int need, int want,
 			struct page *page =
 				find_get_page(inode->i_mapping, 0);
 			if (page) {
-				if (PageUptodate(page)) {
-					*pinned_page = page;
-					break;
-				}
+				bool uptodate = PageUptodate(page);
+
 				put_page(page);
+				if (uptodate)
+					break;
 			}
 			/*
 			 * drop cap refs first because getattr while
@@ -2983,11 +2979,6 @@ int ceph_get_caps(struct file *filp, int need, int want,
 		}
 		break;
 	}
-
-	if (S_ISREG(ci->vfs_inode.i_mode) &&
-	    (_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
-		ceph_fscache_revalidate_cookie(ci);
-
 	*got = _got;
 	return 0;
 }
@@ -3308,7 +3299,7 @@ static void handle_cap_grant(struct inode *inode,
 	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
 	     inode, cap, session->s_mds, seq, ceph_cap_string(newcaps));
 	dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
-		inode->i_size);
+		i_size_read(inode));
 
 
 	/*
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 66989c880adb..425f3356332a 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -162,34 +162,34 @@ static int metric_show(struct seq_file *s, void *p)
 	seq_printf(s, "item          total       avg_lat(us)     min_lat(us)     max_lat(us)     stdev(us)\n");
 	seq_printf(s, "-----------------------------------------------------------------------------------\n");
 
-	spin_lock(&m->read_latency_lock);
+	spin_lock(&m->read_metric_lock);
 	total = m->total_reads;
 	sum = m->read_latency_sum;
 	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
 	min = m->read_latency_min;
 	max = m->read_latency_max;
 	sq = m->read_latency_sq_sum;
-	spin_unlock(&m->read_latency_lock);
+	spin_unlock(&m->read_metric_lock);
 	CEPH_METRIC_SHOW("read", total, avg, min, max, sq);
 
-	spin_lock(&m->write_latency_lock);
+	spin_lock(&m->write_metric_lock);
 	total = m->total_writes;
 	sum = m->write_latency_sum;
 	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
 	min = m->write_latency_min;
 	max = m->write_latency_max;
 	sq = m->write_latency_sq_sum;
-	spin_unlock(&m->write_latency_lock);
+	spin_unlock(&m->write_metric_lock);
 	CEPH_METRIC_SHOW("write", total, avg, min, max, sq);
 
-	spin_lock(&m->metadata_latency_lock);
+	spin_lock(&m->metadata_metric_lock);
 	total = m->total_metadatas;
 	sum = m->metadata_latency_sum;
 	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
 	min = m->metadata_latency_min;
 	max = m->metadata_latency_max;
 	sq = m->metadata_latency_sq_sum;
-	spin_unlock(&m->metadata_latency_lock);
+	spin_unlock(&m->metadata_metric_lock);
 	CEPH_METRIC_SHOW("metadata", total, avg, min, max, sq);
 
 	seq_printf(s, "\n");
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index f7a790ed62c4..5624fae7a603 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -631,10 +631,12 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 	switch (whence) {
 	case SEEK_CUR:
 		offset += file->f_pos;
+		break;
 	case SEEK_SET:
 		break;
 	case SEEK_END:
 		retval = -EOPNOTSUPP;
+		goto out;
 	default:
 		goto out;
 	}
@@ -665,8 +667,8 @@ out:
 /*
  * Handle lookups for the hidden .snap directory.
  */
-int ceph_handle_snapdir(struct ceph_mds_request *req,
-			struct dentry *dentry, int err)
+struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req,
+				   struct dentry *dentry, int err)
 {
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
 	struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */
@@ -674,18 +676,17 @@ int ceph_handle_snapdir(struct ceph_mds_request *req,
 	/* .snap dir? */
 	if (err == -ENOENT &&
 	    ceph_snap(parent) == CEPH_NOSNAP &&
-	    strcmp(dentry->d_name.name,
-		   fsc->mount_options->snapdir_name) == 0) {
+	    strcmp(dentry->d_name.name, fsc->mount_options->snapdir_name) == 0) {
+		struct dentry *res;
 		struct inode *inode = ceph_get_snapdir(parent);
-		if (IS_ERR(inode))
-			return PTR_ERR(inode);
-		dout("ENOENT on snapdir %p '%pd', linking to snapdir %p\n",
-		     dentry, dentry, inode);
-		BUG_ON(!d_unhashed(dentry));
-		d_add(dentry, inode);
-		err = 0;
+
+		res = d_splice_alias(inode, dentry);
+		dout("ENOENT on snapdir %p '%pd', linking to snapdir %p. Spliced dentry %p\n",
+		     dentry, dentry, inode, res);
+		if (res)
+			dentry = res;
 	}
-	return err;
+	return dentry;
 }
 
 /*
@@ -741,6 +742,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
 	struct ceph_mds_request *req;
+	struct dentry *res;
 	int op;
 	int mask;
 	int err;
@@ -791,7 +793,13 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 	req->r_parent = dir;
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
-	err = ceph_handle_snapdir(req, dentry, err);
+	res = ceph_handle_snapdir(req, dentry, err);
+	if (IS_ERR(res)) {
+		err = PTR_ERR(res);
+	} else {
+		dentry = res;
+		err = 0;
+	}
 	dentry = ceph_finish_lookup(req, dentry, err);
 	ceph_mdsc_put_request(req);  /* will dput(dentry) */
 	dout("lookup result=%p\n", dentry);
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index f22156ee7306..65540a4429b2 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -129,6 +129,10 @@ static struct inode *__lookup_inode(struct super_block *sb, u64 ino)
 
 	vino.ino = ino;
 	vino.snap = CEPH_NOSNAP;
+
+	if (ceph_vino_is_reserved(vino))
+		return ERR_PTR(-ESTALE);
+
 	inode = ceph_find_inode(sb, vino);
 	if (!inode) {
 		struct ceph_mds_request *req;
@@ -178,8 +182,10 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
 		return ERR_CAST(inode);
 	/* We need LINK caps to reliably check i_nlink */
 	err = ceph_do_getattr(inode, CEPH_CAP_LINK_SHARED, false);
-	if (err)
+	if (err) {
+		iput(inode);
 		return ERR_PTR(err);
+	}
 	/* -ESTALE if inode as been unlinked and no file is open */
 	if ((inode->i_nlink == 0) && (atomic_read(&inode->i_count) == 1)) {
 		iput(inode);
@@ -212,6 +218,10 @@ static struct dentry *__snapfh_to_dentry(struct super_block *sb,
 		vino.ino = sfh->ino;
 		vino.snap = sfh->snapid;
 	}
+
+	if (ceph_vino_is_reserved(vino))
+		return ERR_PTR(-ESTALE);
+
 	inode = ceph_find_inode(sb, vino);
 	if (inode)
 		return d_obtain_alias(inode);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 209535d5b8d3..77fc037d5beb 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -739,9 +739,12 @@ retry:
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
 				   req);
-	err = ceph_handle_snapdir(req, dentry, err);
-	if (err)
+	dentry = ceph_handle_snapdir(req, dentry, err);
+	if (IS_ERR(dentry)) {
+		err = PTR_ERR(dentry);
 		goto out_req;
+	}
+	err = 0;
 
 	if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
@@ -892,7 +895,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 		if (!ret)
 			ret = ceph_osdc_wait_request(osdc, req);
 
-		ceph_update_read_latency(&fsc->mdsc->metric,
+		ceph_update_read_metrics(&fsc->mdsc->metric,
 					 req->r_start_latency,
 					 req->r_end_latency,
 					 ret);
@@ -1034,16 +1037,6 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	dout("ceph_aio_complete_req %p rc %d bytes %u\n",
 	     inode, rc, osd_data->bvec_pos.iter.bi_size);
 
-	/* r_start_latency == 0 means the request was not submitted */
-	if (req->r_start_latency) {
-		if (aio_req->write)
-			ceph_update_write_latency(metric, req->r_start_latency,
-						  req->r_end_latency, rc);
-		else
-			ceph_update_read_latency(metric, req->r_start_latency,
-						 req->r_end_latency, rc);
-	}
-
 	if (rc == -EOLDSNAPC) {
 		struct ceph_aio_work *aio_work;
 		BUG_ON(!aio_req->write);
@@ -1086,6 +1079,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 		}
 	}
 
+	/* r_start_latency == 0 means the request was not submitted */
+	if (req->r_start_latency) {
+		if (aio_req->write)
+			ceph_update_write_metrics(metric, req->r_start_latency,
+						  req->r_end_latency, rc);
+		else
+			ceph_update_read_metrics(metric, req->r_start_latency,
+						 req->r_end_latency, rc);
+	}
+
 	put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
 		  aio_req->should_dirty);
 	ceph_osdc_put_request(req);
@@ -1290,10 +1293,10 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
 		if (write)
-			ceph_update_write_latency(metric, req->r_start_latency,
+			ceph_update_write_metrics(metric, req->r_start_latency,
 						  req->r_end_latency, ret);
 		else
-			ceph_update_read_latency(metric, req->r_start_latency,
+			ceph_update_read_metrics(metric, req->r_start_latency,
 						 req->r_end_latency, ret);
 
 		size = i_size_read(inode);
@@ -1467,7 +1470,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 		if (!ret)
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
-		ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+		ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
 					  req->r_end_latency, ret);
 out:
 		ceph_osdc_put_request(req);
@@ -1510,7 +1513,6 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
 	size_t len = iov_iter_count(to);
 	struct inode *inode = file_inode(filp);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct page *pinned_page = NULL;
 	bool direct_lock = iocb->ki_flags & IOCB_DIRECT;
 	ssize_t ret;
 	int want, got = 0;
@@ -1529,8 +1531,7 @@ again:
 		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 	else
 		want = CEPH_CAP_FILE_CACHE;
-	ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1,
-			    &got, &pinned_page);
+	ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1, &got);
 	if (ret < 0) {
 		if (iocb->ki_flags & IOCB_DIRECT)
 			ceph_end_io_direct(inode);
@@ -1571,10 +1572,6 @@ again:
 
 	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
 	     inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
-	if (pinned_page) {
-		put_page(pinned_page);
-		pinned_page = NULL;
-	}
 	ceph_put_cap_refs(ci, got);
 
 	if (direct_lock)
@@ -1753,8 +1750,7 @@ retry_snap:
 	else
 		want = CEPH_CAP_FILE_BUFFER;
 	got = 0;
-	err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count,
-			    &got, NULL);
+	err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count, &got);
 	if (err < 0)
 		goto out;
 
@@ -2083,7 +2079,7 @@ static long ceph_fallocate(struct file *file, int mode,
 	else
 		want = CEPH_CAP_FILE_BUFFER;
 
-	ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
+	ret = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, endoff, &got);
 	if (ret < 0)
 		goto unlock;
 
@@ -2121,7 +2117,7 @@ static int get_rd_wr_caps(struct file *src_filp, int *src_got,
 
 retry_caps:
 	ret = ceph_get_caps(dst_filp, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
-			    dst_endoff, dst_got, NULL);
+			    dst_endoff, dst_got);
 	if (ret < 0)
 		return ret;
 
@@ -2143,7 +2139,7 @@ retry_caps:
 			return ret;
 		}
 		ret = ceph_get_caps(src_filp, CEPH_CAP_FILE_RD,
-				    CEPH_CAP_FILE_SHARED, -1, src_got, NULL);
+				    CEPH_CAP_FILE_SHARED, -1, src_got);
 		if (ret < 0)
 			return ret;
 		/*... drop src_ci caps too, and retry */
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 689e3ffd29d7..e1c63adb196d 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -56,6 +56,9 @@ struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
 {
 	struct inode *inode;
 
+	if (ceph_vino_is_reserved(vino))
+		return ERR_PTR(-EREMOTEIO);
+
 	inode = iget5_locked(sb, (unsigned long)vino.ino, ceph_ino_compare,
 			     ceph_set_ino_cb, &vino);
 	if (!inode)
@@ -99,14 +102,15 @@ struct inode *ceph_get_snapdir(struct inode *parent)
 	inode->i_mtime = parent->i_mtime;
 	inode->i_ctime = parent->i_ctime;
 	inode->i_atime = parent->i_atime;
-	inode->i_op = &ceph_snapdir_iops;
-	inode->i_fop = &ceph_snapdir_fops;
-	ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
 	ci->i_rbytes = 0;
 	ci->i_btime = ceph_inode(parent)->i_btime;
 
-	if (inode->i_state & I_NEW)
+	if (inode->i_state & I_NEW) {
+		inode->i_op = &ceph_snapdir_iops;
+		inode->i_fop = &ceph_snapdir_fops;
+		ci->i_snap_caps = CEPH_CAP_PIN; /* so we can open */
 		unlock_new_inode(inode);
+	}
 
 	return inode;
 }
@@ -628,10 +632,11 @@ int ceph_fill_file_size(struct inode *inode, int issued,
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int queue_trunc = 0;
+	loff_t isize = i_size_read(inode);
 
 	if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
-	    (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
-		dout("size %lld -> %llu\n", inode->i_size, size);
+	    (truncate_seq == ci->i_truncate_seq && size > isize)) {
+		dout("size %lld -> %llu\n", isize, size);
 		if (size > 0 && S_ISDIR(inode->i_mode)) {
 			pr_err("fill_file_size non-zero size for directory\n");
 			size = 0;
@@ -925,6 +930,7 @@ int ceph_fill_inode(struct inode *inode, struct page *locked_page,
 			ci->i_rfiles = le64_to_cpu(info->rfiles);
 			ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
 			ci->i_dir_pin = iinfo->dir_pin;
+			ci->i_rsnaps = iinfo->rsnaps;
 			ceph_decode_timespec64(&ci->i_rctime, &info->rctime);
 		}
 	}
@@ -1818,7 +1824,7 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
 	bool ret;
 
 	spin_lock(&ci->i_ceph_lock);
-	dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
+	dout("set_size %p %llu -> %llu\n", inode, i_size_read(inode), size);
 	i_size_write(inode, size);
 	inode->i_blocks = calc_inode_blocks(size);
 
@@ -1894,6 +1900,7 @@ static void ceph_do_invalidate_pages(struct inode *inode)
 	orig_gen = ci->i_rdcache_gen;
 	spin_unlock(&ci->i_ceph_lock);
 
+	ceph_fscache_invalidate(inode);
 	if (invalidate_inode_pages2(inode->i_mapping) < 0) {
 		pr_err("invalidate_pages %p fails\n", inode);
 	}
@@ -2124,20 +2131,19 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
 		}
 	}
 	if (ia_valid & ATTR_SIZE) {
-		dout("setattr %p size %lld -> %lld\n", inode,
-		     inode->i_size, attr->ia_size);
-		if ((issued & CEPH_CAP_FILE_EXCL) &&
-		    attr->ia_size > inode->i_size) {
+		loff_t isize = i_size_read(inode);
+
+		dout("setattr %p size %lld -> %lld\n", inode, isize, attr->ia_size);
+		if ((issued & CEPH_CAP_FILE_EXCL) && attr->ia_size > isize) {
 			i_size_write(inode, attr->ia_size);
 			inode->i_blocks = calc_inode_blocks(attr->ia_size);
 			ci->i_reported_size = attr->ia_size;
 			dirtied |= CEPH_CAP_FILE_EXCL;
 			ia_valid |= ATTR_MTIME;
 		} else if ((issued & CEPH_CAP_FILE_SHARED) == 0 ||
-			   attr->ia_size != inode->i_size) {
+			   attr->ia_size != isize) {
 			req->r_args.setattr.size = cpu_to_le64(attr->ia_size);
-			req->r_args.setattr.old_size =
-				cpu_to_le64(inode->i_size);
+			req->r_args.setattr.old_size = cpu_to_le64(isize);
 			mask |= CEPH_SETATTR_SIZE;
 			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
 				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
@@ -2247,7 +2253,7 @@ int ceph_setattr(struct user_namespace *mnt_userns, struct dentry *dentry,
 		return err;
 
 	if ((attr->ia_valid & ATTR_SIZE) &&
-	    attr->ia_size > max(inode->i_size, fsc->max_file_size))
+	    attr->ia_size > max(i_size_read(inode), fsc->max_file_size))
 		return -EFBIG;
 
 	if ((attr->ia_valid & ATTR_SIZE) &&
diff --git a/fs/ceph/io.c b/fs/ceph/io.c
index 97602ea92ff4..c456509b31c3 100644
--- a/fs/ceph/io.c
+++ b/fs/ceph/io.c
@@ -118,7 +118,7 @@ static void ceph_block_buffered(struct ceph_inode_info *ci, struct inode *inode)
 }
 
 /**
- * ceph_end_io_direct - declare the file is being used for direct i/o
+ * ceph_start_io_direct - declare the file is being used for direct i/o
  * @inode: file inode
  *
  * Declare that a direct I/O operation is about to start, and ensure
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index d87bd852ed96..e5af591d3bd4 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -176,6 +176,13 @@ static int parse_reply_info_in(void **p, void *end,
 			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
 		}
 
+		/* snapshot count, remains zero for v<=3 */
+		if (struct_v >= 4) {
+			ceph_decode_64_safe(p, end, info->rsnaps, bad);
+		} else {
+			info->rsnaps = 0;
+		}
+
 		*p = end;
 	} else {
 		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
@@ -214,7 +221,7 @@ static int parse_reply_info_in(void **p, void *end,
 		}
 
 		info->dir_pin = -ENODATA;
-		/* info->snap_btime remains zero */
+		/* info->snap_btime and info->rsnaps remain zero */
 	}
 	return 0;
 bad:
@@ -433,6 +440,13 @@ static int ceph_parse_deleg_inos(void **p, void *end,
 
 		ceph_decode_64_safe(p, end, start, bad);
 		ceph_decode_64_safe(p, end, len, bad);
+
+		/* Don't accept a delegation of system inodes */
+		if (start < CEPH_INO_SYSTEM_BASE) {
+			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
+					start, len);
+			continue;
+		}
 		while (len--) {
 			int err = xa_insert(&s->s_delegated_inos, ino = start++,
 					    DELEGATED_INO_AVAILABLE,
@@ -3306,7 +3320,7 @@ out_err:
 	/* kick calling process */
 	complete_request(mdsc, req);
 
-	ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
+	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
 				     req->r_end_latency, err);
 out:
 	ceph_mdsc_put_request(req);
@@ -3780,7 +3794,7 @@ static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
 		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
 		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
 		rec.v1.issued = cpu_to_le32(cap->issued);
-		rec.v1.size = cpu_to_le64(inode->i_size);
+		rec.v1.size = cpu_to_le64(i_size_read(inode));
 		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
 		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
 		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index eaa7c5422116..15c11a0f2caf 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -88,6 +88,7 @@ struct ceph_mds_reply_info_in {
 	s32 dir_pin;
 	struct ceph_timespec btime;
 	struct ceph_timespec snap_btime;
+	u64 rsnaps;
 	u64 change_attr;
 };
 
diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c
index 5ec94bd4c1de..28b6b42ad677 100644
--- a/fs/ceph/metric.c
+++ b/fs/ceph/metric.c
@@ -17,6 +17,9 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 	struct ceph_metric_write_latency *write;
 	struct ceph_metric_metadata_latency *meta;
 	struct ceph_metric_dlease *dlease;
+	struct ceph_opened_files *files;
+	struct ceph_pinned_icaps *icaps;
+	struct ceph_opened_inodes *inodes;
 	struct ceph_client_metric *m = &mdsc->metric;
 	u64 nr_caps = atomic64_read(&m->total_caps);
 	struct ceph_msg *msg;
@@ -26,7 +29,8 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 	s32 len;
 
 	len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write)
-	      + sizeof(*meta) + sizeof(*dlease);
+	      + sizeof(*meta) + sizeof(*dlease) + sizeof(*files)
+	      + sizeof(*icaps) + sizeof(*inodes);
 
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true);
 	if (!msg) {
@@ -95,6 +99,38 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
 	dlease->total = cpu_to_le64(atomic64_read(&m->total_dentries));
 	items++;
 
+	sum = percpu_counter_sum(&m->total_inodes);
+
+	/* encode the opened files metric */
+	files = (struct ceph_opened_files *)(dlease + 1);
+	files->type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_FILES);
+	files->ver = 1;
+	files->compat = 1;
+	files->data_len = cpu_to_le32(sizeof(*files) - 10);
+	files->opened_files = cpu_to_le64(atomic64_read(&m->opened_files));
+	files->total = cpu_to_le64(sum);
+	items++;
+
+	/* encode the pinned icaps metric */
+	icaps = (struct ceph_pinned_icaps *)(files + 1);
+	icaps->type = cpu_to_le32(CLIENT_METRIC_TYPE_PINNED_ICAPS);
+	icaps->ver = 1;
+	icaps->compat = 1;
+	icaps->data_len = cpu_to_le32(sizeof(*icaps) - 10);
+	icaps->pinned_icaps = cpu_to_le64(nr_caps);
+	icaps->total = cpu_to_le64(sum);
+	items++;
+
+	/* encode the opened inodes metric */
+	inodes = (struct ceph_opened_inodes *)(icaps + 1);
+	inodes->type = cpu_to_le32(CLIENT_METRIC_TYPE_OPENED_INODES);
+	inodes->ver = 1;
+	inodes->compat = 1;
+	inodes->data_len = cpu_to_le32(sizeof(*inodes) - 10);
+	inodes->opened_inodes = cpu_to_le64(percpu_counter_sum(&m->opened_inodes));
+	inodes->total = cpu_to_le64(sum);
+	items++;
+
 	put_unaligned_le32(items, &head->num);
 	msg->front.iov_len = len;
 	msg->hdr.version = cpu_to_le16(1);
@@ -183,21 +219,21 @@ int ceph_metric_init(struct ceph_client_metric *m)
 	if (ret)
 		goto err_i_caps_mis;
 
-	spin_lock_init(&m->read_latency_lock);
+	spin_lock_init(&m->read_metric_lock);
 	m->read_latency_sq_sum = 0;
 	m->read_latency_min = KTIME_MAX;
 	m->read_latency_max = 0;
 	m->total_reads = 0;
 	m->read_latency_sum = 0;
 
-	spin_lock_init(&m->write_latency_lock);
+	spin_lock_init(&m->write_metric_lock);
 	m->write_latency_sq_sum = 0;
 	m->write_latency_min = KTIME_MAX;
 	m->write_latency_max = 0;
 	m->total_writes = 0;
 	m->write_latency_sum = 0;
 
-	spin_lock_init(&m->metadata_latency_lock);
+	spin_lock_init(&m->metadata_metric_lock);
 	m->metadata_latency_sq_sum = 0;
 	m->metadata_latency_min = KTIME_MAX;
 	m->metadata_latency_max = 0;
@@ -274,7 +310,7 @@ static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
 	*sq_sump += sq;
 }
 
-void ceph_update_read_latency(struct ceph_client_metric *m,
+void ceph_update_read_metrics(struct ceph_client_metric *m,
 			      ktime_t r_start, ktime_t r_end,
 			      int rc)
 {
@@ -283,14 +319,14 @@ void ceph_update_read_latency(struct ceph_client_metric *m,
 	if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
 		return;
 
-	spin_lock(&m->read_latency_lock);
+	spin_lock(&m->read_metric_lock);
 	__update_latency(&m->total_reads, &m->read_latency_sum,
 			 &m->read_latency_min, &m->read_latency_max,
 			 &m->read_latency_sq_sum, lat);
-	spin_unlock(&m->read_latency_lock);
+	spin_unlock(&m->read_metric_lock);
 }
 
-void ceph_update_write_latency(struct ceph_client_metric *m,
+void ceph_update_write_metrics(struct ceph_client_metric *m,
 			       ktime_t r_start, ktime_t r_end,
 			       int rc)
 {
@@ -299,14 +335,14 @@ void ceph_update_write_latency(struct ceph_client_metric *m,
 	if (unlikely(rc && rc != -ETIMEDOUT))
 		return;
 
-	spin_lock(&m->write_latency_lock);
+	spin_lock(&m->write_metric_lock);
 	__update_latency(&m->total_writes, &m->write_latency_sum,
 			 &m->write_latency_min, &m->write_latency_max,
 			 &m->write_latency_sq_sum, lat);
-	spin_unlock(&m->write_latency_lock);
+	spin_unlock(&m->write_metric_lock);
 }
 
-void ceph_update_metadata_latency(struct ceph_client_metric *m,
+void ceph_update_metadata_metrics(struct ceph_client_metric *m,
 				  ktime_t r_start, ktime_t r_end,
 				  int rc)
 {
@@ -315,9 +351,9 @@ void ceph_update_metadata_latency(struct ceph_client_metric *m,
 	if (unlikely(rc && rc != -ENOENT))
 		return;
 
-	spin_lock(&m->metadata_latency_lock);
+	spin_lock(&m->metadata_metric_lock);
 	__update_latency(&m->total_metadatas, &m->metadata_latency_sum,
 			 &m->metadata_latency_min, &m->metadata_latency_max,
 			 &m->metadata_latency_sq_sum, lat);
-	spin_unlock(&m->metadata_latency_lock);
+	spin_unlock(&m->metadata_metric_lock);
 }
diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h
index af6038ff39d4..e984eb2bb14b 100644
--- a/fs/ceph/metric.h
+++ b/fs/ceph/metric.h
@@ -14,8 +14,11 @@ enum ceph_metric_type {
 	CLIENT_METRIC_TYPE_WRITE_LATENCY,
 	CLIENT_METRIC_TYPE_METADATA_LATENCY,
 	CLIENT_METRIC_TYPE_DENTRY_LEASE,
+	CLIENT_METRIC_TYPE_OPENED_FILES,
+	CLIENT_METRIC_TYPE_PINNED_ICAPS,
+	CLIENT_METRIC_TYPE_OPENED_INODES,
 
-	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_DENTRY_LEASE,
+	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_OPENED_INODES,
 };
 
 /*
@@ -28,6 +31,9 @@ enum ceph_metric_type {
 	CLIENT_METRIC_TYPE_WRITE_LATENCY,	\
 	CLIENT_METRIC_TYPE_METADATA_LATENCY,	\
 	CLIENT_METRIC_TYPE_DENTRY_LEASE,	\
+	CLIENT_METRIC_TYPE_OPENED_FILES,	\
+	CLIENT_METRIC_TYPE_PINNED_ICAPS,	\
+	CLIENT_METRIC_TYPE_OPENED_INODES,	\
 						\
 	CLIENT_METRIC_TYPE_MAX,			\
 }
@@ -94,6 +100,42 @@ struct ceph_metric_dlease {
 	__le64 total;
 } __packed;
 
+/* metric opened files header */
+struct ceph_opened_files {
+	__le32 type;     /* ceph metric type */
+
+	__u8  ver;
+	__u8  compat;
+
+	__le32 data_len; /* length of sizeof(opened_files + total) */
+	__le64 opened_files;
+	__le64 total;
+} __packed;
+
+/* metric pinned i_caps header */
+struct ceph_pinned_icaps {
+	__le32 type;     /* ceph metric type */
+
+	__u8  ver;
+	__u8  compat;
+
+	__le32 data_len; /* length of sizeof(pinned_icaps + total) */
+	__le64 pinned_icaps;
+	__le64 total;
+} __packed;
+
+/* metric opened inodes header */
+struct ceph_opened_inodes {
+	__le32 type;     /* ceph metric type */
+
+	__u8  ver;
+	__u8  compat;
+
+	__le32 data_len; /* length of sizeof(opened_inodes + total) */
+	__le64 opened_inodes;
+	__le64 total;
+} __packed;
+
 struct ceph_metric_head {
 	__le32 num;	/* the number of metrics that will be sent */
 } __packed;
@@ -108,21 +150,21 @@ struct ceph_client_metric {
 	struct percpu_counter i_caps_hit;
 	struct percpu_counter i_caps_mis;
 
-	spinlock_t read_latency_lock;
+	spinlock_t read_metric_lock;
 	u64 total_reads;
 	ktime_t read_latency_sum;
 	ktime_t read_latency_sq_sum;
 	ktime_t read_latency_min;
 	ktime_t read_latency_max;
 
-	spinlock_t write_latency_lock;
+	spinlock_t write_metric_lock;
 	u64 total_writes;
 	ktime_t write_latency_sum;
 	ktime_t write_latency_sq_sum;
 	ktime_t write_latency_min;
 	ktime_t write_latency_max;
 
-	spinlock_t metadata_latency_lock;
+	spinlock_t metadata_metric_lock;
 	u64 total_metadatas;
 	ktime_t metadata_latency_sum;
 	ktime_t metadata_latency_sq_sum;
@@ -162,13 +204,13 @@ static inline void ceph_update_cap_mis(struct ceph_client_metric *m)
 	percpu_counter_inc(&m->i_caps_mis);
 }
 
-extern void ceph_update_read_latency(struct ceph_client_metric *m,
+extern void ceph_update_read_metrics(struct ceph_client_metric *m,
 				     ktime_t r_start, ktime_t r_end,
 				     int rc);
-extern void ceph_update_write_latency(struct ceph_client_metric *m,
+extern void ceph_update_write_metrics(struct ceph_client_metric *m,
 				      ktime_t r_start, ktime_t r_end,
 				      int rc);
-extern void ceph_update_metadata_latency(struct ceph_client_metric *m,
+extern void ceph_update_metadata_metrics(struct ceph_client_metric *m,
 				         ktime_t r_start, ktime_t r_end,
 					 int rc);
 #endif /* _FS_CEPH_MDS_METRIC_H */
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 0728b01d4d43..4ce18055d931 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -605,7 +605,7 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
 	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 
 	BUG_ON(capsnap->writing);
-	capsnap->size = inode->i_size;
+	capsnap->size = i_size_read(inode);
 	capsnap->mtime = inode->i_mtime;
 	capsnap->atime = inode->i_atime;
 	capsnap->ctime = inode->i_ctime;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index c48bb30c8d70..db80d89556b1 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -21,6 +21,7 @@
 #include <linux/ceph/libceph.h>
 
 #ifdef CONFIG_CEPH_FSCACHE
+#define FSCACHE_USE_NEW_IO_API
 #include <linux/fscache.h>
 #endif
 
@@ -333,7 +334,7 @@ struct ceph_inode_info {
 
 	/* for dirs */
 	struct timespec64 i_rctime;
-	u64 i_rbytes, i_rfiles, i_rsubdirs;
+	u64 i_rbytes, i_rfiles, i_rsubdirs, i_rsnaps;
 	u64 i_files, i_subdirs;
 
 	/* quotas */
@@ -427,7 +428,6 @@ struct ceph_inode_info {
 
 #ifdef CONFIG_CEPH_FSCACHE
 	struct fscache_cookie *fscache;
-	u32 i_fscache_gen;
 #endif
 	errseq_t i_meta_err;
 
@@ -529,10 +529,34 @@ static inline int ceph_ino_compare(struct inode *inode, void *data)
 		ci->i_vino.snap == pvino->snap;
 }
 
+/*
+ * The MDS reserves a set of inodes for its own usage. These should never
+ * be accessible by clients, and so the MDS has no reason to ever hand these
+ * out. The range is CEPH_MDS_INO_MDSDIR_OFFSET..CEPH_INO_SYSTEM_BASE.
+ *
+ * These come from src/mds/mdstypes.h in the ceph sources.
+ */
+#define CEPH_MAX_MDS		0x100
+#define CEPH_NUM_STRAY		10
+#define CEPH_MDS_INO_MDSDIR_OFFSET	(1 * CEPH_MAX_MDS)
+#define CEPH_INO_SYSTEM_BASE		((6*CEPH_MAX_MDS) + (CEPH_MAX_MDS * CEPH_NUM_STRAY))
+
+static inline bool ceph_vino_is_reserved(const struct ceph_vino vino)
+{
+	if (vino.ino < CEPH_INO_SYSTEM_BASE &&
+	    vino.ino >= CEPH_MDS_INO_MDSDIR_OFFSET) {
+		WARN_RATELIMIT(1, "Attempt to access reserved inode number 0x%llx", vino.ino);
+		return true;
+	}
+	return false;
+}
 
 static inline struct inode *ceph_find_inode(struct super_block *sb,
 					    struct ceph_vino vino)
 {
+	if (ceph_vino_is_reserved(vino))
+		return NULL;
+
 	/*
 	 * NB: The hashval will be run through the fs/inode.c hash function
 	 * anyway, so there is no need to squash the inode number down to
@@ -1156,7 +1180,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
 				      int mds, int drop, int unless);
 
 extern int ceph_get_caps(struct file *filp, int need, int want,
-			 loff_t endoff, int *got, struct page **pinned_page);
+			 loff_t endoff, int *got);
 extern int ceph_try_get_caps(struct inode *inode,
 			     int need, int want, bool nonblock, int *got);
 
@@ -1193,7 +1217,7 @@ extern const struct dentry_operations ceph_dentry_ops;
 
 extern loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order);
 extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
-extern int ceph_handle_snapdir(struct ceph_mds_request *req,
+extern struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req,
 			       struct dentry *dentry, int err);
 extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
 					 struct dentry *dentry, int err);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 02f59bcb4f27..1242db8d3444 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -233,6 +233,12 @@ static ssize_t ceph_vxattrcb_dir_rsubdirs(struct ceph_inode_info *ci, char *val,
 	return ceph_fmt_xattr(val, size, "%lld", ci->i_rsubdirs);
 }
 
+static ssize_t ceph_vxattrcb_dir_rsnaps(struct ceph_inode_info *ci, char *val,
+					  size_t size)
+{
+	return ceph_fmt_xattr(val, size, "%lld", ci->i_rsnaps);
+}
+
 static ssize_t ceph_vxattrcb_dir_rbytes(struct ceph_inode_info *ci, char *val,
 					size_t size)
 {
@@ -384,6 +390,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
 	XATTR_RSTAT_FIELD(dir, rentries),
 	XATTR_RSTAT_FIELD(dir, rfiles),
 	XATTR_RSTAT_FIELD(dir, rsubdirs),
+	XATTR_RSTAT_FIELD(dir, rsnaps),
 	XATTR_RSTAT_FIELD(dir, rbytes),
 	XATTR_RSTAT_FIELD(dir, rctime),
 	{