summary refs log tree commit diff
path: root/fs/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2015-07-02 11:35:00 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2015-07-02 11:35:00 -0700
commit0c76c6ba246043bbc5c0f9620a0645ae78217421 (patch)
tree644a4db58706c4e97478951f0a3a0087ddf26e5e /fs/ceph
parent8688d9540cc6e17df4cba71615e27f04e0378fe6 (diff)
parent5a60e87603c4c533492c515b7f62578189b03c9c (diff)
downloadlinux-0c76c6ba246043bbc5c0f9620a0645ae78217421.tar.gz
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "We have a pile of bug fixes from Ilya, including a few patches that
  sync up the CRUSH code with the latest from userspace.

  There is also a long series from Zheng that fixes various issues with
  snapshots, inline data, and directory fsync, some simplification and
  improvement in the cap release code, and a rework of the caching of
  directory contents.

  To top it off there are a few small fixes and cleanups from Benoit and
  Hong"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
  rbd: use GFP_NOIO in rbd_obj_request_create()
  crush: fix a bug in tree bucket decode
  libceph: Fix ceph_tcp_sendpage()'s more boolean usage
  libceph: Remove spurious kunmap() of the zero page
  rbd: queue_depth map option
  rbd: store rbd_options in rbd_device
  rbd: terminate rbd_opts_tokens with Opt_err
  ceph: fix ceph_writepages_start()
  rbd: bump queue_max_segments
  ceph: rework dcache readdir
  crush: sync up with userspace
  crush: fix crash from invalid 'take' argument
  ceph: switch some GFP_NOFS memory allocation to GFP_KERNEL
  ceph: pre-allocate data structure that tracks caps flushing
  ceph: re-send flushing caps (which are revoked) in reconnect stage
  ceph: send TID of the oldest pending caps flush to MDS
  ceph: track pending caps flushing globally
  ceph: track pending caps flushing accurately
  libceph: fix wrong name "Ceph filesystem for Linux"
  ceph: fix directory fsync
  ...
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/acl.c4
-rw-r--r--fs/ceph/addr.c308
-rw-r--r--fs/ceph/caps.c836
-rw-r--r--fs/ceph/dir.c383
-rw-r--r--fs/ceph/file.c61
-rw-r--r--fs/ceph/inode.c155
-rw-r--r--fs/ceph/mds_client.c425
-rw-r--r--fs/ceph/mds_client.h23
-rw-r--r--fs/ceph/snap.c173
-rw-r--r--fs/ceph/super.c25
-rw-r--r--fs/ceph/super.h125
-rw-r--r--fs/ceph/xattr.c65
12 files changed, 1689 insertions, 894 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 64fa248343f6..8f84646f10e9 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -187,10 +187,10 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
 		val_size2 = posix_acl_xattr_size(default_acl->a_count);
 
 	err = -ENOMEM;
-	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
+	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL);
 	if (!tmp_buf)
 		goto out_err;
-	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
+	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL);
 	if (!pagelist)
 		goto out_err;
 	ceph_pagelist_init(pagelist);
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index e162bcd105ee..890c50971a69 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page)
 	inode = mapping->host;
 	ci = ceph_inode(inode);
 
-	/*
-	 * Note that we're grabbing a snapc ref here without holding
-	 * any locks!
-	 */
-	snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
-
 	/* dirty the head */
 	spin_lock(&ci->i_ceph_lock);
-	if (ci->i_head_snapc == NULL)
-		ci->i_head_snapc = ceph_get_snap_context(snapc);
-	++ci->i_wrbuffer_ref_head;
+	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
+	if (__ceph_have_pending_cap_snap(ci)) {
+		struct ceph_cap_snap *capsnap =
+				list_last_entry(&ci->i_cap_snaps,
+						struct ceph_cap_snap,
+						ci_item);
+		snapc = ceph_get_snap_context(capsnap->context);
+		capsnap->dirty_pages++;
+	} else {
+		BUG_ON(!ci->i_head_snapc);
+		snapc = ceph_get_snap_context(ci->i_head_snapc);
+		++ci->i_wrbuffer_ref_head;
+	}
 	if (ci->i_wrbuffer_ref == 0)
 		ihold(inode);
 	++ci->i_wrbuffer_ref;
@@ -346,7 +350,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 
 	/* build page vector */
 	nr_pages = calc_pages_for(0, len);
-	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
+	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
 	ret = -ENOMEM;
 	if (!pages)
 		goto out;
@@ -358,7 +362,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
  		dout("start_read %p adding %p idx %lu\n", inode, page,
 		     page->index);
 		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
-					  GFP_NOFS)) {
+					  GFP_KERNEL)) {
 			ceph_fscache_uncache_page(inode, page);
 			page_cache_release(page);
 			dout("start_read %p add_to_page_cache failed %p\n",
@@ -436,7 +440,7 @@ out:
  * only snap context we are allowed to write back.
  */
 static struct ceph_snap_context *get_oldest_context(struct inode *inode,
-						    u64 *snap_size)
+						    loff_t *snap_size)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_snap_context *snapc = NULL;
@@ -476,8 +480,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	struct ceph_osd_client *osdc;
 	struct ceph_snap_context *snapc, *oldest;
 	loff_t page_off = page_offset(page);
+	loff_t snap_size = -1;
 	long writeback_stat;
-	u64 truncate_size, snap_size = 0;
+	u64 truncate_size;
 	u32 truncate_seq;
 	int err = 0, len = PAGE_CACHE_SIZE;
 
@@ -512,7 +517,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	spin_lock(&ci->i_ceph_lock);
 	truncate_seq = ci->i_truncate_seq;
 	truncate_size = ci->i_truncate_size;
-	if (!snap_size)
+	if (snap_size == -1)
 		snap_size = i_size_read(inode);
 	spin_unlock(&ci->i_ceph_lock);
 
@@ -695,7 +700,8 @@ static int ceph_writepages_start(struct address_space *mapping,
 	unsigned wsize = 1 << inode->i_blkbits;
 	struct ceph_osd_request *req = NULL;
 	int do_sync = 0;
-	u64 truncate_size, snap_size;
+	loff_t snap_size, i_size;
+	u64 truncate_size;
 	u32 truncate_seq;
 
 	/*
@@ -741,7 +747,7 @@ static int ceph_writepages_start(struct address_space *mapping,
 retry:
 	/* find oldest snap context with dirty data */
 	ceph_put_snap_context(snapc);
-	snap_size = 0;
+	snap_size = -1;
 	snapc = get_oldest_context(inode, &snap_size);
 	if (!snapc) {
 		/* hmm, why does writepages get called when there
@@ -749,16 +755,13 @@ retry:
 		dout(" no snap context with dirty data?\n");
 		goto out;
 	}
-	if (snap_size == 0)
-		snap_size = i_size_read(inode);
 	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
 	     snapc, snapc->seq, snapc->num_snaps);
 
 	spin_lock(&ci->i_ceph_lock);
 	truncate_seq = ci->i_truncate_seq;
 	truncate_size = ci->i_truncate_size;
-	if (!snap_size)
-		snap_size = i_size_read(inode);
+	i_size = i_size_read(inode);
 	spin_unlock(&ci->i_ceph_lock);
 
 	if (last_snapc && snapc != last_snapc) {
@@ -828,8 +831,10 @@ get_more_pages:
 				dout("waiting on writeback %p\n", page);
 				wait_on_page_writeback(page);
 			}
-			if (page_offset(page) >= snap_size) {
-				dout("%p page eof %llu\n", page, snap_size);
+			if (page_offset(page) >=
+			    (snap_size == -1 ? i_size : snap_size)) {
+				dout("%p page eof %llu\n", page,
+				     (snap_size == -1 ? i_size : snap_size));
 				done = 1;
 				unlock_page(page);
 				break;
@@ -884,7 +889,8 @@ get_more_pages:
 				}
 
 				if (do_sync)
-					osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+					osd_req_op_init(req, 1,
+							CEPH_OSD_OP_STARTSYNC, 0);
 
 				req->r_callback = writepages_finish;
 				req->r_inode = inode;
@@ -944,10 +950,18 @@ get_more_pages:
 		}
 
 		/* Format the osd request message and submit the write */
-
 		offset = page_offset(pages[0]);
-		len = min(snap_size - offset,
-			  (u64)locked_pages << PAGE_CACHE_SHIFT);
+		len = (u64)locked_pages << PAGE_CACHE_SHIFT;
+		if (snap_size == -1) {
+			len = min(len, (u64)i_size_read(inode) - offset);
+			 /* writepages_finish() clears writeback pages
+			  * according to the data length, so make sure
+			  * data length covers all locked pages */
+			len = max(len, 1 +
+				((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
+		} else {
+			len = min(len, snap_size - offset);
+		}
 		dout("writepages got %d pages at %llu~%llu\n",
 		     locked_pages, offset, len);
 
@@ -1032,7 +1046,6 @@ static int ceph_update_writeable_page(struct file *file,
 {
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	loff_t page_off = pos & PAGE_CACHE_MASK;
 	int pos_in_page = pos & ~PAGE_CACHE_MASK;
 	int end_in_page = pos_in_page + len;
@@ -1044,10 +1057,6 @@ retry_locked:
 	/* writepages currently holds page lock, but if we change that later, */
 	wait_on_page_writeback(page);
 
-	/* check snap context */
-	BUG_ON(!ci->i_snap_realm);
-	down_read(&mdsc->snap_rwsem);
-	BUG_ON(!ci->i_snap_realm->cached_context);
 	snapc = page_snap_context(page);
 	if (snapc && snapc != ci->i_head_snapc) {
 		/*
@@ -1055,7 +1064,6 @@ retry_locked:
 		 * context!  is it writeable now?
 		 */
 		oldest = get_oldest_context(inode, NULL);
-		up_read(&mdsc->snap_rwsem);
 
 		if (snapc->seq > oldest->seq) {
 			ceph_put_snap_context(oldest);
@@ -1112,7 +1120,6 @@ retry_locked:
 	}
 
 	/* we need to read it. */
-	up_read(&mdsc->snap_rwsem);
 	r = readpage_nounlock(file, page);
 	if (r < 0)
 		goto fail_nosnap;
@@ -1157,16 +1164,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
 
 /*
  * we don't do anything in here that simple_write_end doesn't do
- * except adjust dirty page accounting and drop read lock on
- * mdsc->snap_rwsem.
+ * except adjust dirty page accounting
  */
 static int ceph_write_end(struct file *file, struct address_space *mapping,
 			  loff_t pos, unsigned len, unsigned copied,
 			  struct page *page, void *fsdata)
 {
 	struct inode *inode = file_inode(file);
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_mds_client *mdsc = fsc->mdsc;
 	unsigned from = pos & (PAGE_CACHE_SIZE - 1);
 	int check_cap = 0;
 
@@ -1188,7 +1192,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
 	set_page_dirty(page);
 
 	unlock_page(page);
-	up_read(&mdsc->snap_rwsem);
 	page_cache_release(page);
 
 	if (check_cap)
@@ -1314,13 +1317,17 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 	struct inode *inode = file_inode(vma->vm_file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_file_info *fi = vma->vm_file->private_data;
-	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+	struct ceph_cap_flush *prealloc_cf;
 	struct page *page = vmf->page;
 	loff_t off = page_offset(page);
 	loff_t size = i_size_read(inode);
 	size_t len;
 	int want, got, ret;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return VM_FAULT_SIGBUS;
+
 	if (ci->i_inline_version != CEPH_INLINE_NONE) {
 		struct page *locked_page = NULL;
 		if (off == 0) {
@@ -1330,8 +1337,10 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 		ret = ceph_uninline_data(vma->vm_file, locked_page);
 		if (locked_page)
 			unlock_page(locked_page);
-		if (ret < 0)
-			return VM_FAULT_SIGBUS;
+		if (ret < 0) {
+			ret = VM_FAULT_SIGBUS;
+			goto out_free;
+		}
 	}
 
 	if (off + PAGE_CACHE_SIZE <= size)
@@ -1353,7 +1362,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 			break;
 		if (ret != -ERESTARTSYS) {
 			WARN_ON(1);
-			return VM_FAULT_SIGBUS;
+			ret = VM_FAULT_SIGBUS;
+			goto out_free;
 		}
 	}
 	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
@@ -1373,7 +1383,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 	if (ret == 0) {
 		/* success.  we'll keep the page locked. */
 		set_page_dirty(page);
-		up_read(&mdsc->snap_rwsem);
 		ret = VM_FAULT_LOCKED;
 	} else {
 		if (ret == -ENOMEM)
@@ -1389,7 +1398,8 @@ out:
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
@@ -1398,6 +1408,8 @@ out:
 	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
 	     inode, off, len, ceph_cap_string(got), ret);
 	ceph_put_cap_refs(ci, got);
+out_free:
+	ceph_free_cap_flush(prealloc_cf);
 
 	return ret;
 }
@@ -1509,8 +1521,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 				    ceph_vino(inode), 0, &len, 0, 1,
 				    CEPH_OSD_OP_CREATE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-				    ci->i_snap_realm->cached_context,
-				    0, 0, false);
+				    ceph_empty_snapc, 0, 0, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
 		goto out;
@@ -1528,7 +1539,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 				    ceph_vino(inode), 0, &len, 1, 3,
 				    CEPH_OSD_OP_WRITE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-				    ci->i_snap_realm->cached_context,
+				    ceph_empty_snapc,
 				    ci->i_truncate_seq, ci->i_truncate_size,
 				    false);
 	if (IS_ERR(req)) {
@@ -1597,3 +1608,206 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma)
 	vma->vm_ops = &ceph_vmops;
 	return 0;
 }
+
+enum {
+	POOL_READ	= 1,
+	POOL_WRITE	= 2,
+};
+
+static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
+{
+	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
+	struct ceph_mds_client *mdsc = fsc->mdsc;
+	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
+	struct rb_node **p, *parent;
+	struct ceph_pool_perm *perm;
+	struct page **pages;
+	int err = 0, err2 = 0, have = 0;
+
+	down_read(&mdsc->pool_perm_rwsem);
+	p = &mdsc->pool_perm_tree.rb_node;
+	while (*p) {
+		perm = rb_entry(*p, struct ceph_pool_perm, node);
+		if (pool < perm->pool)
+			p = &(*p)->rb_left;
+		else if (pool > perm->pool)
+			p = &(*p)->rb_right;
+		else {
+			have = perm->perm;
+			break;
+		}
+	}
+	up_read(&mdsc->pool_perm_rwsem);
+	if (*p)
+		goto out;
+
+	dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);
+
+	down_write(&mdsc->pool_perm_rwsem);
+	parent = NULL;
+	while (*p) {
+		parent = *p;
+		perm = rb_entry(parent, struct ceph_pool_perm, node);
+		if (pool < perm->pool)
+			p = &(*p)->rb_left;
+		else if (pool > perm->pool)
+			p = &(*p)->rb_right;
+		else {
+			have = perm->perm;
+			break;
+		}
+	}
+	if (*p) {
+		up_write(&mdsc->pool_perm_rwsem);
+		goto out;
+	}
+
+	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
+					 ceph_empty_snapc,
+					 1, false, GFP_NOFS);
+	if (!rd_req) {
+		err = -ENOMEM;
+		goto out_unlock;
+	}
+
+	rd_req->r_flags = CEPH_OSD_FLAG_READ;
+	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
+	rd_req->r_base_oloc.pool = pool;
+	snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name),
+		 "%llx.00000000", ci->i_vino.ino);
+	rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
+
+	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
+					 ceph_empty_snapc,
+					 1, false, GFP_NOFS);
+	if (!wr_req) {
+		err = -ENOMEM;
+		goto out_unlock;
+	}
+
+	wr_req->r_flags = CEPH_OSD_FLAG_WRITE |
+			  CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
+	wr_req->r_base_oloc.pool = pool;
+	wr_req->r_base_oid = rd_req->r_base_oid;
+
+	/* one page should be large enough for STAT data */
+	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
+	if (IS_ERR(pages)) {
+		err = PTR_ERR(pages);
+		goto out_unlock;
+	}
+
+	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
+				     0, false, true);
+	ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP,
+				&ci->vfs_inode.i_mtime);
+	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
+
+	ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP,
+				&ci->vfs_inode.i_mtime);
+	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
+
+	if (!err)
+		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
+	if (!err2)
+		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);
+
+	if (err >= 0 || err == -ENOENT)
+		have |= POOL_READ;
+	else if (err != -EPERM)
+		goto out_unlock;
+
+	if (err2 == 0 || err2 == -EEXIST)
+		have |= POOL_WRITE;
+	else if (err2 != -EPERM) {
+		err = err2;
+		goto out_unlock;
+	}
+
+	perm = kmalloc(sizeof(*perm), GFP_NOFS);
+	if (!perm) {
+		err = -ENOMEM;
+		goto out_unlock;
+	}
+
+	perm->pool = pool;
+	perm->perm = have;
+	rb_link_node(&perm->node, parent, p);
+	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
+	err = 0;
+out_unlock:
+	up_write(&mdsc->pool_perm_rwsem);
+
+	if (rd_req)
+		ceph_osdc_put_request(rd_req);
+	if (wr_req)
+		ceph_osdc_put_request(wr_req);
+out:
+	if (!err)
+		err = have;
+	dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
+	return err;
+}
+
+int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
+{
+	u32 pool;
+	int ret, flags;
+
+	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
+				NOPOOLPERM))
+		return 0;
+
+	spin_lock(&ci->i_ceph_lock);
+	flags = ci->i_ceph_flags;
+	pool = ceph_file_layout_pg_pool(ci->i_layout);
+	spin_unlock(&ci->i_ceph_lock);
+check:
+	if (flags & CEPH_I_POOL_PERM) {
+		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
+			dout("ceph_pool_perm_check pool %u no read perm\n",
+			     pool);
+			return -EPERM;
+		}
+		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
+			dout("ceph_pool_perm_check pool %u no write perm\n",
+			     pool);
+			return -EPERM;
+		}
+		return 0;
+	}
+
+	ret = __ceph_pool_perm_get(ci, pool);
+	if (ret < 0)
+		return ret;
+
+	flags = CEPH_I_POOL_PERM;
+	if (ret & POOL_READ)
+		flags |= CEPH_I_POOL_RD;
+	if (ret & POOL_WRITE)
+		flags |= CEPH_I_POOL_WR;
+
+	spin_lock(&ci->i_ceph_lock);
+	if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
+		ci->i_ceph_flags = flags;
+        } else {
+		pool = ceph_file_layout_pg_pool(ci->i_layout);
+		flags = ci->i_ceph_flags;
+	}
+	spin_unlock(&ci->i_ceph_lock);
+	goto check;
+}
+
+void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
+{
+	struct ceph_pool_perm *perm;
+	struct rb_node *n;
+
+	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
+		n = rb_first(&mdsc->pool_perm_tree);
+		perm = rb_entry(n, struct ceph_pool_perm, node);
+		rb_erase(n, &mdsc->pool_perm_tree);
+		kfree(perm);
+	}
+}
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index be5ea6af8366..dc10c9dd36c1 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
 		used |= CEPH_CAP_PIN;
 	if (ci->i_rd_ref)
 		used |= CEPH_CAP_FILE_RD;
-	if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
+	if (ci->i_rdcache_ref ||
+	    (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
+	     ci->vfs_inode.i_data.nrpages))
 		used |= CEPH_CAP_FILE_CACHE;
 	if (ci->i_wr_ref)
 		used |= CEPH_CAP_FILE_WR;
@@ -926,16 +928,6 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 
 	/* remove from session list */
 	spin_lock(&session->s_cap_lock);
-	/*
-	 * s_cap_reconnect is protected by s_cap_lock. no one changes
-	 * s_cap_gen while session is in the reconnect state.
-	 */
-	if (queue_release &&
-	    (!session->s_cap_reconnect ||
-	     cap->cap_gen == session->s_cap_gen))
-		__queue_cap_release(session, ci->i_vino.ino, cap->cap_id,
-				    cap->mseq, cap->issue_seq);
-
 	if (session->s_cap_iterator == cap) {
 		/* not yet, we are iterating over this very cap */
 		dout("__ceph_remove_cap  delaying %p removal from session %p\n",
@@ -948,6 +940,25 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 	}
 	/* protect backpointer with s_cap_lock: see iterate_session_caps */
 	cap->ci = NULL;
+
+	/*
+	 * s_cap_reconnect is protected by s_cap_lock. no one changes
+	 * s_cap_gen while session is in the reconnect state.
+	 */
+	if (queue_release &&
+	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
+		cap->queue_release = 1;
+		if (removed) {
+			list_add_tail(&cap->session_caps,
+				      &session->s_cap_releases);
+			session->s_num_cap_releases++;
+			removed = 0;
+		}
+	} else {
+		cap->queue_release = 0;
+	}
+	cap->cap_ino = ci->i_vino.ino;
+
 	spin_unlock(&session->s_cap_lock);
 
 	/* remove from inode list */
@@ -977,8 +988,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 static int send_cap_msg(struct ceph_mds_session *session,
 			u64 ino, u64 cid, int op,
 			int caps, int wanted, int dirty,
-			u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq,
-			u64 size, u64 max_size,
+			u32 seq, u64 flush_tid, u64 oldest_flush_tid,
+			u32 issue_seq, u32 mseq, u64 size, u64 max_size,
 			struct timespec *mtime, struct timespec *atime,
 			u64 time_warp_seq,
 			kuid_t uid, kgid_t gid, umode_t mode,
@@ -992,20 +1003,23 @@ static int send_cap_msg(struct ceph_mds_session *session,
 	size_t extra_len;
 
 	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
-	     " seq %u/%u mseq %u follows %lld size %llu/%llu"
+	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
 	     " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
 	     cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
 	     ceph_cap_string(dirty),
-	     seq, issue_seq, mseq, follows, size, max_size,
+	     seq, issue_seq, flush_tid, oldest_flush_tid,
+	     mseq, follows, size, max_size,
 	     xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-	/* flock buffer size + inline version + inline data size */
-	extra_len = 4 + 8 + 4;
+	/* flock buffer size + inline version + inline data size +
+	 * osd_epoch_barrier + oldest_flush_tid */
+	extra_len = 4 + 8 + 4 + 4 + 8;
 	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
 			   GFP_NOFS, false);
 	if (!msg)
 		return -ENOMEM;
 
+	msg->hdr.version = cpu_to_le16(6);
 	msg->hdr.tid = cpu_to_le64(flush_tid);
 
 	fc = msg->front.iov_base;
@@ -1041,6 +1055,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
 	ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
 	/* inline data size */
 	ceph_encode_32(&p, 0);
+	/* osd_epoch_barrier */
+	ceph_encode_32(&p, 0);
+	/* oldest_flush_tid */
+	ceph_encode_64(&p, oldest_flush_tid);
 
 	fc->xattr_version = cpu_to_le64(xattr_version);
 	if (xattrs_buf) {
@@ -1053,44 +1071,6 @@ static int send_cap_msg(struct ceph_mds_session *session,
 	return 0;
 }
 
-void __queue_cap_release(struct ceph_mds_session *session,
-			 u64 ino, u64 cap_id, u32 migrate_seq,
-			 u32 issue_seq)
-{
-	struct ceph_msg *msg;
-	struct ceph_mds_cap_release *head;
-	struct ceph_mds_cap_item *item;
-
-	BUG_ON(!session->s_num_cap_releases);
-	msg = list_first_entry(&session->s_cap_releases,
-			       struct ceph_msg, list_head);
-
-	dout(" adding %llx release to mds%d msg %p (%d left)\n",
-	     ino, session->s_mds, msg, session->s_num_cap_releases);
-
-	BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE);
-	head = msg->front.iov_base;
-	le32_add_cpu(&head->num, 1);
-	item = msg->front.iov_base + msg->front.iov_len;
-	item->ino = cpu_to_le64(ino);
-	item->cap_id = cpu_to_le64(cap_id);
-	item->migrate_seq = cpu_to_le32(migrate_seq);
-	item->seq = cpu_to_le32(issue_seq);
-
-	session->s_num_cap_releases--;
-
-	msg->front.iov_len += sizeof(*item);
-	if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
-		dout(" release msg %p full\n", msg);
-		list_move_tail(&msg->list_head, &session->s_cap_releases_done);
-	} else {
-		dout(" release msg %p at %d/%d (%d)\n", msg,
-		     (int)le32_to_cpu(head->num),
-		     (int)CEPH_CAPS_PER_RELEASE,
-		     (int)msg->front.iov_len);
-	}
-}
-
 /*
  * Queue cap releases when an inode is dropped from our cache.  Since
  * inode is about to be destroyed, there is no need for i_ceph_lock.
@@ -1127,7 +1107,7 @@ void ceph_queue_caps_release(struct inode *inode)
  */
 static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 		      int op, int used, int want, int retain, int flushing,
-		      unsigned *pflush_tid)
+		      u64 flush_tid, u64 oldest_flush_tid)
 	__releases(cap->ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = cap->ci;
@@ -1145,8 +1125,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	u64 xattr_version = 0;
 	struct ceph_buffer *xattr_blob = NULL;
 	int delayed = 0;
-	u64 flush_tid = 0;
-	int i;
 	int ret;
 	bool inline_data;
 
@@ -1190,26 +1168,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	cap->implemented &= cap->issued | used;
 	cap->mds_wanted = want;
 
-	if (flushing) {
-		/*
-		 * assign a tid for flush operations so we can avoid
-		 * flush1 -> dirty1 -> flush2 -> flushack1 -> mark
-		 * clean type races.  track latest tid for every bit
-		 * so we can handle flush AxFw, flush Fw, and have the
-		 * first ack clean Ax.
-		 */
-		flush_tid = ++ci->i_cap_flush_last_tid;
-		if (pflush_tid)
-			*pflush_tid = flush_tid;
-		dout(" cap_flush_tid %d\n", (int)flush_tid);
-		for (i = 0; i < CEPH_CAP_BITS; i++)
-			if (flushing & (1 << i))
-				ci->i_cap_flush_tid[i] = flush_tid;
-
-		follows = ci->i_head_snapc->seq;
-	} else {
-		follows = 0;
-	}
+	follows = flushing ? ci->i_head_snapc->seq : 0;
 
 	keep = cap->implemented;
 	seq = cap->seq;
@@ -1237,7 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	spin_unlock(&ci->i_ceph_lock);
 
 	ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
-		op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
+		op, keep, want, flushing, seq,
+		flush_tid, oldest_flush_tid, issue_seq, mseq,
 		size, max_size, &mtime, &atime, time_warp_seq,
 		uid, gid, mode, xattr_version, xattr_blob,
 		follows, inline_data);
@@ -1259,14 +1219,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
  * asynchronously back to the MDS once sync writes complete and dirty
  * data is written out.
  *
- * Unless @again is true, skip cap_snaps that were already sent to
+ * Unless @kick is true, skip cap_snaps that were already sent to
  * the MDS (i.e., during this session).
  *
  * Called under i_ceph_lock.  Takes s_mutex as needed.
  */
 void __ceph_flush_snaps(struct ceph_inode_info *ci,
 			struct ceph_mds_session **psession,
-			int again)
+			int kick)
 		__releases(ci->i_ceph_lock)
 		__acquires(ci->i_ceph_lock)
 {
@@ -1297,11 +1257,8 @@ retry:
 		if (capsnap->dirty_pages || capsnap->writing)
 			break;
 
-		/*
-		 * if cap writeback already occurred, we should have dropped
-		 * the capsnap in ceph_put_wrbuffer_cap_refs.
-		 */
-		BUG_ON(capsnap->dirty == 0);
+		/* should be removed by ceph_try_drop_cap_snap() */
+		BUG_ON(!capsnap->need_flush);
 
 		/* pick mds, take s_mutex */
 		if (ci->i_auth_cap == NULL) {
@@ -1310,7 +1267,7 @@ retry:
 		}
 
 		/* only flush each capsnap once */
-		if (!again && !list_empty(&capsnap->flushing_item)) {
+		if (!kick && !list_empty(&capsnap->flushing_item)) {
 			dout("already flushed %p, skipping\n", capsnap);
 			continue;
 		}
@@ -1320,6 +1277,9 @@ retry:
 
 		if (session && session->s_mds != mds) {
 			dout("oops, wrong session %p mutex\n", session);
+			if (kick)
+				goto out;
+
 			mutex_unlock(&session->s_mutex);
 			ceph_put_mds_session(session);
 			session = NULL;
@@ -1343,20 +1303,22 @@ retry:
 			goto retry;
 		}
 
-		capsnap->flush_tid = ++ci->i_cap_flush_last_tid;
+		spin_lock(&mdsc->cap_dirty_lock);
+		capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
+		spin_unlock(&mdsc->cap_dirty_lock);
+
 		atomic_inc(&capsnap->nref);
-		if (!list_empty(&capsnap->flushing_item))
-			list_del_init(&capsnap->flushing_item);
-		list_add_tail(&capsnap->flushing_item,
-			      &session->s_cap_snaps_flushing);
+		if (list_empty(&capsnap->flushing_item))
+			list_add_tail(&capsnap->flushing_item,
+				      &session->s_cap_snaps_flushing);
 		spin_unlock(&ci->i_ceph_lock);
 
 		dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
 		     inode, capsnap, capsnap->follows, capsnap->flush_tid);
 		send_cap_msg(session, ceph_vino(inode).ino, 0,
 			     CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
-			     capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
-			     capsnap->size, 0,
+			     capsnap->dirty, 0, capsnap->flush_tid, 0,
+			     0, mseq, capsnap->size, 0,
 			     &capsnap->mtime, &capsnap->atime,
 			     capsnap->time_warp_seq,
 			     capsnap->uid, capsnap->gid, capsnap->mode,
@@ -1396,7 +1358,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
  * Caller is then responsible for calling __mark_inode_dirty with the
  * returned flags value.
  */
-int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
+int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
+			   struct ceph_cap_flush **pcf)
 {
 	struct ceph_mds_client *mdsc =
 		ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
@@ -1416,9 +1379,14 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 	     ceph_cap_string(was | mask));
 	ci->i_dirty_caps |= mask;
 	if (was == 0) {
-		if (!ci->i_head_snapc)
+		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
+		swap(ci->i_prealloc_cap_flush, *pcf);
+
+		if (!ci->i_head_snapc) {
+			WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
 			ci->i_head_snapc = ceph_get_snap_context(
 				ci->i_snap_realm->cached_context);
+		}
 		dout(" inode %p now dirty snapc %p auth cap %p\n",
 		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
 		BUG_ON(!list_empty(&ci->i_dirty_item));
@@ -1429,6 +1397,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 			ihold(inode);
 			dirty |= I_DIRTY_SYNC;
 		}
+	} else {
+		WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
 	}
 	BUG_ON(list_empty(&ci->i_dirty_item));
 	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
@@ -1438,6 +1408,74 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
 	return dirty;
 }
 
+static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
+					struct ceph_cap_flush *cf)
+{
+	struct rb_node **p = &ci->i_cap_flush_tree.rb_node;
+	struct rb_node *parent = NULL;
+	struct ceph_cap_flush *other = NULL;
+
+	while (*p) {
+		parent = *p;
+		other = rb_entry(parent, struct ceph_cap_flush, i_node);
+
+		if (cf->tid < other->tid)
+			p = &(*p)->rb_left;
+		else if (cf->tid > other->tid)
+			p = &(*p)->rb_right;
+		else
+			BUG();
+	}
+
+	rb_link_node(&cf->i_node, parent, p);
+	rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);
+}
+
+static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
+				       struct ceph_cap_flush *cf)
+{
+	struct rb_node **p = &mdsc->cap_flush_tree.rb_node;
+	struct rb_node *parent = NULL;
+	struct ceph_cap_flush *other = NULL;
+
+	while (*p) {
+		parent = *p;
+		other = rb_entry(parent, struct ceph_cap_flush, g_node);
+
+		if (cf->tid < other->tid)
+			p = &(*p)->rb_left;
+		else if (cf->tid > other->tid)
+			p = &(*p)->rb_right;
+		else
+			BUG();
+	}
+
+	rb_link_node(&cf->g_node, parent, p);
+	rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree);
+}
+
+struct ceph_cap_flush *ceph_alloc_cap_flush(void)
+{
+	return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+}
+
+void ceph_free_cap_flush(struct ceph_cap_flush *cf)
+{
+	if (cf)
+		kmem_cache_free(ceph_cap_flush_cachep, cf);
+}
+
+static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
+{
+	struct rb_node *n = rb_first(&mdsc->cap_flush_tree);
+	if (n) {
+		struct ceph_cap_flush *cf =
+			rb_entry(n, struct ceph_cap_flush, g_node);
+		return cf->tid;
+	}
+	return 0;
+}
+
 /*
  * Add dirty inode to the flushing list.  Assigned a seq number so we
  * can wait for caps to flush without starving.
@@ -1445,14 +1483,17 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
  * Called under i_ceph_lock.
  */
 static int __mark_caps_flushing(struct inode *inode,
-				 struct ceph_mds_session *session)
+				struct ceph_mds_session *session,
+				u64 *flush_tid, u64 *oldest_flush_tid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_cap_flush *cf = NULL;
 	int flushing;
 
 	BUG_ON(ci->i_dirty_caps == 0);
 	BUG_ON(list_empty(&ci->i_dirty_item));
+	BUG_ON(!ci->i_prealloc_cap_flush);
 
 	flushing = ci->i_dirty_caps;
 	dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
@@ -1463,22 +1504,31 @@ static int __mark_caps_flushing(struct inode *inode,
 	ci->i_dirty_caps = 0;
 	dout(" inode %p now !dirty\n", inode);
 
+	swap(cf, ci->i_prealloc_cap_flush);
+	cf->caps = flushing;
+	cf->kick = false;
+
 	spin_lock(&mdsc->cap_dirty_lock);
 	list_del_init(&ci->i_dirty_item);
 
+	cf->tid = ++mdsc->last_cap_flush_tid;
+	__add_cap_flushing_to_mdsc(mdsc, cf);
+	*oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+
 	if (list_empty(&ci->i_flushing_item)) {
-		ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
 		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
 		mdsc->num_cap_flushing++;
-		dout(" inode %p now flushing seq %lld\n", inode,
-		     ci->i_cap_flush_seq);
+		dout(" inode %p now flushing tid %llu\n", inode, cf->tid);
 	} else {
 		list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
-		dout(" inode %p now flushing (more) seq %lld\n", inode,
-		     ci->i_cap_flush_seq);
+		dout(" inode %p now flushing (more) tid %llu\n",
+		     inode, cf->tid);
 	}
 	spin_unlock(&mdsc->cap_dirty_lock);
 
+	__add_cap_flushing_to_inode(ci, cf);
+
+	*flush_tid = cf->tid;
 	return flushing;
 }
 
@@ -1524,6 +1574,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap *cap;
+	u64 flush_tid, oldest_flush_tid;
 	int file_wanted, used, cap_used;
 	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
 	int issued, implemented, want, retain, revoking, flushing = 0;
@@ -1553,13 +1604,13 @@ retry:
 retry_locked:
 	file_wanted = __ceph_caps_file_wanted(ci);
 	used = __ceph_caps_used(ci);
-	want = file_wanted | used;
 	issued = __ceph_caps_issued(ci, &implemented);
 	revoking = implemented & ~issued;
 
-	retain = want | CEPH_CAP_PIN;
+	want = file_wanted;
+	retain = file_wanted | used | CEPH_CAP_PIN;
 	if (!mdsc->stopping && inode->i_nlink > 0) {
-		if (want) {
+		if (file_wanted) {
 			retain |= CEPH_CAP_ANY;       /* be greedy */
 		} else if (S_ISDIR(inode->i_mode) &&
 			   (issued & CEPH_CAP_FILE_SHARED) &&
@@ -1602,9 +1653,10 @@ retry_locked:
 	 * If we fail, it's because pages are locked.... try again later.
 	 */
 	if ((!is_delayed || mdsc->stopping) &&
-	    ci->i_wrbuffer_ref == 0 &&               /* no dirty pages... */
-	    inode->i_data.nrpages &&                 /* have cached pages */
-	    (file_wanted == 0 ||                     /* no open files */
+	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
+	    ci->i_wrbuffer_ref == 0 &&		/* no dirty pages... */
+	    inode->i_data.nrpages &&		/* have cached pages */
+	    (file_wanted == 0 ||		/* no open files */
 	     (revoking & (CEPH_CAP_FILE_CACHE|
 			  CEPH_CAP_FILE_LAZYIO))) && /*  or revoking cache */
 	    !tried_invalidate) {
@@ -1742,17 +1794,25 @@ ack:
 			took_snap_rwsem = 1;
 		}
 
-		if (cap == ci->i_auth_cap && ci->i_dirty_caps)
-			flushing = __mark_caps_flushing(inode, session);
-		else
+		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
+			flushing = __mark_caps_flushing(inode, session,
+							&flush_tid,
+							&oldest_flush_tid);
+		} else {
 			flushing = 0;
+			flush_tid = 0;
+			spin_lock(&mdsc->cap_dirty_lock);
+			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+			spin_unlock(&mdsc->cap_dirty_lock);
+		}
 
 		mds = cap->mds;  /* remember mds, so we don't repeat */
 		sent++;
 
 		/* __send_cap drops i_ceph_lock */
 		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
-				      want, retain, flushing, NULL);
+				      want, retain, flushing,
+				      flush_tid, oldest_flush_tid);
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
@@ -1781,12 +1841,13 @@ ack:
 /*
  * Try to flush dirty caps back to the auth mds.
  */
-static int try_flush_caps(struct inode *inode, unsigned *flush_tid)
+static int try_flush_caps(struct inode *inode, u64 *ptid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int flushing = 0;
 	struct ceph_mds_session *session = NULL;
+	int flushing = 0;
+	u64 flush_tid = 0, oldest_flush_tid = 0;
 
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1811,42 +1872,54 @@ retry:
 		if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
 			goto out;
 
-		flushing = __mark_caps_flushing(inode, session);
+		flushing = __mark_caps_flushing(inode, session, &flush_tid,
+						&oldest_flush_tid);
 
 		/* __send_cap drops i_ceph_lock */
 		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
-				     cap->issued | cap->implemented, flushing,
-				     flush_tid);
-		if (!delayed)
-			goto out_unlocked;
+				     (cap->issued | cap->implemented),
+				     flushing, flush_tid, oldest_flush_tid);
 
-		spin_lock(&ci->i_ceph_lock);
-		__cap_delay_requeue(mdsc, ci);
+		if (delayed) {
+			spin_lock(&ci->i_ceph_lock);
+			__cap_delay_requeue(mdsc, ci);
+			spin_unlock(&ci->i_ceph_lock);
+		}
+	} else {
+		struct rb_node *n = rb_last(&ci->i_cap_flush_tree);
+		if (n) {
+			struct ceph_cap_flush *cf =
+				rb_entry(n, struct ceph_cap_flush, i_node);
+			flush_tid = cf->tid;
+		}
+		flushing = ci->i_flushing_caps;
+		spin_unlock(&ci->i_ceph_lock);
 	}
 out:
-	spin_unlock(&ci->i_ceph_lock);
-out_unlocked:
 	if (session)
 		mutex_unlock(&session->s_mutex);
+
+	*ptid = flush_tid;
 	return flushing;
 }
 
 /*
  * Return true if we've flushed caps through the given flush_tid.
  */
-static int caps_are_flushed(struct inode *inode, unsigned tid)
+static int caps_are_flushed(struct inode *inode, u64 flush_tid)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int i, ret = 1;
+	struct ceph_cap_flush *cf;
+	struct rb_node *n;
+	int ret = 1;
 
 	spin_lock(&ci->i_ceph_lock);
-	for (i = 0; i < CEPH_CAP_BITS; i++)
-		if ((ci->i_flushing_caps & (1 << i)) &&
-		    ci->i_cap_flush_tid[i] <= tid) {
-			/* still flushing this bit */
+	n = rb_first(&ci->i_cap_flush_tree);
+	if (n) {
+		cf = rb_entry(n, struct ceph_cap_flush, i_node);
+		if (cf->tid <= flush_tid)
 			ret = 0;
-			break;
-		}
+	}
 	spin_unlock(&ci->i_ceph_lock);
 	return ret;
 }
@@ -1864,13 +1937,16 @@ static void sync_write_wait(struct inode *inode)
 	struct ceph_osd_request *req;
 	u64 last_tid;
 
+	if (!S_ISREG(inode->i_mode))
+		return;
+
 	spin_lock(&ci->i_unsafe_lock);
 	if (list_empty(head))
 		goto out;
 
 	/* set upper bound as _last_ entry in chain */
-	req = list_entry(head->prev, struct ceph_osd_request,
-			 r_unsafe_item);
+	req = list_last_entry(head, struct ceph_osd_request,
+			      r_unsafe_item);
 	last_tid = req->r_tid;
 
 	do {
@@ -1888,18 +1964,64 @@ static void sync_write_wait(struct inode *inode)
 		 */
 		if (list_empty(head))
 			break;
-		req = list_entry(head->next, struct ceph_osd_request,
-				 r_unsafe_item);
+		req = list_first_entry(head, struct ceph_osd_request,
+				       r_unsafe_item);
 	} while (req->r_tid < last_tid);
 out:
 	spin_unlock(&ci->i_unsafe_lock);
 }
 
+/*
+ * wait for any uncommitted directory operations to commit.
+ */
+static int unsafe_dirop_wait(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct list_head *head = &ci->i_unsafe_dirops;
+	struct ceph_mds_request *req;
+	u64 last_tid;
+	int ret = 0;
+
+	if (!S_ISDIR(inode->i_mode))
+		return 0;
+
+	spin_lock(&ci->i_unsafe_lock);
+	if (list_empty(head))
+		goto out;
+
+	req = list_last_entry(head, struct ceph_mds_request,
+			      r_unsafe_dir_item);
+	last_tid = req->r_tid;
+
+	do {
+		ceph_mdsc_get_request(req);
+		spin_unlock(&ci->i_unsafe_lock);
+
+		dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n",
+		     inode, req->r_tid, last_tid);
+		ret = !wait_for_completion_timeout(&req->r_safe_completion,
+					ceph_timeout_jiffies(req->r_timeout));
+		if (ret)
+			ret = -EIO;  /* timed out */
+
+		ceph_mdsc_put_request(req);
+
+		spin_lock(&ci->i_unsafe_lock);
+		if (ret || list_empty(head))
+			break;
+		req = list_first_entry(head, struct ceph_mds_request,
+				       r_unsafe_dir_item);
+	} while (req->r_tid < last_tid);
+out:
+	spin_unlock(&ci->i_unsafe_lock);
+	return ret;
+}
+
 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
 	struct inode *inode = file->f_mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	unsigned flush_tid;
+	u64 flush_tid;
 	int ret;
 	int dirty;
 
@@ -1908,25 +2030,30 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 
 	ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
 	if (ret < 0)
-		return ret;
+		goto out;
+
+	if (datasync)
+		goto out;
+
 	mutex_lock(&inode->i_mutex);
 
 	dirty = try_flush_caps(inode, &flush_tid);
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
+	ret = unsafe_dirop_wait(inode);
+
 	/*
 	 * only wait on non-file metadata writeback (the mds
 	 * can recover size and mtime, so we don't need to
 	 * wait for that)
 	 */
-	if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
-		dout("fsync waiting for flush_tid %u\n", flush_tid);
+	if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
 		ret = wait_event_interruptible(ci->i_cap_wq,
-				       caps_are_flushed(inode, flush_tid));
+					caps_are_flushed(inode, flush_tid));
 	}
-
-	dout("fsync %p%s done\n", inode, datasync ? " datasync" : "");
 	mutex_unlock(&inode->i_mutex);
+out:
+	dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
 	return ret;
 }
 
@@ -1939,7 +2066,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	unsigned flush_tid;
+	u64 flush_tid;
 	int err = 0;
 	int dirty;
 	int wait = wbc->sync_mode == WB_SYNC_ALL;
@@ -1994,6 +2121,104 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
 	}
 }
 
+static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
+				struct ceph_mds_session *session,
+				struct ceph_inode_info *ci,
+				bool kick_all)
+{
+	struct inode *inode = &ci->vfs_inode;
+	struct ceph_cap *cap;
+	struct ceph_cap_flush *cf;
+	struct rb_node *n;
+	int delayed = 0;
+	u64 first_tid = 0;
+	u64 oldest_flush_tid;
+
+	spin_lock(&mdsc->cap_dirty_lock);
+	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+	spin_unlock(&mdsc->cap_dirty_lock);
+
+	while (true) {
+		spin_lock(&ci->i_ceph_lock);
+		cap = ci->i_auth_cap;
+		if (!(cap && cap->session == session)) {
+			pr_err("%p auth cap %p not mds%d ???\n", inode,
+					cap, session->s_mds);
+			spin_unlock(&ci->i_ceph_lock);
+			break;
+		}
+
+		for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
+			cf = rb_entry(n, struct ceph_cap_flush, i_node);
+			if (cf->tid < first_tid)
+				continue;
+			if (kick_all || cf->kick)
+				break;
+		}
+		if (!n) {
+			spin_unlock(&ci->i_ceph_lock);
+			break;
+		}
+
+		cf = rb_entry(n, struct ceph_cap_flush, i_node);
+		cf->kick = false;
+
+		first_tid = cf->tid + 1;
+
+		dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
+		     cap, cf->tid, ceph_cap_string(cf->caps));
+		delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+				      __ceph_caps_used(ci),
+				      __ceph_caps_wanted(ci),
+				      cap->issued | cap->implemented,
+				      cf->caps, cf->tid, oldest_flush_tid);
+	}
+	return delayed;
+}
+
+void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *session)
+{
+	struct ceph_inode_info *ci;
+	struct ceph_cap *cap;
+	struct ceph_cap_flush *cf;
+	struct rb_node *n;
+
+	dout("early_kick_flushing_caps mds%d\n", session->s_mds);
+	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
+		spin_lock(&ci->i_ceph_lock);
+		cap = ci->i_auth_cap;
+		if (!(cap && cap->session == session)) {
+			pr_err("%p auth cap %p not mds%d ???\n",
+				&ci->vfs_inode, cap, session->s_mds);
+			spin_unlock(&ci->i_ceph_lock);
+			continue;
+		}
+
+
+		/*
+		 * if flushing caps were revoked, we re-send the cap flush
+		 * in client reconnect stage. This guarantees MDS * processes
+		 * the cap flush message before issuing the flushing caps to
+		 * other client.
+		 */
+		if ((cap->issued & ci->i_flushing_caps) !=
+		    ci->i_flushing_caps) {
+			spin_unlock(&ci->i_ceph_lock);
+			if (!__kick_flushing_caps(mdsc, session, ci, true))
+				continue;
+			spin_lock(&ci->i_ceph_lock);
+		}
+
+		for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
+			cf = rb_entry(n, struct ceph_cap_flush, i_node);
+			cf->kick = true;
+		}
+
+		spin_unlock(&ci->i_ceph_lock);
+	}
+}
+
 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 			     struct ceph_mds_session *session)
 {
@@ -2003,28 +2228,10 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 
 	dout("kick_flushing_caps mds%d\n", session->s_mds);
 	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
-		struct inode *inode = &ci->vfs_inode;
-		struct ceph_cap *cap;
-		int delayed = 0;
-
-		spin_lock(&ci->i_ceph_lock);
-		cap = ci->i_auth_cap;
-		if (cap && cap->session == session) {
-			dout("kick_flushing_caps %p cap %p %s\n", inode,
-			     cap, ceph_cap_string(ci->i_flushing_caps));
-			delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-					     __ceph_caps_used(ci),
-					     __ceph_caps_wanted(ci),
-					     cap->issued | cap->implemented,
-					     ci->i_flushing_caps, NULL);
-			if (delayed) {
-				spin_lock(&ci->i_ceph_lock);
-				__cap_delay_requeue(mdsc, ci);
-				spin_unlock(&ci->i_ceph_lock);
-			}
-		} else {
-			pr_err("%p auth cap %p not mds%d ???\n", inode,
-			       cap, session->s_mds);
+		int delayed = __kick_flushing_caps(mdsc, session, ci, false);
+		if (delayed) {
+			spin_lock(&ci->i_ceph_lock);
+			__cap_delay_requeue(mdsc, ci);
 			spin_unlock(&ci->i_ceph_lock);
 		}
 	}
@@ -2036,26 +2243,25 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_cap *cap;
-	int delayed = 0;
 
 	spin_lock(&ci->i_ceph_lock);
 	cap = ci->i_auth_cap;
-	dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode,
-	     ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq);
+	dout("kick_flushing_inode_caps %p flushing %s\n", inode,
+	     ceph_cap_string(ci->i_flushing_caps));
 
 	__ceph_flush_snaps(ci, &session, 1);
 
 	if (ci->i_flushing_caps) {
+		int delayed;
+
 		spin_lock(&mdsc->cap_dirty_lock);
 		list_move_tail(&ci->i_flushing_item,
 			       &cap->session->s_cap_flushing);
 		spin_unlock(&mdsc->cap_dirty_lock);
 
-		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-				     __ceph_caps_used(ci),
-				     __ceph_caps_wanted(ci),
-				     cap->issued | cap->implemented,
-				     ci->i_flushing_caps, NULL);
+		spin_unlock(&ci->i_ceph_lock);
+
+		delayed = __kick_flushing_caps(mdsc, session, ci, true);
 		if (delayed) {
 			spin_lock(&ci->i_ceph_lock);
 			__cap_delay_requeue(mdsc, ci);
@@ -2073,7 +2279,8 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
  *
  * Protected by i_ceph_lock.
  */
-static void __take_cap_refs(struct ceph_inode_info *ci, int got)
+static void __take_cap_refs(struct ceph_inode_info *ci, int got,
+			    bool snap_rwsem_locked)
 {
 	if (got & CEPH_CAP_PIN)
 		ci->i_pin_ref++;
@@ -2081,8 +2288,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
 		ci->i_rd_ref++;
 	if (got & CEPH_CAP_FILE_CACHE)
 		ci->i_rdcache_ref++;
-	if (got & CEPH_CAP_FILE_WR)
+	if (got & CEPH_CAP_FILE_WR) {
+		if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
+			BUG_ON(!snap_rwsem_locked);
+			ci->i_head_snapc = ceph_get_snap_context(
+					ci->i_snap_realm->cached_context);
+		}
 		ci->i_wr_ref++;
+	}
 	if (got & CEPH_CAP_FILE_BUFFER) {
 		if (ci->i_wb_ref == 0)
 			ihold(&ci->vfs_inode);
@@ -2100,16 +2313,19 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
  * requested from the MDS.
  */
 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-			    loff_t endoff, int *got, int *check_max, int *err)
+			    loff_t endoff, bool nonblock, int *got, int *err)
 {
 	struct inode *inode = &ci->vfs_inode;
+	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	int ret = 0;
 	int have, implemented;
 	int file_wanted;
+	bool snap_rwsem_locked = false;
 
 	dout("get_cap_refs %p need %s want %s\n", inode,
 	     ceph_cap_string(need), ceph_cap_string(want));
 
+again:
 	spin_lock(&ci->i_ceph_lock);
 
 	/* make sure file is actually open */
@@ -2125,6 +2341,10 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
 	/* finish pending truncate */
 	while (ci->i_truncate_pending) {
 		spin_unlock(&ci->i_ceph_lock);
+		if (snap_rwsem_locked) {
+			up_read(&mdsc->snap_rwsem);
+			snap_rwsem_locked = false;
+		}
 		__ceph_do_pending_vmtruncate(inode);
 		spin_lock(&ci->i_ceph_lock);
 	}
@@ -2136,7 +2356,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
 			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
 			     inode, endoff, ci->i_max_size);
 			if (endoff > ci->i_requested_max_size) {
-				*check_max = 1;
+				*err = -EAGAIN;
 				ret = 1;
 			}
 			goto out_unlock;
@@ -2164,8 +2384,29 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
 		     inode, ceph_cap_string(have), ceph_cap_string(not),
 		     ceph_cap_string(revoking));
 		if ((revoking & not) == 0) {
+			if (!snap_rwsem_locked &&
+			    !ci->i_head_snapc &&
+			    (need & CEPH_CAP_FILE_WR)) {
+				if (!down_read_trylock(&mdsc->snap_rwsem)) {
+					/*
+					 * we can not call down_read() when
+					 * task isn't in TASK_RUNNING state
+					 */
+					if (nonblock) {
+						*err = -EAGAIN;
+						ret = 1;
+						goto out_unlock;
+					}
+
+					spin_unlock(&ci->i_ceph_lock);
+					down_read(&mdsc->snap_rwsem);
+					snap_rwsem_locked = true;
+					goto again;
+				}
+				snap_rwsem_locked = true;
+			}
 			*got = need | (have & want);
-			__take_cap_refs(ci, *got);
+			__take_cap_refs(ci, *got, true);
 			ret = 1;
 		}
 	} else {
@@ -2189,6 +2430,8 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
 	}
 out_unlock:
 	spin_unlock(&ci->i_ceph_lock);
+	if (snap_rwsem_locked)
+		up_read(&mdsc->snap_rwsem);
 
 	dout("get_cap_refs %p ret %d got %s\n", inode,
 	     ret, ceph_cap_string(*got));
@@ -2231,50 +2474,70 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 		  loff_t endoff, int *got, struct page **pinned_page)
 {
-	int _got, check_max, ret, err = 0;
+	int _got, ret, err = 0;
 
-retry:
-	if (endoff > 0)
-		check_max_size(&ci->vfs_inode, endoff);
-	_got = 0;
-	check_max = 0;
-	ret = wait_event_interruptible(ci->i_cap_wq,
-				try_get_cap_refs(ci, need, want, endoff,
-						 &_got, &check_max, &err));
-	if (err)
-		ret = err;
+	ret = ceph_pool_perm_check(ci, need);
 	if (ret < 0)
 		return ret;
 
-	if (check_max)
-		goto retry;
+	while (true) {
+		if (endoff > 0)
+			check_max_size(&ci->vfs_inode, endoff);
 
-	if (ci->i_inline_version != CEPH_INLINE_NONE &&
-	    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-	    i_size_read(&ci->vfs_inode) > 0) {
-		struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
-		if (page) {
-			if (PageUptodate(page)) {
-				*pinned_page = page;
-				goto out;
-			}
-			page_cache_release(page);
-		}
-		/*
-		 * drop cap refs first because getattr while holding
-		 * caps refs can cause deadlock.
-		 */
-		ceph_put_cap_refs(ci, _got);
+		err = 0;
 		_got = 0;
+		ret = try_get_cap_refs(ci, need, want, endoff,
+				       false, &_got, &err);
+		if (ret) {
+			if (err == -EAGAIN)
+				continue;
+			if (err < 0)
+				return err;
+		} else {
+			ret = wait_event_interruptible(ci->i_cap_wq,
+					try_get_cap_refs(ci, need, want, endoff,
+							 true, &_got, &err));
+			if (err == -EAGAIN)
+				continue;
+			if (err < 0)
+				ret = err;
+			if (ret < 0)
+				return ret;
+		}
 
-		/* getattr request will bring inline data into page cache */
-		ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
-					CEPH_STAT_CAP_INLINE_DATA, true);
-		if (ret < 0)
-			return ret;
-		goto retry;
+		if (ci->i_inline_version != CEPH_INLINE_NONE &&
+		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+		    i_size_read(&ci->vfs_inode) > 0) {
+			struct page *page =
+				find_get_page(ci->vfs_inode.i_mapping, 0);
+			if (page) {
+				if (PageUptodate(page)) {
+					*pinned_page = page;
+					break;
+				}
+				page_cache_release(page);
+			}
+			/*
+			 * drop cap refs first because getattr while
+			 * holding * caps refs can cause deadlock.
+			 */
+			ceph_put_cap_refs(ci, _got);
+			_got = 0;
+
+			/*
+			 * getattr request will bring inline data into
+			 * page cache
+			 */
+			ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+						CEPH_STAT_CAP_INLINE_DATA,
+						true);
+			if (ret < 0)
+				return ret;
+			continue;
+		}
+		break;
 	}
-out:
+
 	*got = _got;
 	return 0;
 }
@@ -2286,10 +2549,31 @@ out:
 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
 {
 	spin_lock(&ci->i_ceph_lock);
-	__take_cap_refs(ci, caps);
+	__take_cap_refs(ci, caps, false);
 	spin_unlock(&ci->i_ceph_lock);
 }
 
+
+/*
+ * drop cap_snap that is not associated with any snapshot.
+ * we don't need to send FLUSHSNAP message for it.
+ */
+static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap)
+{
+	if (!capsnap->need_flush &&
+	    !capsnap->writing && !capsnap->dirty_pages) {
+
+		dout("dropping cap_snap %p follows %llu\n",
+		     capsnap, capsnap->follows);
+		ceph_put_snap_context(capsnap->context);
+		list_del(&capsnap->ci_item);
+		list_del(&capsnap->flushing_item);
+		ceph_put_cap_snap(capsnap);
+		return 1;
+	}
+	return 0;
+}
+
 /*
  * Release cap refs.
  *
@@ -2303,7 +2587,6 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 {
 	struct inode *inode = &ci->vfs_inode;
 	int last = 0, put = 0, flushsnaps = 0, wake = 0;
-	struct ceph_cap_snap *capsnap;
 
 	spin_lock(&ci->i_ceph_lock);
 	if (had & CEPH_CAP_PIN)
@@ -2325,17 +2608,24 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 	if (had & CEPH_CAP_FILE_WR)
 		if (--ci->i_wr_ref == 0) {
 			last++;
-			if (!list_empty(&ci->i_cap_snaps)) {
-				capsnap = list_first_entry(&ci->i_cap_snaps,
-						     struct ceph_cap_snap,
-						     ci_item);
-				if (capsnap->writing) {
-					capsnap->writing = 0;
-					flushsnaps =
-						__ceph_finish_cap_snap(ci,
-								       capsnap);
-					wake = 1;
-				}
+			if (__ceph_have_pending_cap_snap(ci)) {
+				struct ceph_cap_snap *capsnap =
+					list_last_entry(&ci->i_cap_snaps,
+							struct ceph_cap_snap,
+							ci_item);
+				capsnap->writing = 0;
+				if (ceph_try_drop_cap_snap(capsnap))
+					put++;
+				else if (__ceph_finish_cap_snap(ci, capsnap))
+					flushsnaps = 1;
+				wake = 1;
+			}
+			if (ci->i_wrbuffer_ref_head == 0 &&
+			    ci->i_dirty_caps == 0 &&
+			    ci->i_flushing_caps == 0) {
+				BUG_ON(!ci->i_head_snapc);
+				ceph_put_snap_context(ci->i_head_snapc);
+				ci->i_head_snapc = NULL;
 			}
 			/* see comment in __ceph_remove_cap() */
 			if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
@@ -2352,7 +2642,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 		ceph_flush_snaps(ci);
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
-	if (put)
+	while (put-- > 0)
 		iput(inode);
 }
 
@@ -2380,7 +2670,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 	if (ci->i_head_snapc == snapc) {
 		ci->i_wrbuffer_ref_head -= nr;
 		if (ci->i_wrbuffer_ref_head == 0 &&
-		    ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
+		    ci->i_wr_ref == 0 &&
+		    ci->i_dirty_caps == 0 &&
+		    ci->i_flushing_caps == 0) {
 			BUG_ON(!ci->i_head_snapc);
 			ceph_put_snap_context(ci->i_head_snapc);
 			ci->i_head_snapc = NULL;
@@ -2401,25 +2693,15 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 		capsnap->dirty_pages -= nr;
 		if (capsnap->dirty_pages == 0) {
 			complete_capsnap = 1;
-			if (capsnap->dirty == 0)
-				/* cap writeback completed before we created
-				 * the cap_snap; no FLUSHSNAP is needed */
-				drop_capsnap = 1;
+			drop_capsnap = ceph_try_drop_cap_snap(capsnap);
 		}
 		dout("put_wrbuffer_cap_refs on %p cap_snap %p "
-		     " snap %lld %d/%d -> %d/%d %s%s%s\n",
+		     " snap %lld %d/%d -> %d/%d %s%s\n",
 		     inode, capsnap, capsnap->context->seq,
 		     ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
 		     ci->i_wrbuffer_ref, capsnap->dirty_pages,
 		     last ? " (wrbuffer last)" : "",
-		     complete_capsnap ? " (complete capsnap)" : "",
-		     drop_capsnap ? " (drop capsnap)" : "");
-		if (drop_capsnap) {
-			ceph_put_snap_context(capsnap->context);
-			list_del(&capsnap->ci_item);
-			list_del(&capsnap->flushing_item);
-			ceph_put_cap_snap(capsnap);
-		}
+		     complete_capsnap ? " (complete capsnap)" : "");
 	}
 
 	spin_unlock(&ci->i_ceph_lock);
@@ -2526,7 +2808,8 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	 * try to invalidate (once).  (If there are dirty buffers, we
 	 * will invalidate _after_ writeback.)
 	 */
-	if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+	if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
+	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
 	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
 	    !ci->i_wrbuffer_ref) {
 		if (try_nonblocking_invalidate(inode)) {
@@ -2732,16 +3015,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+	struct ceph_cap_flush *cf;
+	struct rb_node *n;
+	LIST_HEAD(to_remove);
 	unsigned seq = le32_to_cpu(m->seq);
 	int dirty = le32_to_cpu(m->dirty);
 	int cleaned = 0;
 	int drop = 0;
-	int i;
 
-	for (i = 0; i < CEPH_CAP_BITS; i++)
-		if ((dirty & (1 << i)) &&
-		    (u16)flush_tid == ci->i_cap_flush_tid[i])
-			cleaned |= 1 << i;
+	n = rb_first(&ci->i_cap_flush_tree);
+	while (n) {
+		cf = rb_entry(n, struct ceph_cap_flush, i_node);
+		n = rb_next(&cf->i_node);
+		if (cf->tid == flush_tid)
+			cleaned = cf->caps;
+		if (cf->tid <= flush_tid) {
+			rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+			list_add_tail(&cf->list, &to_remove);
+		} else {
+			cleaned &= ~cf->caps;
+			if (!cleaned)
+				break;
+		}
+	}
 
 	dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
 	     " flushing %s -> %s\n",
@@ -2749,12 +3045,23 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 	     ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps),
 	     ceph_cap_string(ci->i_flushing_caps & ~cleaned));
 
-	if (ci->i_flushing_caps == (ci->i_flushing_caps & ~cleaned))
+	if (list_empty(&to_remove) && !cleaned)
 		goto out;
 
 	ci->i_flushing_caps &= ~cleaned;
 
 	spin_lock(&mdsc->cap_dirty_lock);
+
+	if (!list_empty(&to_remove)) {
+		list_for_each_entry(cf, &to_remove, list)
+			rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+
+		n = rb_first(&mdsc->cap_flush_tree);
+		cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
+		if (!cf || cf->tid > flush_tid)
+			wake_up_all(&mdsc->cap_flushing_wq);
+	}
+
 	if (ci->i_flushing_caps == 0) {
 		list_del_init(&ci->i_flushing_item);
 		if (!list_empty(&session->s_cap_flushing))
@@ -2764,14 +3071,14 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 					 struct ceph_inode_info,
 					 i_flushing_item)->vfs_inode);
 		mdsc->num_cap_flushing--;
-		wake_up_all(&mdsc->cap_flushing_wq);
 		dout(" inode %p now !flushing\n", inode);
 
 		if (ci->i_dirty_caps == 0) {
 			dout(" inode %p now clean\n", inode);
 			BUG_ON(!list_empty(&ci->i_dirty_item));
 			drop = 1;
-			if (ci->i_wrbuffer_ref_head == 0) {
+			if (ci->i_wr_ref == 0 &&
+			    ci->i_wrbuffer_ref_head == 0) {
 				BUG_ON(!ci->i_head_snapc);
 				ceph_put_snap_context(ci->i_head_snapc);
 				ci->i_head_snapc = NULL;
@@ -2785,6 +3092,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 
 out:
 	spin_unlock(&ci->i_ceph_lock);
+
+	while (!list_empty(&to_remove)) {
+		cf = list_first_entry(&to_remove,
+				      struct ceph_cap_flush, list);
+		list_del(&cf->list);
+		ceph_free_cap_flush(cf);
+	}
 	if (drop)
 		iput(inode);
 }
@@ -2800,6 +3114,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
 				     struct ceph_mds_session *session)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	u64 follows = le64_to_cpu(m->snap_follows);
 	struct ceph_cap_snap *capsnap;
 	int drop = 0;
@@ -2823,6 +3138,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
 			list_del(&capsnap->ci_item);
 			list_del(&capsnap->flushing_item);
 			ceph_put_cap_snap(capsnap);
+			wake_up_all(&mdsc->cap_flushing_wq);
 			drop = 1;
 			break;
 		} else {
@@ -2971,7 +3287,6 @@ retry:
 			mutex_lock_nested(&session->s_mutex,
 					  SINGLE_DEPTH_NESTING);
 		}
-		ceph_add_cap_releases(mdsc, tsession);
 		new_cap = ceph_get_cap(mdsc, NULL);
 	} else {
 		WARN_ON(1);
@@ -3167,16 +3482,20 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
 	     (unsigned)seq);
 
-	if (op == CEPH_CAP_OP_IMPORT)
-		ceph_add_cap_releases(mdsc, session);
-
 	if (!inode) {
 		dout(" i don't have ino %llx\n", vino.ino);
 
 		if (op == CEPH_CAP_OP_IMPORT) {
+			cap = ceph_get_cap(mdsc, NULL);
+			cap->cap_ino = vino.ino;
+			cap->queue_release = 1;
+			cap->cap_id = cap_id;
+			cap->mseq = mseq;
+			cap->seq = seq;
 			spin_lock(&session->s_cap_lock);
-			__queue_cap_release(session, vino.ino, cap_id,
-					    mseq, seq);
+			list_add_tail(&cap->session_caps,
+					&session->s_cap_releases);
+			session->s_num_cap_releases++;
 			spin_unlock(&session->s_cap_lock);
 		}
 		goto flush_cap_releases;
@@ -3252,11 +3571,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
 flush_cap_releases:
 	/*
-	 * send any full release message to try to move things
+	 * send any cap release message to try to move things
 	 * along for the mds (who clearly thinks we still have this
 	 * cap).
 	 */
-	ceph_add_cap_releases(mdsc, session);
 	ceph_send_cap_releases(mdsc, session);
 
 done:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 4248307fea90..9314b4ea2375 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry)
 	if (dentry->d_fsdata)
 		return 0;
 
-	di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO);
+	di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO);
 	if (!di)
 		return -ENOMEM;          /* oh well */
 
@@ -107,6 +107,27 @@ static int fpos_cmp(loff_t l, loff_t r)
 }
 
 /*
+ * make note of the last dentry we read, so we can
+ * continue at the same lexicographical point,
+ * regardless of what dir changes take place on the
+ * server.
+ */
+static int note_last_dentry(struct ceph_file_info *fi, const char *name,
+		            int len, unsigned next_offset)
+{
+	char *buf = kmalloc(len+1, GFP_KERNEL);
+	if (!buf)
+		return -ENOMEM;
+	kfree(fi->last_name);
+	fi->last_name = buf;
+	memcpy(fi->last_name, name, len);
+	fi->last_name[len] = 0;
+	fi->next_offset = next_offset;
+	dout("note_last_dentry '%s'\n", fi->last_name);
+	return 0;
+}
+
+/*
  * When possible, we try to satisfy a readdir by peeking at the
  * dcache.  We make this work by carefully ordering dentries on
  * d_child when we initially get results back from the MDS, and
@@ -123,123 +144,113 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
 	struct ceph_file_info *fi = file->private_data;
 	struct dentry *parent = file->f_path.dentry;
 	struct inode *dir = d_inode(parent);
-	struct list_head *p;
-	struct dentry *dentry, *last;
+	struct dentry *dentry, *last = NULL;
 	struct ceph_dentry_info *di;
+	unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry *);
 	int err = 0;
+	loff_t ptr_pos = 0;
+	struct ceph_readdir_cache_control cache_ctl = {};
 
-	/* claim ref on last dentry we returned */
-	last = fi->dentry;
-	fi->dentry = NULL;
-
-	dout("__dcache_readdir %p v%u at %llu (last %p)\n",
-	     dir, shared_gen, ctx->pos, last);
+	dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
 
-	spin_lock(&parent->d_lock);
-
-	/* start at beginning? */
-	if (ctx->pos == 2 || last == NULL ||
-	    fpos_cmp(ctx->pos, ceph_dentry(last)->offset) < 0) {
-		if (list_empty(&parent->d_subdirs))
-			goto out_unlock;
-		p = parent->d_subdirs.prev;
-		dout(" initial p %p/%p\n", p->prev, p->next);
-	} else {
-		p = last->d_child.prev;
+	/* we can calculate cache index for the first dirfrag */
+	if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) {
+		cache_ctl.index = fpos_off(ctx->pos) - 2;
+		BUG_ON(cache_ctl.index < 0);
+		ptr_pos = cache_ctl.index * sizeof(struct dentry *);
 	}
 
-more:
-	dentry = list_entry(p, struct dentry, d_child);
-	di = ceph_dentry(dentry);
-	while (1) {
-		dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next,
-		     d_unhashed(dentry) ? "!hashed" : "hashed",
-		     parent->d_subdirs.prev, parent->d_subdirs.next);
-		if (p == &parent->d_subdirs) {
+	while (true) {
+		pgoff_t pgoff;
+		bool emit_dentry;
+
+		if (ptr_pos >= i_size_read(dir)) {
 			fi->flags |= CEPH_F_ATEND;
-			goto out_unlock;
+			err = 0;
+			break;
+		}
+
+		err = -EAGAIN;
+		pgoff = ptr_pos >> PAGE_CACHE_SHIFT;
+		if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) {
+			ceph_readdir_cache_release(&cache_ctl);
+			cache_ctl.page = find_lock_page(&dir->i_data, pgoff);
+			if (!cache_ctl.page) {
+				dout(" page %lu not found\n", pgoff);
+				break;
+			}
+			/* reading/filling the cache are serialized by
+			 * i_mutex, no need to use page lock */
+			unlock_page(cache_ctl.page);
+			cache_ctl.dentries = kmap(cache_ctl.page);
 		}
-		spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
+
+		rcu_read_lock();
+		spin_lock(&parent->d_lock);
+		/* check i_size again here, because empty directory can be
+		 * marked as complete while not holding the i_mutex. */
+		if (ceph_dir_is_complete_ordered(dir) &&
+		    ptr_pos < i_size_read(dir))
+			dentry = cache_ctl.dentries[cache_ctl.index % nsize];
+		else
+			dentry = NULL;
+		spin_unlock(&parent->d_lock);
+		if (dentry && !lockref_get_not_dead(&dentry->d_lockref))
+			dentry = NULL;
+		rcu_read_unlock();
+		if (!dentry)
+			break;
+
+		emit_dentry = false;
+		di = ceph_dentry(dentry);
+		spin_lock(&dentry->d_lock);
 		if (di->lease_shared_gen == shared_gen &&
-		    !d_unhashed(dentry) && d_really_is_positive(dentry) &&
+		    d_really_is_positive(dentry) &&
 		    ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR &&
 		    ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&
-		    fpos_cmp(ctx->pos, di->offset) <= 0)
-			break;
-		dout(" skipping %p %pd at %llu (%llu)%s%s\n", dentry,
-		     dentry, di->offset,
-		     ctx->pos, d_unhashed(dentry) ? " unhashed" : "",
-		     !d_inode(dentry) ? " null" : "");
+		    fpos_cmp(ctx->pos, di->offset) <= 0) {
+			emit_dentry = true;
+		}
 		spin_unlock(&dentry->d_lock);
-		p = p->prev;
-		dentry = list_entry(p, struct dentry, d_child);
-		di = ceph_dentry(dentry);
-	}
-
-	dget_dlock(dentry);
-	spin_unlock(&dentry->d_lock);
-	spin_unlock(&parent->d_lock);
 
-	/* make sure a dentry wasn't dropped while we didn't have parent lock */
-	if (!ceph_dir_is_complete_ordered(dir)) {
-		dout(" lost dir complete on %p; falling back to mds\n", dir);
-		dput(dentry);
-		err = -EAGAIN;
-		goto out;
-	}
+		if (emit_dentry) {
+			dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+			     dentry, dentry, d_inode(dentry));
+			ctx->pos = di->offset;
+			if (!dir_emit(ctx, dentry->d_name.name,
+				      dentry->d_name.len,
+				      ceph_translate_ino(dentry->d_sb,
+							 d_inode(dentry)->i_ino),
+				      d_inode(dentry)->i_mode >> 12)) {
+				dput(dentry);
+				err = 0;
+				break;
+			}
+			ctx->pos++;
 
-	dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
-	     dentry, dentry, d_inode(dentry));
-	if (!dir_emit(ctx, dentry->d_name.name,
-		      dentry->d_name.len,
-		      ceph_translate_ino(dentry->d_sb, d_inode(dentry)->i_ino),
-		      d_inode(dentry)->i_mode >> 12)) {
-		if (last) {
-			/* remember our position */
-			fi->dentry = last;
-			fi->next_offset = fpos_off(di->offset);
+			if (last)
+				dput(last);
+			last = dentry;
+		} else {
+			dput(dentry);
 		}
-		dput(dentry);
-		return 0;
-	}
-
-	ctx->pos = di->offset + 1;
 
-	if (last)
-		dput(last);
-	last = dentry;
-
-	spin_lock(&parent->d_lock);
-	p = p->prev;	/* advance to next dentry */
-	goto more;
-
-out_unlock:
-	spin_unlock(&parent->d_lock);
-out:
-	if (last)
+		cache_ctl.index++;
+		ptr_pos += sizeof(struct dentry *);
+	}
+	ceph_readdir_cache_release(&cache_ctl);
+	if (last) {
+		int ret;
+		di = ceph_dentry(last);
+		ret = note_last_dentry(fi, last->d_name.name, last->d_name.len,
+				       fpos_off(di->offset) + 1);
+		if (ret < 0)
+			err = ret;
 		dput(last);
+	}
 	return err;
 }
 
-/*
- * make note of the last dentry we read, so we can
- * continue at the same lexicographical point,
- * regardless of what dir changes take place on the
- * server.
- */
-static int note_last_dentry(struct ceph_file_info *fi, const char *name,
-			    int len)
-{
-	kfree(fi->last_name);
-	fi->last_name = kmalloc(len+1, GFP_NOFS);
-	if (!fi->last_name)
-		return -ENOMEM;
-	memcpy(fi->last_name, name, len);
-	fi->last_name[len] = 0;
-	dout("note_last_dentry '%s'\n", fi->last_name);
-	return 0;
-}
-
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
 	struct ceph_file_info *fi = file->private_data;
@@ -280,8 +291,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 
 	/* can we use the dcache? */
 	spin_lock(&ci->i_ceph_lock);
-	if ((ctx->pos == 2 || fi->dentry) &&
-	    ceph_test_mount_opt(fsc, DCACHE) &&
+	if (ceph_test_mount_opt(fsc, DCACHE) &&
 	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
 	    __ceph_dir_is_complete_ordered(ci) &&
@@ -296,24 +306,8 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 	} else {
 		spin_unlock(&ci->i_ceph_lock);
 	}
-	if (fi->dentry) {
-		err = note_last_dentry(fi, fi->dentry->d_name.name,
-				       fi->dentry->d_name.len);
-		if (err)
-			return err;
-		dput(fi->dentry);
-		fi->dentry = NULL;
-	}
 
 	/* proceed with a normal readdir */
-
-	if (ctx->pos == 2) {
-		/* note dir version at start of readdir so we can tell
-		 * if any dentries get dropped */
-		fi->dir_release_count = atomic_read(&ci->i_release_count);
-		fi->dir_ordered_count = ci->i_ordered_count;
-	}
-
 more:
 	/* do we have the correct frag content buffered? */
 	if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -342,12 +336,15 @@ more:
 		req->r_direct_hash = ceph_frag_value(frag);
 		req->r_direct_is_hash = true;
 		if (fi->last_name) {
-			req->r_path2 = kstrdup(fi->last_name, GFP_NOFS);
+			req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
 			if (!req->r_path2) {
 				ceph_mdsc_put_request(req);
 				return -ENOMEM;
 			}
 		}
+		req->r_dir_release_cnt = fi->dir_release_count;
+		req->r_dir_ordered_cnt = fi->dir_ordered_count;
+		req->r_readdir_cache_idx = fi->readdir_cache_idx;
 		req->r_readdir_offset = fi->next_offset;
 		req->r_args.readdir.frag = cpu_to_le32(frag);
 
@@ -364,26 +361,38 @@ more:
 		     (int)req->r_reply_info.dir_end,
 		     (int)req->r_reply_info.dir_complete);
 
-		if (!req->r_did_prepopulate) {
-			dout("readdir !did_prepopulate");
-			/* preclude from marking dir complete */
-			fi->dir_release_count--;
-		}
 
 		/* note next offset and last dentry name */
 		rinfo = &req->r_reply_info;
 		if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
 			frag = le32_to_cpu(rinfo->dir_dir->frag);
-			if (ceph_frag_is_leftmost(frag))
-				fi->next_offset = 2;
-			else
-				fi->next_offset = 0;
-			off = fi->next_offset;
+			off = req->r_readdir_offset;
+			fi->next_offset = off;
 		}
+
 		fi->frag = frag;
 		fi->offset = fi->next_offset;
 		fi->last_readdir = req;
 
+		if (req->r_did_prepopulate) {
+			fi->readdir_cache_idx = req->r_readdir_cache_idx;
+			if (fi->readdir_cache_idx < 0) {
+				/* preclude from marking dir ordered */
+				fi->dir_ordered_count = 0;
+			} else if (ceph_frag_is_leftmost(frag) && off == 2) {
+				/* note dir version at start of readdir so
+				 * we can tell if any dentries get dropped */
+				fi->dir_release_count = req->r_dir_release_cnt;
+				fi->dir_ordered_count = req->r_dir_ordered_cnt;
+			}
+		} else {
+			dout("readdir !did_prepopulate");
+			/* disable readdir cache */
+			fi->readdir_cache_idx = -1;
+			/* preclude from marking dir complete */
+			fi->dir_release_count = 0;
+		}
+
 		if (req->r_reply_info.dir_end) {
 			kfree(fi->last_name);
 			fi->last_name = NULL;
@@ -394,10 +403,10 @@ more:
 		} else {
 			err = note_last_dentry(fi,
 				       rinfo->dir_dname[rinfo->dir_nr-1],
-				       rinfo->dir_dname_len[rinfo->dir_nr-1]);
+				       rinfo->dir_dname_len[rinfo->dir_nr-1],
+				       fi->next_offset + rinfo->dir_nr);
 			if (err)
 				return err;
-			fi->next_offset += rinfo->dir_nr;
 		}
 	}
 
@@ -453,16 +462,22 @@ more:
 	 * were released during the whole readdir, and we should have
 	 * the complete dir contents in our cache.
 	 */
-	spin_lock(&ci->i_ceph_lock);
-	if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
-		if (ci->i_ordered_count == fi->dir_ordered_count)
+	if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) {
+		spin_lock(&ci->i_ceph_lock);
+		if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) {
 			dout(" marking %p complete and ordered\n", inode);
-		else
+			/* use i_size to track number of entries in
+			 * readdir cache */
+			BUG_ON(fi->readdir_cache_idx < 0);
+			i_size_write(inode, fi->readdir_cache_idx *
+				     sizeof(struct dentry*));
+		} else {
 			dout(" marking %p complete\n", inode);
+		}
 		__ceph_dir_set_complete(ci, fi->dir_release_count,
 					fi->dir_ordered_count);
+		spin_unlock(&ci->i_ceph_lock);
 	}
-	spin_unlock(&ci->i_ceph_lock);
 
 	dout("readdir %p file %p done.\n", inode, file);
 	return 0;
@@ -476,14 +491,12 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
 	}
 	kfree(fi->last_name);
 	fi->last_name = NULL;
+	fi->dir_release_count = 0;
+	fi->readdir_cache_idx = -1;
 	if (ceph_frag_is_leftmost(frag))
 		fi->next_offset = 2;  /* compensate for . and .. */
 	else
 		fi->next_offset = 0;
-	if (fi->dentry) {
-		dput(fi->dentry);
-		fi->dentry = NULL;
-	}
 	fi->flags &= ~CEPH_F_ATEND;
 }
 
@@ -497,13 +510,12 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 	mutex_lock(&inode->i_mutex);
 	retval = -EINVAL;
 	switch (whence) {
-	case SEEK_END:
-		offset += inode->i_size + 2;   /* FIXME */
-		break;
 	case SEEK_CUR:
 		offset += file->f_pos;
 	case SEEK_SET:
 		break;
+	case SEEK_END:
+		retval = -EOPNOTSUPP;
 	default:
 		goto out;
 	}
@@ -516,20 +528,18 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 		}
 		retval = offset;
 
-		/*
-		 * discard buffered readdir content on seekdir(0), or
-		 * seek to new frag, or seek prior to current chunk.
-		 */
 		if (offset == 0 ||
 		    fpos_frag(offset) != fi->frag ||
 		    fpos_off(offset) < fi->offset) {
+			/* discard buffered readdir content on seekdir(0), or
+			 * seek to new frag, or seek prior to current chunk */
 			dout("dir_llseek dropping %p content\n", file);
 			reset_readdir(fi, fpos_frag(offset));
+		} else if (fpos_cmp(offset, old_offset) > 0) {
+			/* reset dir_release_count if we did a forward seek */
+			fi->dir_release_count = 0;
+			fi->readdir_cache_idx = -1;
 		}
-
-		/* bump dir_release_count if we did a forward seek */
-		if (fpos_cmp(offset, old_offset) > 0)
-			fi->dir_release_count--;
 	}
 out:
 	mutex_unlock(&inode->i_mutex);
@@ -764,7 +774,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
 		err = PTR_ERR(req);
 		goto out;
 	}
-	req->r_path2 = kstrdup(dest, GFP_NOFS);
+	req->r_path2 = kstrdup(dest, GFP_KERNEL);
 	if (!req->r_path2) {
 		err = -ENOMEM;
 		ceph_mdsc_put_request(req);
@@ -985,16 +995,15 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 		 * to do it here.
 		 */
 
+		/* d_move screws up sibling dentries' offsets */
+		ceph_dir_clear_complete(old_dir);
+		ceph_dir_clear_complete(new_dir);
+
 		d_move(old_dentry, new_dentry);
 
 		/* ensure target dentry is invalidated, despite
 		   rehashing bug in vfs_rename_dir */
 		ceph_invalidate_dentry_lease(new_dentry);
-
-		/* d_move screws up sibling dentries' offsets */
-		ceph_dir_clear_complete(old_dir);
-		ceph_dir_clear_complete(new_dir);
-
 	}
 	ceph_mdsc_put_request(req);
 	return err;
@@ -1189,7 +1198,7 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
 		return -EISDIR;
 
 	if (!cf->dir_info) {
-		cf->dir_info = kmalloc(bufsize, GFP_NOFS);
+		cf->dir_info = kmalloc(bufsize, GFP_KERNEL);
 		if (!cf->dir_info)
 			return -ENOMEM;
 		cf->dir_info_len =
@@ -1224,66 +1233,6 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
 }
 
 /*
- * an fsync() on a dir will wait for any uncommitted directory
- * operations to commit.
- */
-static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end,
-			  int datasync)
-{
-	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct list_head *head = &ci->i_unsafe_dirops;
-	struct ceph_mds_request *req;
-	u64 last_tid;
-	int ret = 0;
-
-	dout("dir_fsync %p\n", inode);
-	ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
-	if (ret)
-		return ret;
-	mutex_lock(&inode->i_mutex);
-
-	spin_lock(&ci->i_unsafe_lock);
-	if (list_empty(head))
-		goto out;
-
-	req = list_entry(head->prev,
-			 struct ceph_mds_request, r_unsafe_dir_item);
-	last_tid = req->r_tid;
-
-	do {
-		ceph_mdsc_get_request(req);
-		spin_unlock(&ci->i_unsafe_lock);
-
-		dout("dir_fsync %p wait on tid %llu (until %llu)\n",
-		     inode, req->r_tid, last_tid);
-		if (req->r_timeout) {
-			unsigned long time_left = wait_for_completion_timeout(
-							&req->r_safe_completion,
-							req->r_timeout);
-			if (time_left > 0)
-				ret = 0;
-			else
-				ret = -EIO;  /* timed out */
-		} else {
-			wait_for_completion(&req->r_safe_completion);
-		}
-		ceph_mdsc_put_request(req);
-
-		spin_lock(&ci->i_unsafe_lock);
-		if (ret || list_empty(head))
-			break;
-		req = list_entry(head->next,
-				 struct ceph_mds_request, r_unsafe_dir_item);
-	} while (req->r_tid < last_tid);
-out:
-	spin_unlock(&ci->i_unsafe_lock);
-	mutex_unlock(&inode->i_mutex);
-
-	return ret;
-}
-
-/*
  * We maintain a private dentry LRU.
  *
  * FIXME: this needs to be changed to a per-mds lru to be useful.
@@ -1353,7 +1302,7 @@ const struct file_operations ceph_dir_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
 	.unlocked_ioctl = ceph_ioctl,
-	.fsync = ceph_dir_fsync,
+	.fsync = ceph_fsync,
 };
 
 const struct file_operations ceph_snapdir_fops = {
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3b6b522b4b31..faf92095e105 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -89,13 +89,14 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
 	case S_IFDIR:
 		dout("init_file %p %p 0%o (regular)\n", inode, file,
 		     inode->i_mode);
-		cf = kmem_cache_alloc(ceph_file_cachep, GFP_NOFS | __GFP_ZERO);
+		cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO);
 		if (cf == NULL) {
 			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
 			return -ENOMEM;
 		}
 		cf->fmode = fmode;
 		cf->next_offset = 2;
+		cf->readdir_cache_idx = -1;
 		file->private_data = cf;
 		BUG_ON(inode->i_fop->release != ceph_release);
 		break;
@@ -324,7 +325,6 @@ int ceph_release(struct inode *inode, struct file *file)
 		ceph_mdsc_put_request(cf->last_readdir);
 	kfree(cf->last_name);
 	kfree(cf->dir_info);
-	dput(cf->dentry);
 	kmem_cache_free(ceph_file_cachep, cf);
 
 	/* wake up anyone waiting for caps on this inode */
@@ -483,7 +483,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
 		}
 	} else {
 		num_pages = calc_pages_for(off, len);
-		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
+		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 		if (IS_ERR(pages))
 			return PTR_ERR(pages);
 		ret = striped_read(inode, off, len, pages,
@@ -557,13 +557,13 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+		       struct ceph_snap_context *snapc)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_snap_context *snapc;
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	struct page **pages;
@@ -600,7 +600,6 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
 		size_t start;
 		ssize_t n;
 
-		snapc = ci->i_snap_realm->cached_context;
 		vino = ceph_vino(inode);
 		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 					    vino, pos, &len, 0,
@@ -614,7 +613,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
 			break;
 		}
 
-		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
+		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
 
 		n = iov_iter_get_pages_alloc(from, &pages, len, &start);
 		if (unlikely(n < 0)) {
@@ -674,13 +673,13 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+		struct ceph_snap_context *snapc)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_snap_context *snapc;
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	struct page **pages;
@@ -717,7 +716,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
 		size_t left;
 		int n;
 
-		snapc = ci->i_snap_realm->cached_context;
 		vino = ceph_vino(inode);
 		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 					    vino, pos, &len, 0, 1,
@@ -736,7 +734,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
 		 */
 		num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
-		pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
+		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 		if (IS_ERR(pages)) {
 			ret = PTR_ERR(pages);
 			goto out;
@@ -860,7 +858,7 @@ again:
 		struct page *page = NULL;
 		loff_t i_size;
 		if (retry_op == READ_INLINE) {
-			page = __page_cache_alloc(GFP_NOFS);
+			page = __page_cache_alloc(GFP_KERNEL);
 			if (!page)
 				return -ENOMEM;
 		}
@@ -941,6 +939,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_client *osdc =
 		&ceph_sb_to_client(inode->i_sb)->client->osdc;
+	struct ceph_cap_flush *prealloc_cf;
 	ssize_t count, written = 0;
 	int err, want, got;
 	loff_t pos;
@@ -948,6 +947,10 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
 	mutex_lock(&inode->i_mutex);
 
 	/* We can write back this queue in page reclaim */
@@ -996,14 +999,30 @@ retry_snap:
 
 	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	    (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
+		struct ceph_snap_context *snapc;
 		struct iov_iter data;
 		mutex_unlock(&inode->i_mutex);
+
+		spin_lock(&ci->i_ceph_lock);
+		if (__ceph_have_pending_cap_snap(ci)) {
+			struct ceph_cap_snap *capsnap =
+					list_last_entry(&ci->i_cap_snaps,
+							struct ceph_cap_snap,
+							ci_item);
+			snapc = ceph_get_snap_context(capsnap->context);
+		} else {
+			BUG_ON(!ci->i_head_snapc);
+			snapc = ceph_get_snap_context(ci->i_head_snapc);
+		}
+		spin_unlock(&ci->i_ceph_lock);
+
 		/* we might need to revert back to that point */
 		data = *from;
 		if (iocb->ki_flags & IOCB_DIRECT)
-			written = ceph_sync_direct_write(iocb, &data, pos);
+			written = ceph_sync_direct_write(iocb, &data, pos,
+							 snapc);
 		else
-			written = ceph_sync_write(iocb, &data, pos);
+			written = ceph_sync_write(iocb, &data, pos, snapc);
 		if (written == -EOLDSNAPC) {
 			dout("aio_write %p %llx.%llx %llu~%u"
 				"got EOLDSNAPC, retrying\n",
@@ -1014,6 +1033,7 @@ retry_snap:
 		}
 		if (written > 0)
 			iov_iter_advance(from, written);
+		ceph_put_snap_context(snapc);
 	} else {
 		loff_t old_size = inode->i_size;
 		/*
@@ -1035,7 +1055,8 @@ retry_snap:
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
@@ -1059,6 +1080,7 @@ retry_snap:
 out:
 	mutex_unlock(&inode->i_mutex);
 out_unlocked:
+	ceph_free_cap_flush(prealloc_cf);
 	current->backing_dev_info = NULL;
 	return written ? written : err;
 }
@@ -1255,6 +1277,7 @@ static long ceph_fallocate(struct file *file, int mode,
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_client *osdc =
 		&ceph_inode_to_client(inode)->client->osdc;
+	struct ceph_cap_flush *prealloc_cf;
 	int want, got = 0;
 	int dirty;
 	int ret = 0;
@@ -1267,6 +1290,10 @@ static long ceph_fallocate(struct file *file, int mode,
 	if (!S_ISREG(inode->i_mode))
 		return -EOPNOTSUPP;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
 	mutex_lock(&inode->i_mutex);
 
 	if (ceph_snap(inode) != CEPH_NOSNAP) {
@@ -1313,7 +1340,8 @@ static long ceph_fallocate(struct file *file, int mode,
 	if (!ret) {
 		spin_lock(&ci->i_ceph_lock);
 		ci->i_inline_version = CEPH_INLINE_NONE;
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					       &prealloc_cf);
 		spin_unlock(&ci->i_ceph_lock);
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
@@ -1322,6 +1350,7 @@ static long ceph_fallocate(struct file *file, int mode,
 	ceph_put_cap_refs(ci, got);
 unlock:
 	mutex_unlock(&inode->i_mutex);
+	ceph_free_cap_flush(prealloc_cf);
 	return ret;
 }
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 571acd88606c..96d2bd829902 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -389,9 +389,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 	ci->i_inline_version = 0;
 	ci->i_time_warp_seq = 0;
 	ci->i_ceph_flags = 0;
-	ci->i_ordered_count = 0;
-	atomic_set(&ci->i_release_count, 1);
-	atomic_set(&ci->i_complete_count, 0);
+	atomic64_set(&ci->i_ordered_count, 1);
+	atomic64_set(&ci->i_release_count, 1);
+	atomic64_set(&ci->i_complete_seq[0], 0);
+	atomic64_set(&ci->i_complete_seq[1], 0);
 	ci->i_symlink = NULL;
 
 	memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
@@ -415,9 +416,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 	ci->i_flushing_caps = 0;
 	INIT_LIST_HEAD(&ci->i_dirty_item);
 	INIT_LIST_HEAD(&ci->i_flushing_item);
-	ci->i_cap_flush_seq = 0;
-	ci->i_cap_flush_last_tid = 0;
-	memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid));
+	ci->i_prealloc_cap_flush = NULL;
+	ci->i_cap_flush_tree = RB_ROOT;
 	init_waitqueue_head(&ci->i_cap_wq);
 	ci->i_hold_caps_min = 0;
 	ci->i_hold_caps_max = 0;
@@ -752,7 +752,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 
 	if (new_version ||
 	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
+		if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool)
+			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
 		ci->i_layout = info->layout;
+
 		queue_trunc = ceph_fill_file_size(inode, issued,
 					le32_to_cpu(info->truncate_seq),
 					le64_to_cpu(info->truncate_size),
@@ -858,9 +861,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 			    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
 			    !__ceph_dir_is_complete(ci)) {
 				dout(" marking %p complete (empty)\n", inode);
+				i_size_write(inode, 0);
 				__ceph_dir_set_complete(ci,
-					atomic_read(&ci->i_release_count),
-					ci->i_ordered_count);
+					atomic64_read(&ci->i_release_count),
+					atomic64_read(&ci->i_ordered_count));
 			}
 
 			wake = true;
@@ -1212,6 +1216,10 @@ retry_lookup:
 			dout("fill_trace doing d_move %p -> %p\n",
 			     req->r_old_dentry, dn);
 
+			/* d_move screws up sibling dentries' offsets */
+			ceph_dir_clear_ordered(dir);
+			ceph_dir_clear_ordered(olddir);
+
 			d_move(req->r_old_dentry, dn);
 			dout(" src %p '%pd' dst %p '%pd'\n",
 			     req->r_old_dentry,
@@ -1222,10 +1230,6 @@ retry_lookup:
 			   rehashing bug in vfs_rename_dir */
 			ceph_invalidate_dentry_lease(dn);
 
-			/* d_move screws up sibling dentries' offsets */
-			ceph_dir_clear_ordered(dir);
-			ceph_dir_clear_ordered(olddir);
-
 			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
 			     ceph_dentry(req->r_old_dentry)->offset);
 
@@ -1333,6 +1337,49 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
 	return err;
 }
 
+void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl)
+{
+	if (ctl->page) {
+		kunmap(ctl->page);
+		page_cache_release(ctl->page);
+		ctl->page = NULL;
+	}
+}
+
+static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
+			      struct ceph_readdir_cache_control *ctl,
+			      struct ceph_mds_request *req)
+{
+	struct ceph_inode_info *ci = ceph_inode(dir);
+	unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*);
+	unsigned idx = ctl->index % nsize;
+	pgoff_t pgoff = ctl->index / nsize;
+
+	if (!ctl->page || pgoff != page_index(ctl->page)) {
+		ceph_readdir_cache_release(ctl);
+		ctl->page  = grab_cache_page(&dir->i_data, pgoff);
+		if (!ctl->page) {
+			ctl->index = -1;
+			return -ENOMEM;
+		}
+		/* reading/filling the cache are serialized by
+		 * i_mutex, no need to use page lock */
+		unlock_page(ctl->page);
+		ctl->dentries = kmap(ctl->page);
+	}
+
+	if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
+	    req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) {
+		dout("readdir cache dn %p idx %d\n", dn, ctl->index);
+		ctl->dentries[idx] = dn;
+		ctl->index++;
+	} else {
+		dout("disable readdir cache\n");
+		ctl->index = -1;
+	}
+	return 0;
+}
+
 int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 			     struct ceph_mds_session *session)
 {
@@ -1345,8 +1392,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 	struct inode *snapdir = NULL;
 	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
 	struct ceph_dentry_info *di;
-	u64 r_readdir_offset = req->r_readdir_offset;
 	u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+	struct ceph_readdir_cache_control cache_ctl = {};
+
+	if (req->r_aborted)
+		return readdir_prepopulate_inodes_only(req, session);
 
 	if (rinfo->dir_dir &&
 	    le32_to_cpu(rinfo->dir_dir->frag) != frag) {
@@ -1354,14 +1404,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 		     frag, le32_to_cpu(rinfo->dir_dir->frag));
 		frag = le32_to_cpu(rinfo->dir_dir->frag);
 		if (ceph_frag_is_leftmost(frag))
-			r_readdir_offset = 2;
+			req->r_readdir_offset = 2;
 		else
-			r_readdir_offset = 0;
+			req->r_readdir_offset = 0;
 	}
 
-	if (req->r_aborted)
-		return readdir_prepopulate_inodes_only(req, session);
-
 	if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
 		snapdir = ceph_get_snapdir(d_inode(parent));
 		parent = d_find_alias(snapdir);
@@ -1374,6 +1421,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 			ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir);
 	}
 
+	if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
+		/* note dir version at start of readdir so we can tell
+		 * if any dentries get dropped */
+		struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
+		req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
+		req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
+		req->r_readdir_cache_idx = 0;
+	}
+
+	cache_ctl.index = req->r_readdir_cache_idx;
+
 	/* FIXME: release caps/leases if error occurs */
 	for (i = 0; i < rinfo->dir_nr; i++) {
 		struct ceph_vino vino;
@@ -1413,13 +1471,6 @@ retry_lookup:
 			d_delete(dn);
 			dput(dn);
 			goto retry_lookup;
-		} else {
-			/* reorder parent's d_subdirs */
-			spin_lock(&parent->d_lock);
-			spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);
-			list_move(&dn->d_child, &parent->d_subdirs);
-			spin_unlock(&dn->d_lock);
-			spin_unlock(&parent->d_lock);
 		}
 
 		/* inode */
@@ -1436,13 +1487,15 @@ retry_lookup:
 			}
 		}
 
-		if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
-			       req->r_request_started, -1,
-			       &req->r_caps_reservation) < 0) {
+		ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session,
+				 req->r_request_started, -1,
+				 &req->r_caps_reservation);
+		if (ret < 0) {
 			pr_err("fill_inode badness on %p\n", in);
 			if (d_really_is_negative(dn))
 				iput(in);
 			d_drop(dn);
+			err = ret;
 			goto next_item;
 		}
 
@@ -1458,19 +1511,28 @@ retry_lookup:
 		}
 
 		di = dn->d_fsdata;
-		di->offset = ceph_make_fpos(frag, i + r_readdir_offset);
+		di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
 
 		update_dentry_lease(dn, rinfo->dir_dlease[i],
 				    req->r_session,
 				    req->r_request_started);
+
+		if (err == 0 && cache_ctl.index >= 0) {
+			ret = fill_readdir_cache(d_inode(parent), dn,
+						 &cache_ctl, req);
+			if (ret < 0)
+				err = ret;
+		}
 next_item:
 		if (dn)
 			dput(dn);
 	}
-	if (err == 0)
-		req->r_did_prepopulate = true;
-
 out:
+	if (err == 0) {
+		req->r_did_prepopulate = true;
+		req->r_readdir_cache_idx = cache_ctl.index;
+	}
+	ceph_readdir_cache_release(&cache_ctl);
 	if (snapdir) {
 		iput(snapdir);
 		dput(parent);
@@ -1712,11 +1774,13 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	const unsigned int ia_valid = attr->ia_valid;
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+	struct ceph_cap_flush *prealloc_cf;
 	int issued;
 	int release = 0, dirtied = 0;
 	int mask = 0;
 	int err = 0;
 	int inode_dirty_flags = 0;
+	bool lock_snap_rwsem = false;
 
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
@@ -1725,13 +1789,31 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	if (err != 0)
 		return err;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR,
 				       USE_AUTH_MDS);
-	if (IS_ERR(req))
+	if (IS_ERR(req)) {
+		ceph_free_cap_flush(prealloc_cf);
 		return PTR_ERR(req);
+	}
 
 	spin_lock(&ci->i_ceph_lock);
 	issued = __ceph_caps_issued(ci, NULL);
+
+	if (!ci->i_head_snapc &&
+	    (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) {
+		lock_snap_rwsem = true;
+		if (!down_read_trylock(&mdsc->snap_rwsem)) {
+			spin_unlock(&ci->i_ceph_lock);
+			down_read(&mdsc->snap_rwsem);
+			spin_lock(&ci->i_ceph_lock);
+			issued = __ceph_caps_issued(ci, NULL);
+		}
+	}
+
 	dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));
 
 	if (ia_valid & ATTR_UID) {
@@ -1874,12 +1956,15 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 		dout("setattr %p ATTR_FILE ... hrm!\n", inode);
 
 	if (dirtied) {
-		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied);
+		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
+							   &prealloc_cf);
 		inode->i_ctime = CURRENT_TIME;
 	}
 
 	release &= issued;
 	spin_unlock(&ci->i_ceph_lock);
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
 
 	if (inode_dirty_flags)
 		__mark_inode_dirty(inode, inode_dirty_flags);
@@ -1904,9 +1989,11 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	ceph_mdsc_put_request(req);
 	if (mask & CEPH_SETATTR_SIZE)
 		__ceph_do_pending_vmtruncate(inode);
+	ceph_free_cap_flush(prealloc_cf);
 	return err;
 out_put:
 	ceph_mdsc_put_request(req);
+	ceph_free_cap_flush(prealloc_cf);
 	return err;
 }
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 84f37f34f9aa..6aa07af67603 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -8,6 +8,7 @@
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
 #include <linux/utsname.h>
+#include <linux/ratelimit.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -458,7 +459,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
-	INIT_LIST_HEAD(&s->s_cap_releases_done);
 	INIT_LIST_HEAD(&s->s_cap_flushing);
 	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
 
@@ -629,6 +629,9 @@ static void __register_request(struct ceph_mds_client *mdsc,
 	req->r_uid = current_fsuid();
 	req->r_gid = current_fsgid();
 
+	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
+		mdsc->oldest_tid = req->r_tid;
+
 	if (dir) {
 		struct ceph_inode_info *ci = ceph_inode(dir);
 
@@ -644,6 +647,21 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
 				 struct ceph_mds_request *req)
 {
 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
+
+	if (req->r_tid == mdsc->oldest_tid) {
+		struct rb_node *p = rb_next(&req->r_node);
+		mdsc->oldest_tid = 0;
+		while (p) {
+			struct ceph_mds_request *next_req =
+				rb_entry(p, struct ceph_mds_request, r_node);
+			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
+				mdsc->oldest_tid = next_req->r_tid;
+				break;
+			}
+			p = rb_next(p);
+		}
+	}
+
 	rb_erase(&req->r_node, &mdsc->request_tree);
 	RB_CLEAR_NODE(&req->r_node);
 
@@ -998,27 +1016,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
  * session caps
  */
 
-/*
- * Free preallocated cap messages assigned to this session
- */
-static void cleanup_cap_releases(struct ceph_mds_session *session)
+/* caller holds s_cap_lock, we drop it */
+static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_session *session)
+	__releases(session->s_cap_lock)
 {
-	struct ceph_msg *msg;
+	LIST_HEAD(tmp_list);
+	list_splice_init(&session->s_cap_releases, &tmp_list);
+	session->s_num_cap_releases = 0;
+	spin_unlock(&session->s_cap_lock);
 
-	spin_lock(&session->s_cap_lock);
-	while (!list_empty(&session->s_cap_releases)) {
-		msg = list_first_entry(&session->s_cap_releases,
-				       struct ceph_msg, list_head);
-		list_del_init(&msg->list_head);
-		ceph_msg_put(msg);
-	}
-	while (!list_empty(&session->s_cap_releases_done)) {
-		msg = list_first_entry(&session->s_cap_releases_done,
-				       struct ceph_msg, list_head);
-		list_del_init(&msg->list_head);
-		ceph_msg_put(msg);
+	dout("cleanup_cap_releases mds%d\n", session->s_mds);
+	while (!list_empty(&tmp_list)) {
+		struct ceph_cap *cap;
+		/* zero out the in-progress message */
+		cap = list_first_entry(&tmp_list,
+					struct ceph_cap, session_caps);
+		list_del(&cap->session_caps);
+		ceph_put_cap(mdsc, cap);
 	}
-	spin_unlock(&session->s_cap_lock);
 }
 
 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
@@ -1033,7 +1049,8 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc,
 		req = list_first_entry(&session->s_unsafe,
 				       struct ceph_mds_request, r_unsafe_item);
 		list_del_init(&req->r_unsafe_item);
-		pr_info(" dropping unsafe request %llu\n", req->r_tid);
+		pr_warn_ratelimited(" dropping unsafe request %llu\n",
+				    req->r_tid);
 		__unregister_request(mdsc, req);
 	}
 	/* zero r_attempts, so kick_requests() will re-send requests */
@@ -1095,10 +1112,16 @@ static int iterate_session_caps(struct ceph_mds_session *session,
 			dout("iterate_session_caps  finishing cap %p removal\n",
 			     cap);
 			BUG_ON(cap->session != session);
+			cap->session = NULL;
 			list_del_init(&cap->session_caps);
 			session->s_nr_caps--;
-			cap->session = NULL;
-			old_cap = cap;  /* put_cap it w/o locks held */
+			if (cap->queue_release) {
+				list_add_tail(&cap->session_caps,
+					      &session->s_cap_releases);
+				session->s_num_cap_releases++;
+			} else {
+				old_cap = cap;  /* put_cap it w/o locks held */
+			}
 		}
 		if (ret < 0)
 			goto out;
@@ -1119,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 				  void *arg)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	LIST_HEAD(to_remove);
 	int drop = 0;
 
 	dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1126,12 +1150,27 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	spin_lock(&ci->i_ceph_lock);
 	__ceph_remove_cap(cap, false);
 	if (!ci->i_auth_cap) {
+		struct ceph_cap_flush *cf;
 		struct ceph_mds_client *mdsc =
 			ceph_sb_to_client(inode->i_sb)->mdsc;
 
+		while (true) {
+			struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
+			if (!n)
+				break;
+			cf = rb_entry(n, struct ceph_cap_flush, i_node);
+			rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+			list_add(&cf->list, &to_remove);
+		}
+
 		spin_lock(&mdsc->cap_dirty_lock);
+
+		list_for_each_entry(cf, &to_remove, list)
+			rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+
 		if (!list_empty(&ci->i_dirty_item)) {
-			pr_info(" dropping dirty %s state for %p %lld\n",
+			pr_warn_ratelimited(
+				" dropping dirty %s state for %p %lld\n",
 				ceph_cap_string(ci->i_dirty_caps),
 				inode, ceph_ino(inode));
 			ci->i_dirty_caps = 0;
@@ -1139,7 +1178,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 			drop = 1;
 		}
 		if (!list_empty(&ci->i_flushing_item)) {
-			pr_info(" dropping dirty+flushing %s state for %p %lld\n",
+			pr_warn_ratelimited(
+				" dropping dirty+flushing %s state for %p %lld\n",
 				ceph_cap_string(ci->i_flushing_caps),
 				inode, ceph_ino(inode));
 			ci->i_flushing_caps = 0;
@@ -1148,8 +1188,20 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
 			drop = 1;
 		}
 		spin_unlock(&mdsc->cap_dirty_lock);
+
+		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
+			list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
+			ci->i_prealloc_cap_flush = NULL;
+		}
 	}
 	spin_unlock(&ci->i_ceph_lock);
+	while (!list_empty(&to_remove)) {
+		struct ceph_cap_flush *cf;
+		cf = list_first_entry(&to_remove,
+				      struct ceph_cap_flush, list);
+		list_del(&cf->list);
+		ceph_free_cap_flush(cf);
+	}
 	while (drop--)
 		iput(inode);
 	return 0;
@@ -1191,11 +1243,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
 			spin_lock(&session->s_cap_lock);
 		}
 	}
-	spin_unlock(&session->s_cap_lock);
+
+	// drop cap expires and unlock s_cap_lock
+	cleanup_cap_releases(session->s_mdsc, session);
 
 	BUG_ON(session->s_nr_caps > 0);
 	BUG_ON(!list_empty(&session->s_cap_flushing));
-	cleanup_cap_releases(session);
 }
 
 /*
@@ -1371,7 +1424,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
 	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
 	     ceph_cap_string(used), ceph_cap_string(wanted));
 	if (cap == ci->i_auth_cap) {
-		if (ci->i_dirty_caps | ci->i_flushing_caps)
+		if (ci->i_dirty_caps || ci->i_flushing_caps ||
+		    !list_empty(&ci->i_cap_snaps))
 			goto out;
 		if ((used | wanted) & CEPH_CAP_ANY_WR)
 			goto out;
@@ -1417,121 +1471,80 @@ static int trim_caps(struct ceph_mds_client *mdsc,
 		session->s_trim_caps = 0;
 	}
 
-	ceph_add_cap_releases(mdsc, session);
 	ceph_send_cap_releases(mdsc, session);
 	return 0;
 }
 
-/*
- * Allocate cap_release messages.  If there is a partially full message
- * in the queue, try to allocate enough to cover it's remainder, so that
- * we can send it immediately.
- *
- * Called under s_mutex.
- */
-int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-			  struct ceph_mds_session *session)
+static int check_capsnap_flush(struct ceph_inode_info *ci,
+			       u64 want_snap_seq)
 {
-	struct ceph_msg *msg, *partial = NULL;
-	struct ceph_mds_cap_release *head;
-	int err = -ENOMEM;
-	int extra = mdsc->fsc->mount_options->cap_release_safety;
-	int num;
-
-	dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
-	     extra);
-
-	spin_lock(&session->s_cap_lock);
-
-	if (!list_empty(&session->s_cap_releases)) {
-		msg = list_first_entry(&session->s_cap_releases,
-				       struct ceph_msg,
-				 list_head);
-		head = msg->front.iov_base;
-		num = le32_to_cpu(head->num);
-		if (num) {
-			dout(" partial %p with (%d/%d)\n", msg, num,
-			     (int)CEPH_CAPS_PER_RELEASE);
-			extra += CEPH_CAPS_PER_RELEASE - num;
-			partial = msg;
-		}
-	}
-	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
-		spin_unlock(&session->s_cap_lock);
-		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
-				   GFP_NOFS, false);
-		if (!msg)
-			goto out_unlocked;
-		dout("add_cap_releases %p msg %p now %d\n", session, msg,
-		     (int)msg->front.iov_len);
-		head = msg->front.iov_base;
-		head->num = cpu_to_le32(0);
-		msg->front.iov_len = sizeof(*head);
-		spin_lock(&session->s_cap_lock);
-		list_add(&msg->list_head, &session->s_cap_releases);
-		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
-	}
-
-	if (partial) {
-		head = partial->front.iov_base;
-		num = le32_to_cpu(head->num);
-		dout(" queueing partial %p with %d/%d\n", partial, num,
-		     (int)CEPH_CAPS_PER_RELEASE);
-		list_move_tail(&partial->list_head,
-			       &session->s_cap_releases_done);
-		session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
+	int ret = 1;
+	spin_lock(&ci->i_ceph_lock);
+	if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
+		struct ceph_cap_snap *capsnap =
+			list_first_entry(&ci->i_cap_snaps,
+					 struct ceph_cap_snap, ci_item);
+		ret = capsnap->follows >= want_snap_seq;
 	}
-	err = 0;
-	spin_unlock(&session->s_cap_lock);
-out_unlocked:
-	return err;
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
 }
 
-static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
+static int check_caps_flush(struct ceph_mds_client *mdsc,
+			    u64 want_flush_tid)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int ret;
-	spin_lock(&ci->i_ceph_lock);
-	if (ci->i_flushing_caps)
-		ret = ci->i_cap_flush_seq >= want_flush_seq;
-	else
-		ret = 1;
-	spin_unlock(&ci->i_ceph_lock);
+	struct rb_node *n;
+	struct ceph_cap_flush *cf;
+	int ret = 1;
+
+	spin_lock(&mdsc->cap_dirty_lock);
+	n = rb_first(&mdsc->cap_flush_tree);
+	cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
+	if (cf && cf->tid <= want_flush_tid) {
+		dout("check_caps_flush still flushing tid %llu <= %llu\n",
+		     cf->tid, want_flush_tid);
+		ret = 0;
+	}
+	spin_unlock(&mdsc->cap_dirty_lock);
 	return ret;
 }
 
 /*
  * flush all dirty inode data to disk.
  *
- * returns true if we've flushed through want_flush_seq
+ * returns true if we've flushed through want_flush_tid
  */
-static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
+static void wait_caps_flush(struct ceph_mds_client *mdsc,
+			    u64 want_flush_tid, u64 want_snap_seq)
 {
 	int mds;
 
-	dout("check_cap_flush want %lld\n", want_flush_seq);
+	dout("check_caps_flush want %llu snap want %llu\n",
+	     want_flush_tid, want_snap_seq);
 	mutex_lock(&mdsc->mutex);
-	for (mds = 0; mds < mdsc->max_sessions; mds++) {
+	for (mds = 0; mds < mdsc->max_sessions; ) {
 		struct ceph_mds_session *session = mdsc->sessions[mds];
 		struct inode *inode = NULL;
 
-		if (!session)
+		if (!session) {
+			mds++;
 			continue;
+		}
 		get_session(session);
 		mutex_unlock(&mdsc->mutex);
 
 		mutex_lock(&session->s_mutex);
-		if (!list_empty(&session->s_cap_flushing)) {
-			struct ceph_inode_info *ci =
-				list_entry(session->s_cap_flushing.next,
-					   struct ceph_inode_info,
-					   i_flushing_item);
-
-			if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
-				dout("check_cap_flush still flushing %p "
-				     "seq %lld <= %lld to mds%d\n",
-				     &ci->vfs_inode, ci->i_cap_flush_seq,
-				     want_flush_seq, session->s_mds);
+		if (!list_empty(&session->s_cap_snaps_flushing)) {
+			struct ceph_cap_snap *capsnap =
+				list_first_entry(&session->s_cap_snaps_flushing,
+						 struct ceph_cap_snap,
+						 flushing_item);
+			struct ceph_inode_info *ci = capsnap->ci;
+			if (!check_capsnap_flush(ci, want_snap_seq)) {
+				dout("check_cap_flush still flushing snap %p "
+				     "follows %lld <= %lld to mds%d\n",
+				     &ci->vfs_inode, capsnap->follows,
+				     want_snap_seq, mds);
 				inode = igrab(&ci->vfs_inode);
 			}
 		}
@@ -1540,15 +1553,21 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 
 		if (inode) {
 			wait_event(mdsc->cap_flushing_wq,
-				   check_cap_flush(inode, want_flush_seq));
+				   check_capsnap_flush(ceph_inode(inode),
+						       want_snap_seq));
 			iput(inode);
+		} else {
+			mds++;
 		}
 
 		mutex_lock(&mdsc->mutex);
 	}
-
 	mutex_unlock(&mdsc->mutex);
-	dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
+
+	wait_event(mdsc->cap_flushing_wq,
+		   check_caps_flush(mdsc, want_flush_tid));
+
+	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
 }
 
 /*
@@ -1557,60 +1576,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
 			    struct ceph_mds_session *session)
 {
-	struct ceph_msg *msg;
+	struct ceph_msg *msg = NULL;
+	struct ceph_mds_cap_release *head;
+	struct ceph_mds_cap_item *item;
+	struct ceph_cap *cap;
+	LIST_HEAD(tmp_list);
+	int num_cap_releases;
 
-	dout("send_cap_releases mds%d\n", session->s_mds);
 	spin_lock(&session->s_cap_lock);
-	while (!list_empty(&session->s_cap_releases_done)) {
-		msg = list_first_entry(&session->s_cap_releases_done,
-				 struct ceph_msg, list_head);
-		list_del_init(&msg->list_head);
-		spin_unlock(&session->s_cap_lock);
-		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
-		ceph_con_send(&session->s_con, msg);
-		spin_lock(&session->s_cap_lock);
-	}
+again:
+	list_splice_init(&session->s_cap_releases, &tmp_list);
+	num_cap_releases = session->s_num_cap_releases;
+	session->s_num_cap_releases = 0;
 	spin_unlock(&session->s_cap_lock);
-}
 
-static void discard_cap_releases(struct ceph_mds_client *mdsc,
-				 struct ceph_mds_session *session)
-{
-	struct ceph_msg *msg;
-	struct ceph_mds_cap_release *head;
-	unsigned num;
-
-	dout("discard_cap_releases mds%d\n", session->s_mds);
+	while (!list_empty(&tmp_list)) {
+		if (!msg) {
+			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
+					PAGE_CACHE_SIZE, GFP_NOFS, false);
+			if (!msg)
+				goto out_err;
+			head = msg->front.iov_base;
+			head->num = cpu_to_le32(0);
+			msg->front.iov_len = sizeof(*head);
+		}
+		cap = list_first_entry(&tmp_list, struct ceph_cap,
+					session_caps);
+		list_del(&cap->session_caps);
+		num_cap_releases--;
 
-	if (!list_empty(&session->s_cap_releases)) {
-		/* zero out the in-progress message */
-		msg = list_first_entry(&session->s_cap_releases,
-					struct ceph_msg, list_head);
 		head = msg->front.iov_base;
-		num = le32_to_cpu(head->num);
-		dout("discard_cap_releases mds%d %p %u\n",
-		     session->s_mds, msg, num);
-		head->num = cpu_to_le32(0);
-		msg->front.iov_len = sizeof(*head);
-		session->s_num_cap_releases += num;
+		le32_add_cpu(&head->num, 1);
+		item = msg->front.iov_base + msg->front.iov_len;
+		item->ino = cpu_to_le64(cap->cap_ino);
+		item->cap_id = cpu_to_le64(cap->cap_id);
+		item->migrate_seq = cpu_to_le32(cap->mseq);
+		item->seq = cpu_to_le32(cap->issue_seq);
+		msg->front.iov_len += sizeof(*item);
+
+		ceph_put_cap(mdsc, cap);
+
+		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+			ceph_con_send(&session->s_con, msg);
+			msg = NULL;
+		}
 	}
 
-	/* requeue completed messages */
-	while (!list_empty(&session->s_cap_releases_done)) {
-		msg = list_first_entry(&session->s_cap_releases_done,
-				 struct ceph_msg, list_head);
-		list_del_init(&msg->list_head);
+	BUG_ON(num_cap_releases != 0);
 
-		head = msg->front.iov_base;
-		num = le32_to_cpu(head->num);
-		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
-		     num);
-		session->s_num_cap_releases += num;
-		head->num = cpu_to_le32(0);
-		msg->front.iov_len = sizeof(*head);
-		list_add(&msg->list_head, &session->s_cap_releases);
+	spin_lock(&session->s_cap_lock);
+	if (!list_empty(&session->s_cap_releases))
+		goto again;
+	spin_unlock(&session->s_cap_lock);
+
+	if (msg) {
+		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+		ceph_con_send(&session->s_con, msg);
 	}
+	return;
+out_err:
+	pr_err("send_cap_releases mds%d, failed to allocate message\n",
+		session->s_mds);
+	spin_lock(&session->s_cap_lock);
+	list_splice(&tmp_list, &session->s_cap_releases);
+	session->s_num_cap_releases += num_cap_releases;
+	spin_unlock(&session->s_cap_lock);
 }
 
 /*
@@ -1635,7 +1668,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
 
 	order = get_order(size * num_entries);
 	while (order >= 0) {
-		rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN,
+		rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL |
+							__GFP_NOWARN,
 							order);
 		if (rinfo->dir_in)
 			break;
@@ -1697,13 +1731,9 @@ static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
 			struct ceph_mds_request, r_node);
 }
 
-static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
+static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
 {
-	struct ceph_mds_request *req = __get_oldest_req(mdsc);
-
-	if (req)
-		return req->r_tid;
-	return 0;
+	return mdsc->oldest_tid;
 }
 
 /*
@@ -2267,15 +2297,18 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
 	/* wait */
 	mutex_unlock(&mdsc->mutex);
 	dout("do_request waiting\n");
-	if (req->r_timeout) {
-		err = (long)wait_for_completion_killable_timeout(
-			&req->r_completion, req->r_timeout);
-		if (err == 0)
-			err = -EIO;
-	} else if (req->r_wait_for_completion) {
+	if (!req->r_timeout && req->r_wait_for_completion) {
 		err = req->r_wait_for_completion(mdsc, req);
 	} else {
-		err = wait_for_completion_killable(&req->r_completion);
+		long timeleft = wait_for_completion_killable_timeout(
+					&req->r_completion,
+					ceph_timeout_jiffies(req->r_timeout));
+		if (timeleft > 0)
+			err = 0;
+		else if (!timeleft)
+			err = -EIO;  /* timed out */
+		else
+			err = timeleft;  /* killed */
 	}
 	dout("do_request waited, got %d\n", err);
 	mutex_lock(&mdsc->mutex);
@@ -2496,7 +2529,6 @@ out_err:
 	}
 	mutex_unlock(&mdsc->mutex);
 
-	ceph_add_cap_releases(mdsc, req->r_session);
 	mutex_unlock(&session->s_mutex);
 
 	/* kick calling process */
@@ -2888,8 +2920,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	 */
 	session->s_cap_reconnect = 1;
 	/* drop old cap expires; we're about to reestablish that state */
-	discard_cap_releases(mdsc, session);
-	spin_unlock(&session->s_cap_lock);
+	cleanup_cap_releases(mdsc, session);
 
 	/* trim unused caps to reduce MDS's cache rejoin time */
 	if (mdsc->fsc->sb->s_root)
@@ -2956,6 +2987,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 
 	reply->hdr.data_len = cpu_to_le32(pagelist->length);
 	ceph_msg_data_add_pagelist(reply, pagelist);
+
+	ceph_early_kick_flushing_caps(mdsc, session);
+
 	ceph_con_send(&session->s_con, reply);
 
 	mutex_unlock(&session->s_mutex);
@@ -3352,7 +3386,6 @@ static void delayed_work(struct work_struct *work)
 			send_renew_caps(mdsc, s);
 		else
 			ceph_con_keepalive(&s->s_con);
-		ceph_add_cap_releases(mdsc, s);
 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
 		    s->s_state == CEPH_MDS_SESSION_HUNG)
 			ceph_send_cap_releases(mdsc, s);
@@ -3390,11 +3423,13 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	atomic_set(&mdsc->num_sessions, 0);
 	mdsc->max_sessions = 0;
 	mdsc->stopping = 0;
+	mdsc->last_snap_seq = 0;
 	init_rwsem(&mdsc->snap_rwsem);
 	mdsc->snap_realms = RB_ROOT;
 	INIT_LIST_HEAD(&mdsc->snap_empty);
 	spin_lock_init(&mdsc->snap_empty_lock);
 	mdsc->last_tid = 0;
+	mdsc->oldest_tid = 0;
 	mdsc->request_tree = RB_ROOT;
 	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
 	mdsc->last_renew_caps = jiffies;
@@ -3402,7 +3437,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	spin_lock_init(&mdsc->cap_delay_lock);
 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
 	spin_lock_init(&mdsc->snap_flush_lock);
-	mdsc->cap_flush_seq = 0;
+	mdsc->last_cap_flush_tid = 1;
+	mdsc->cap_flush_tree = RB_ROOT;
 	INIT_LIST_HEAD(&mdsc->cap_dirty);
 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
 	mdsc->num_cap_flushing = 0;
@@ -3414,6 +3450,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	ceph_caps_init(mdsc);
 	ceph_adjust_min_caps(mdsc, fsc->min_caps);
 
+	init_rwsem(&mdsc->pool_perm_rwsem);
+	mdsc->pool_perm_tree = RB_ROOT;
+
 	return 0;
 }
 
@@ -3423,8 +3462,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
  */
 static void wait_requests(struct ceph_mds_client *mdsc)
 {
+	struct ceph_options *opts = mdsc->fsc->client->options;
 	struct ceph_mds_request *req;
-	struct ceph_fs_client *fsc = mdsc->fsc;
 
 	mutex_lock(&mdsc->mutex);
 	if (__get_oldest_req(mdsc)) {
@@ -3432,7 +3471,7 @@ static void wait_requests(struct ceph_mds_client *mdsc)
 
 		dout("wait_requests waiting for requests\n");
 		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
-				    fsc->client->options->mount_timeout * HZ);
+				    ceph_timeout_jiffies(opts->mount_timeout));
 
 		/* tear down remaining requests */
 		mutex_lock(&mdsc->mutex);
@@ -3485,7 +3524,8 @@ restart:
 			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
 		else
 			nextreq = NULL;
-		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
+		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
+		    (req->r_op & CEPH_MDS_OP_WRITE)) {
 			/* write op */
 			ceph_mdsc_get_request(req);
 			if (nextreq)
@@ -3513,7 +3553,7 @@ restart:
 
 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
-	u64 want_tid, want_flush;
+	u64 want_tid, want_flush, want_snap;
 
 	if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
 		return;
@@ -3525,13 +3565,18 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 
 	ceph_flush_dirty_caps(mdsc);
 	spin_lock(&mdsc->cap_dirty_lock);
-	want_flush = mdsc->cap_flush_seq;
+	want_flush = mdsc->last_cap_flush_tid;
 	spin_unlock(&mdsc->cap_dirty_lock);
 
-	dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
+	down_read(&mdsc->snap_rwsem);
+	want_snap = mdsc->last_snap_seq;
+	up_read(&mdsc->snap_rwsem);
+
+	dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
+	     want_tid, want_flush, want_snap);
 
 	wait_unsafe_requests(mdsc, want_tid);
-	wait_caps_flush(mdsc, want_flush);
+	wait_caps_flush(mdsc, want_flush, want_snap);
 }
 
 /*
@@ -3549,10 +3594,9 @@ static bool done_closing_sessions(struct ceph_mds_client *mdsc)
  */
 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 {
+	struct ceph_options *opts = mdsc->fsc->client->options;
 	struct ceph_mds_session *session;
 	int i;
-	struct ceph_fs_client *fsc = mdsc->fsc;
-	unsigned long timeout = fsc->client->options->mount_timeout * HZ;
 
 	dout("close_sessions\n");
 
@@ -3573,7 +3617,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 
 	dout("waiting for sessions to close\n");
 	wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
-			   timeout);
+			   ceph_timeout_jiffies(opts->mount_timeout));
 
 	/* tear down remaining sessions */
 	mutex_lock(&mdsc->mutex);
@@ -3607,6 +3651,7 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
 		ceph_mdsmap_destroy(mdsc->mdsmap);
 	kfree(mdsc->sessions);
 	ceph_caps_finalize(mdsc);
+	ceph_pool_perm_destroy(mdsc);
 }
 
 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 1875b5d985c6..762757e6cebf 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -139,7 +139,6 @@ struct ceph_mds_session {
 	int		  s_cap_reconnect;
 	int		  s_readonly;
 	struct list_head  s_cap_releases; /* waiting cap_release messages */
-	struct list_head  s_cap_releases_done; /* ready to send */
 	struct ceph_cap  *s_cap_iterator;
 
 	/* protected by mutex */
@@ -228,7 +227,7 @@ struct ceph_mds_request {
 	int r_err;
 	bool r_aborted;
 
-	unsigned long r_timeout;  /* optional.  jiffies */
+	unsigned long r_timeout;  /* optional.  jiffies, 0 is "wait forever" */
 	unsigned long r_started;  /* start time to measure timeout against */
 	unsigned long r_request_started; /* start time for mds request only,
 					    used to measure lease durations */
@@ -254,12 +253,21 @@ struct ceph_mds_request {
 	bool		  r_got_unsafe, r_got_safe, r_got_result;
 
 	bool              r_did_prepopulate;
+	long long	  r_dir_release_cnt;
+	long long	  r_dir_ordered_cnt;
+	int		  r_readdir_cache_idx;
 	u32               r_readdir_offset;
 
 	struct ceph_cap_reservation r_caps_reservation;
 	int r_num_caps;
 };
 
+struct ceph_pool_perm {
+	struct rb_node node;
+	u32 pool;
+	int perm;
+};
+
 /*
  * mds client state
  */
@@ -284,12 +292,15 @@ struct ceph_mds_client {
 	 * references (implying they contain no inodes with caps) that
 	 * should be destroyed.
 	 */
+	u64			last_snap_seq;
 	struct rw_semaphore     snap_rwsem;
 	struct rb_root          snap_realms;
 	struct list_head        snap_empty;
 	spinlock_t              snap_empty_lock;  /* protect snap_empty */
 
 	u64                    last_tid;      /* most recent mds request */
+	u64                    oldest_tid;    /* oldest incomplete mds request,
+						 excluding setfilelock requests */
 	struct rb_root         request_tree;  /* pending mds requests */
 	struct delayed_work    delayed_work;  /* delayed work */
 	unsigned long    last_renew_caps;  /* last time we renewed our caps */
@@ -298,7 +309,8 @@ struct ceph_mds_client {
 	struct list_head snap_flush_list;  /* cap_snaps ready to flush */
 	spinlock_t       snap_flush_lock;
 
-	u64               cap_flush_seq;
+	u64               last_cap_flush_tid;
+	struct rb_root    cap_flush_tree;
 	struct list_head  cap_dirty;        /* inodes with dirty caps */
 	struct list_head  cap_dirty_migrating; /* ...that are migration... */
 	int               num_cap_flushing; /* # caps we are flushing */
@@ -328,6 +340,9 @@ struct ceph_mds_client {
 	spinlock_t	  dentry_lru_lock;
 	struct list_head  dentry_lru;
 	int		  num_dentry;
+
+	struct rw_semaphore     pool_perm_rwsem;
+	struct rb_root		pool_perm_tree;
 };
 
 extern const char *ceph_mds_op_name(int op);
@@ -379,8 +394,6 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
 	kref_put(&req->r_kref, ceph_mdsc_release_request);
 }
 
-extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-				 struct ceph_mds_session *session);
 extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
 				   struct ceph_mds_session *session);
 
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index a97e39f09ba6..233d906aec02 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,7 +296,7 @@ static int cmpu64_rev(const void *a, const void *b)
 }
 
 
-static struct ceph_snap_context *empty_snapc;
+struct ceph_snap_context *ceph_empty_snapc;
 
 /*
  * build the snap context for a given realm.
@@ -338,9 +338,9 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 		return 0;
 	}
 
-	if (num == 0 && realm->seq == empty_snapc->seq) {
-		ceph_get_snap_context(empty_snapc);
-		snapc = empty_snapc;
+	if (num == 0 && realm->seq == ceph_empty_snapc->seq) {
+		ceph_get_snap_context(ceph_empty_snapc);
+		snapc = ceph_empty_snapc;
 		goto done;
 	}
 
@@ -436,6 +436,14 @@ static int dup_array(u64 **dst, __le64 *src, u32 num)
 	return 0;
 }
 
+static bool has_new_snaps(struct ceph_snap_context *o,
+			  struct ceph_snap_context *n)
+{
+	if (n->num_snaps == 0)
+		return false;
+	/* snaps are in descending order */
+	return n->snaps[0] > o->seq;
+}
 
 /*
  * When a snapshot is applied, the size/mtime inode metadata is queued
@@ -455,6 +463,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 {
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap_snap *capsnap;
+	struct ceph_snap_context *old_snapc, *new_snapc;
 	int used, dirty;
 
 	capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
@@ -467,6 +476,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 	used = __ceph_caps_used(ci);
 	dirty = __ceph_caps_dirty(ci);
 
+	old_snapc = ci->i_head_snapc;
+	new_snapc = ci->i_snap_realm->cached_context;
+
 	/*
 	 * If there is a write in progress, treat that as a dirty Fw,
 	 * even though it hasn't completed yet; by the time we finish
@@ -481,76 +493,95 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 		   writes in progress now were started before the previous
 		   cap_snap.  lucky us. */
 		dout("queue_cap_snap %p already pending\n", inode);
-		kfree(capsnap);
-	} else if (ci->i_snap_realm->cached_context == empty_snapc) {
-		dout("queue_cap_snap %p empty snapc\n", inode);
-		kfree(capsnap);
-	} else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
-			    CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
-		struct ceph_snap_context *snapc = ci->i_head_snapc;
-
-		/*
-		 * if we are a sync write, we may need to go to the snaprealm
-		 * to get the current snapc.
-		 */
-		if (!snapc)
-			snapc = ci->i_snap_realm->cached_context;
+		goto update_snapc;
+	}
+	if (ci->i_wrbuffer_ref_head == 0 &&
+	    !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) {
+		dout("queue_cap_snap %p nothing dirty|writing\n", inode);
+		goto update_snapc;
+	}
 
-		dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
-		     inode, capsnap, snapc, ceph_cap_string(dirty));
-		ihold(inode);
+	BUG_ON(!old_snapc);
 
-		atomic_set(&capsnap->nref, 1);
-		capsnap->ci = ci;
-		INIT_LIST_HEAD(&capsnap->ci_item);
-		INIT_LIST_HEAD(&capsnap->flushing_item);
-
-		capsnap->follows = snapc->seq;
-		capsnap->issued = __ceph_caps_issued(ci, NULL);
-		capsnap->dirty = dirty;
-
-		capsnap->mode = inode->i_mode;
-		capsnap->uid = inode->i_uid;
-		capsnap->gid = inode->i_gid;
-
-		if (dirty & CEPH_CAP_XATTR_EXCL) {
-			__ceph_build_xattrs_blob(ci);
-			capsnap->xattr_blob =
-				ceph_buffer_get(ci->i_xattrs.blob);
-			capsnap->xattr_version = ci->i_xattrs.version;
-		} else {
-			capsnap->xattr_blob = NULL;
-			capsnap->xattr_version = 0;
+	/*
+	 * There is no need to send FLUSHSNAP message to MDS if there is
+	 * no new snapshot. But when there is dirty pages or on-going
+	 * writes, we still need to create cap_snap. cap_snap is needed
+	 * by the write path and page writeback path.
+	 *
+	 * also see ceph_try_drop_cap_snap()
+	 */
+	if (has_new_snaps(old_snapc, new_snapc)) {
+		if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))
+			capsnap->need_flush = true;
+	} else {
+		if (!(used & CEPH_CAP_FILE_WR) &&
+		    ci->i_wrbuffer_ref_head == 0) {
+			dout("queue_cap_snap %p "
+			     "no new_snap|dirty_page|writing\n", inode);
+			goto update_snapc;
 		}
+	}
 
-		capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
-
-		/* dirty page count moved from _head to this cap_snap;
-		   all subsequent writes page dirties occur _after_ this
-		   snapshot. */
-		capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
-		ci->i_wrbuffer_ref_head = 0;
-		capsnap->context = snapc;
-		ci->i_head_snapc =
-			ceph_get_snap_context(ci->i_snap_realm->cached_context);
-		dout(" new snapc is %p\n", ci->i_head_snapc);
-		list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
-
-		if (used & CEPH_CAP_FILE_WR) {
-			dout("queue_cap_snap %p cap_snap %p snapc %p"
-			     " seq %llu used WR, now pending\n", inode,
-			     capsnap, snapc, snapc->seq);
-			capsnap->writing = 1;
-		} else {
-			/* note mtime, size NOW. */
-			__ceph_finish_cap_snap(ci, capsnap);
-		}
+	dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n",
+	     inode, capsnap, old_snapc, ceph_cap_string(dirty),
+	     capsnap->need_flush ? "" : "no_flush");
+	ihold(inode);
+
+	atomic_set(&capsnap->nref, 1);
+	capsnap->ci = ci;
+	INIT_LIST_HEAD(&capsnap->ci_item);
+	INIT_LIST_HEAD(&capsnap->flushing_item);
+
+	capsnap->follows = old_snapc->seq;
+	capsnap->issued = __ceph_caps_issued(ci, NULL);
+	capsnap->dirty = dirty;
+
+	capsnap->mode = inode->i_mode;
+	capsnap->uid = inode->i_uid;
+	capsnap->gid = inode->i_gid;
+
+	if (dirty & CEPH_CAP_XATTR_EXCL) {
+		__ceph_build_xattrs_blob(ci);
+		capsnap->xattr_blob =
+			ceph_buffer_get(ci->i_xattrs.blob);
+		capsnap->xattr_version = ci->i_xattrs.version;
 	} else {
-		dout("queue_cap_snap %p nothing dirty|writing\n", inode);
-		kfree(capsnap);
+		capsnap->xattr_blob = NULL;
+		capsnap->xattr_version = 0;
 	}
 
+	capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
+	/* dirty page count moved from _head to this cap_snap;
+	   all subsequent writes page dirties occur _after_ this
+	   snapshot. */
+	capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
+	ci->i_wrbuffer_ref_head = 0;
+	capsnap->context = old_snapc;
+	list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
+	old_snapc = NULL;
+
+	if (used & CEPH_CAP_FILE_WR) {
+		dout("queue_cap_snap %p cap_snap %p snapc %p"
+		     " seq %llu used WR, now pending\n", inode,
+		     capsnap, old_snapc, old_snapc->seq);
+		capsnap->writing = 1;
+	} else {
+		/* note mtime, size NOW. */
+		__ceph_finish_cap_snap(ci, capsnap);
+	}
+	capsnap = NULL;
+
+update_snapc:
+	if (ci->i_head_snapc) {
+		ci->i_head_snapc = ceph_get_snap_context(new_snapc);
+		dout(" new snapc is %p\n", new_snapc);
+	}
 	spin_unlock(&ci->i_ceph_lock);
+
+	kfree(capsnap);
+	ceph_put_snap_context(old_snapc);
 }
 
 /*
@@ -699,6 +730,8 @@ more:
 
 		/* queue realm for cap_snap creation */
 		list_add(&realm->dirty_item, &dirty_realms);
+		if (realm->seq > mdsc->last_snap_seq)
+			mdsc->last_snap_seq = realm->seq;
 
 		invalidate = 1;
 	} else if (!realm->cached_context) {
@@ -964,14 +997,14 @@ out:
 
 int __init ceph_snap_init(void)
 {
-	empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
-	if (!empty_snapc)
+	ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
+	if (!ceph_empty_snapc)
 		return -ENOMEM;
-	empty_snapc->seq = 1;
+	ceph_empty_snapc->seq = 1;
 	return 0;
 }
 
 void ceph_snap_exit(void)
 {
-	ceph_put_snap_context(empty_snapc);
+	ceph_put_snap_context(ceph_empty_snapc);
 }
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 4e9905374078..d1c833c321b9 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -134,10 +134,12 @@ enum {
 	Opt_noino32,
 	Opt_fscache,
 	Opt_nofscache,
+	Opt_poolperm,
+	Opt_nopoolperm,
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 	Opt_acl,
 #endif
-	Opt_noacl
+	Opt_noacl,
 };
 
 static match_table_t fsopt_tokens = {
@@ -165,6 +167,8 @@ static match_table_t fsopt_tokens = {
 	{Opt_noino32, "noino32"},
 	{Opt_fscache, "fsc"},
 	{Opt_nofscache, "nofsc"},
+	{Opt_poolperm, "poolperm"},
+	{Opt_nopoolperm, "nopoolperm"},
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 	{Opt_acl, "acl"},
 #endif
@@ -268,6 +272,13 @@ static int parse_fsopt_token(char *c, void *private)
 	case Opt_nofscache:
 		fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE;
 		break;
+	case Opt_poolperm:
+		fsopt->flags &= ~CEPH_MOUNT_OPT_NOPOOLPERM;
+		printk ("pool perm");
+		break;
+	case Opt_nopoolperm:
+		fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM;
+		break;
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 	case Opt_acl:
 		fsopt->sb_flags |= MS_POSIXACL;
@@ -436,6 +447,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_puts(m, ",nodcache");
 	if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE)
 		seq_puts(m, ",fsc");
+	if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM)
+		seq_puts(m, ",nopoolperm");
 
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 	if (fsopt->sb_flags & MS_POSIXACL)
@@ -609,6 +622,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
  */
 struct kmem_cache *ceph_inode_cachep;
 struct kmem_cache *ceph_cap_cachep;
+struct kmem_cache *ceph_cap_flush_cachep;
 struct kmem_cache *ceph_dentry_cachep;
 struct kmem_cache *ceph_file_cachep;
 
@@ -634,6 +648,10 @@ static int __init init_caches(void)
 				     SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
 	if (ceph_cap_cachep == NULL)
 		goto bad_cap;
+	ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush,
+					   SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
+	if (ceph_cap_flush_cachep == NULL)
+		goto bad_cap_flush;
 
 	ceph_dentry_cachep = KMEM_CACHE(ceph_dentry_info,
 					SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
@@ -652,6 +670,8 @@ static int __init init_caches(void)
 bad_file:
 	kmem_cache_destroy(ceph_dentry_cachep);
 bad_dentry:
+	kmem_cache_destroy(ceph_cap_flush_cachep);
+bad_cap_flush:
 	kmem_cache_destroy(ceph_cap_cachep);
 bad_cap:
 	kmem_cache_destroy(ceph_inode_cachep);
@@ -668,6 +688,7 @@ static void destroy_caches(void)
 
 	kmem_cache_destroy(ceph_inode_cachep);
 	kmem_cache_destroy(ceph_cap_cachep);
+	kmem_cache_destroy(ceph_cap_flush_cachep);
 	kmem_cache_destroy(ceph_dentry_cachep);
 	kmem_cache_destroy(ceph_file_cachep);
 
@@ -729,7 +750,7 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc,
 	req->r_ino1.ino = CEPH_INO_ROOT;
 	req->r_ino1.snap = CEPH_NOSNAP;
 	req->r_started = started;
-	req->r_timeout = fsc->client->options->mount_timeout * HZ;
+	req->r_timeout = fsc->client->options->mount_timeout;
 	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
 	req->r_num_caps = 2;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index fa20e1318939..860cc016e70d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -35,6 +35,7 @@
 #define CEPH_MOUNT_OPT_INO32           (1<<8) /* 32 bit inos */
 #define CEPH_MOUNT_OPT_DCACHE          (1<<9) /* use dcache for readdir etc */
 #define CEPH_MOUNT_OPT_FSCACHE         (1<<10) /* use fscache */
+#define CEPH_MOUNT_OPT_NOPOOLPERM      (1<<11) /* no pool permission check */
 
 #define CEPH_MOUNT_OPT_DEFAULT    (CEPH_MOUNT_OPT_RBYTES | \
 				   CEPH_MOUNT_OPT_DCACHE)
@@ -121,11 +122,21 @@ struct ceph_cap {
 	struct rb_node ci_node;          /* per-ci cap tree */
 	struct ceph_mds_session *session;
 	struct list_head session_caps;   /* per-session caplist */
-	int mds;
 	u64 cap_id;       /* unique cap id (mds provided) */
-	int issued;       /* latest, from the mds */
-	int implemented;  /* implemented superset of issued (for revocation) */
-	int mds_wanted;
+	union {
+		/* in-use caps */
+		struct {
+			int issued;       /* latest, from the mds */
+			int implemented;  /* implemented superset of
+					     issued (for revocation) */
+			int mds, mds_wanted;
+		};
+		/* caps to release */
+		struct {
+			u64 cap_ino;
+			int queue_release;
+		};
+	};
 	u32 seq, issue_seq, mseq;
 	u32 cap_gen;      /* active/stale cycle */
 	unsigned long last_used;
@@ -163,6 +174,7 @@ struct ceph_cap_snap {
 	int writing;   /* a sync write is still in progress */
 	int dirty_pages;     /* dirty pages awaiting writeback */
 	bool inline_data;
+	bool need_flush;
 };
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
@@ -174,6 +186,17 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
 	}
 }
 
+struct ceph_cap_flush {
+	u64 tid;
+	int caps;
+	bool kick;
+	struct rb_node g_node; // global
+	union {
+		struct rb_node i_node; // inode
+		struct list_head list;
+	};
+};
+
 /*
  * The frag tree describes how a directory is fragmented, potentially across
  * multiple metadata servers.  It is also used to indicate points where
@@ -259,9 +282,9 @@ struct ceph_inode_info {
 	u32 i_time_warp_seq;
 
 	unsigned i_ceph_flags;
-	int i_ordered_count;
-	atomic_t i_release_count;
-	atomic_t i_complete_count;
+	atomic64_t i_release_count;
+	atomic64_t i_ordered_count;
+	atomic64_t i_complete_seq[2];
 
 	struct ceph_dir_layout i_dir_layout;
 	struct ceph_file_layout i_layout;
@@ -283,11 +306,11 @@ struct ceph_inode_info {
 	struct ceph_cap *i_auth_cap;     /* authoritative cap, if any */
 	unsigned i_dirty_caps, i_flushing_caps;     /* mask of dirtied fields */
 	struct list_head i_dirty_item, i_flushing_item;
-	u64 i_cap_flush_seq;
 	/* we need to track cap writeback on a per-cap-bit basis, to allow
 	 * overlapping, pipelined cap flushes to the mds.  we can probably
 	 * reduce the tid to 8 bits if we're concerned about inode size. */
-	u16 i_cap_flush_last_tid, i_cap_flush_tid[CEPH_CAP_BITS];
+	struct ceph_cap_flush *i_prealloc_cap_flush;
+	struct rb_root i_cap_flush_tree;
 	wait_queue_head_t i_cap_wq;      /* threads waiting on a capability */
 	unsigned long i_hold_caps_min; /* jiffies */
 	unsigned long i_hold_caps_max; /* jiffies */
@@ -438,36 +461,46 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 /*
  * Ceph inode.
  */
-#define CEPH_I_DIR_ORDERED	1  /* dentries in dir are ordered */
-#define CEPH_I_NODELAY		4  /* do not delay cap release */
-#define CEPH_I_FLUSH		8  /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH		16 /* do not flush dirty caps */
+#define CEPH_I_DIR_ORDERED	(1 << 0)  /* dentries in dir are ordered */
+#define CEPH_I_NODELAY		(1 << 1)  /* do not delay cap release */
+#define CEPH_I_FLUSH		(1 << 2)  /* do not delay flush of dirty metadata */
+#define CEPH_I_NOFLUSH		(1 << 3)  /* do not flush dirty caps */
+#define CEPH_I_POOL_PERM	(1 << 4)  /* pool rd/wr bits are valid */
+#define CEPH_I_POOL_RD		(1 << 5)  /* can read from pool */
+#define CEPH_I_POOL_WR		(1 << 6)  /* can write to pool */
+
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
-					   int release_count, int ordered_count)
+					   long long release_count,
+					   long long ordered_count)
 {
-	atomic_set(&ci->i_complete_count, release_count);
-	if (ci->i_ordered_count == ordered_count)
-		ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
-	else
-		ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+	smp_mb__before_atomic();
+	atomic64_set(&ci->i_complete_seq[0], release_count);
+	atomic64_set(&ci->i_complete_seq[1], ordered_count);
 }
 
 static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
 {
-	atomic_inc(&ci->i_release_count);
+	atomic64_inc(&ci->i_release_count);
+}
+
+static inline void __ceph_dir_clear_ordered(struct ceph_inode_info *ci)
+{
+	atomic64_inc(&ci->i_ordered_count);
 }
 
 static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci)
 {
-	return atomic_read(&ci->i_complete_count) ==
-		atomic_read(&ci->i_release_count);
+	return atomic64_read(&ci->i_complete_seq[0]) ==
+		atomic64_read(&ci->i_release_count);
 }
 
 static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
 {
-	return __ceph_dir_is_complete(ci) &&
-		(ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+	return  atomic64_read(&ci->i_complete_seq[0]) ==
+		atomic64_read(&ci->i_release_count) &&
+		atomic64_read(&ci->i_complete_seq[1]) ==
+		atomic64_read(&ci->i_ordered_count);
 }
 
 static inline void ceph_dir_clear_complete(struct inode *inode)
@@ -477,20 +510,13 @@ static inline void ceph_dir_clear_complete(struct inode *inode)
 
 static inline void ceph_dir_clear_ordered(struct inode *inode)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	spin_lock(&ci->i_ceph_lock);
-	ci->i_ordered_count++;
-	ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
-	spin_unlock(&ci->i_ceph_lock);
+	__ceph_dir_clear_ordered(ceph_inode(inode));
 }
 
 static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	bool ret;
-	spin_lock(&ci->i_ceph_lock);
-	ret = __ceph_dir_is_complete_ordered(ci);
-	spin_unlock(&ci->i_ceph_lock);
+	bool ret = __ceph_dir_is_complete_ordered(ceph_inode(inode));
+	smp_rmb();
 	return ret;
 }
 
@@ -552,7 +578,10 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci)
 {
 	return ci->i_dirty_caps | ci->i_flushing_caps;
 }
-extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask);
+extern struct ceph_cap_flush *ceph_alloc_cap_flush(void);
+extern void ceph_free_cap_flush(struct ceph_cap_flush *cf);
+extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
+				  struct ceph_cap_flush **pcf);
 
 extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
 				      struct ceph_cap *ocap, int mask);
@@ -606,16 +635,20 @@ struct ceph_file_info {
 	unsigned offset;       /* offset of last chunk, adjusted for . and .. */
 	unsigned next_offset;  /* offset of next chunk (last_name's + 1) */
 	char *last_name;       /* last entry in previous chunk */
-	struct dentry *dentry; /* next dentry (for dcache readdir) */
-	int dir_release_count;
-	int dir_ordered_count;
+	long long dir_release_count;
+	long long dir_ordered_count;
+	int readdir_cache_idx;
 
 	/* used for -o dirstat read() on directory thing */
 	char *dir_info;
 	int dir_info_len;
 };
 
-
+struct ceph_readdir_cache_control {
+	struct page  *page;
+	struct dentry **dentries;
+	int index;
+};
 
 /*
  * A "snap realm" describes a subset of the file hierarchy sharing
@@ -687,6 +720,7 @@ static inline int default_congestion_kb(void)
 
 
 /* snap.c */
+extern struct ceph_snap_context *ceph_empty_snapc;
 struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
 					       u64 ino);
 extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
@@ -713,8 +747,8 @@ extern void ceph_snap_exit(void);
 static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci)
 {
 	return !list_empty(&ci->i_cap_snaps) &&
-		list_entry(ci->i_cap_snaps.prev, struct ceph_cap_snap,
-			   ci_item)->writing;
+	       list_last_entry(&ci->i_cap_snaps, struct ceph_cap_snap,
+			       ci_item)->writing;
 }
 
 /* inode.c */
@@ -838,12 +872,12 @@ extern void ceph_put_cap(struct ceph_mds_client *mdsc,
 			 struct ceph_cap *cap);
 extern int ceph_is_any_caps(struct inode *inode);
 
-extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino,
-				u64 cap_id, u32 migrate_seq, u32 issue_seq);
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
 		      int datasync);
+extern void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
+					  struct ceph_mds_session *session);
 extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 				    struct ceph_mds_session *session);
 extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
@@ -879,6 +913,9 @@ extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode);
 /* addr.c */
 extern const struct address_space_operations ceph_aops;
 extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
+extern int ceph_uninline_data(struct file *filp, struct page *locked_page);
+extern int ceph_pool_perm_check(struct ceph_inode_info *ci, int need);
+extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);
 
 /* file.c */
 extern const struct file_operations ceph_file_fops;
@@ -890,7 +927,6 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 extern int ceph_release(struct inode *inode, struct file *filp);
 extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
 				  char *data, size_t len);
-int ceph_uninline_data(struct file *filp, struct page *locked_page);
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;
 extern const struct file_operations ceph_snapdir_fops;
@@ -911,6 +947,7 @@ extern void ceph_dentry_lru_del(struct dentry *dn);
 extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
 extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
 extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
+extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
 
 /*
  * our d_ops vary depending on whether the inode is live,
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index cd7ffad4041d..819163d8313b 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -911,6 +911,8 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
 	struct inode *inode = d_inode(dentry);
 	struct ceph_vxattr *vxattr;
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+	struct ceph_cap_flush *prealloc_cf = NULL;
 	int issued;
 	int err;
 	int dirty = 0;
@@ -920,6 +922,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
 	char *newval = NULL;
 	struct ceph_inode_xattr *xattr = NULL;
 	int required_blob_size;
+	bool lock_snap_rwsem = false;
 
 	if (!ceph_is_valid_xattr(name))
 		return -EOPNOTSUPP;
@@ -948,12 +951,27 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
 	if (!xattr)
 		goto out;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		goto out;
+
 	spin_lock(&ci->i_ceph_lock);
 retry:
 	issued = __ceph_caps_issued(ci, NULL);
-	dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
 	if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
 		goto do_sync;
+
+	if (!lock_snap_rwsem && !ci->i_head_snapc) {
+		lock_snap_rwsem = true;
+		if (!down_read_trylock(&mdsc->snap_rwsem)) {
+			spin_unlock(&ci->i_ceph_lock);
+			down_read(&mdsc->snap_rwsem);
+			spin_lock(&ci->i_ceph_lock);
+			goto retry;
+		}
+	}
+
+	dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
 	__build_xattrs(inode);
 
 	required_blob_size = __get_required_blob_size(ci, name_len, val_len);
@@ -966,7 +984,7 @@ retry:
 		dout(" preaallocating new blob size=%d\n", required_blob_size);
 		blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
 		if (!blob)
-			goto out;
+			goto do_sync_unlocked;
 		spin_lock(&ci->i_ceph_lock);
 		if (ci->i_xattrs.prealloc_blob)
 			ceph_buffer_put(ci->i_xattrs.prealloc_blob);
@@ -978,21 +996,28 @@ retry:
 			  flags, value ? 1 : -1, &xattr);
 
 	if (!err) {
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
+					       &prealloc_cf);
 		ci->i_xattrs.dirty = true;
 		inode->i_ctime = CURRENT_TIME;
 	}
 
 	spin_unlock(&ci->i_ceph_lock);
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
 	if (dirty)
 		__mark_inode_dirty(inode, dirty);
+	ceph_free_cap_flush(prealloc_cf);
 	return err;
 
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
 do_sync_unlocked:
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
 	err = ceph_sync_setxattr(dentry, name, value, size, flags);
 out:
+	ceph_free_cap_flush(prealloc_cf);
 	kfree(newname);
 	kfree(newval);
 	kfree(xattr);
@@ -1044,10 +1069,13 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
 	struct inode *inode = d_inode(dentry);
 	struct ceph_vxattr *vxattr;
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
+	struct ceph_cap_flush *prealloc_cf = NULL;
 	int issued;
 	int err;
 	int required_blob_size;
 	int dirty;
+	bool lock_snap_rwsem = false;
 
 	if (!ceph_is_valid_xattr(name))
 		return -EOPNOTSUPP;
@@ -1060,14 +1088,29 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
 	if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN))
 		goto do_sync_unlocked;
 
+	prealloc_cf = ceph_alloc_cap_flush();
+	if (!prealloc_cf)
+		return -ENOMEM;
+
 	err = -ENOMEM;
 	spin_lock(&ci->i_ceph_lock);
 retry:
 	issued = __ceph_caps_issued(ci, NULL);
-	dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
-
 	if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
 		goto do_sync;
+
+	if (!lock_snap_rwsem && !ci->i_head_snapc) {
+		lock_snap_rwsem = true;
+		if (!down_read_trylock(&mdsc->snap_rwsem)) {
+			spin_unlock(&ci->i_ceph_lock);
+			down_read(&mdsc->snap_rwsem);
+			spin_lock(&ci->i_ceph_lock);
+			goto retry;
+		}
+	}
+
+	dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
+
 	__build_xattrs(inode);
 
 	required_blob_size = __get_required_blob_size(ci, 0, 0);
@@ -1080,7 +1123,7 @@ retry:
 		dout(" preaallocating new blob size=%d\n", required_blob_size);
 		blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
 		if (!blob)
-			goto out;
+			goto do_sync_unlocked;
 		spin_lock(&ci->i_ceph_lock);
 		if (ci->i_xattrs.prealloc_blob)
 			ceph_buffer_put(ci->i_xattrs.prealloc_blob);
@@ -1090,18 +1133,24 @@ retry:
 
 	err = __remove_xattr_by_name(ceph_inode(inode), name);
 
-	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
+	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
+				       &prealloc_cf);
 	ci->i_xattrs.dirty = true;
 	inode->i_ctime = CURRENT_TIME;
 	spin_unlock(&ci->i_ceph_lock);
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
 	if (dirty)
 		__mark_inode_dirty(inode, dirty);
+	ceph_free_cap_flush(prealloc_cf);
 	return err;
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
 do_sync_unlocked:
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
+	ceph_free_cap_flush(prealloc_cf);
 	err = ceph_send_removexattr(dentry, name);
-out:
 	return err;
 }