summary refs log tree commit diff
path: root/fs/ceph/caps.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/caps.c')
-rw-r--r--fs/ceph/caps.c536
1 files changed, 292 insertions, 244 deletions
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 28ae0c134700..185db76300b3 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -490,13 +490,10 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
 			       struct ceph_inode_info *ci)
 {
 	struct ceph_mount_options *opt = mdsc->fsc->mount_options;
-
-	ci->i_hold_caps_min = round_jiffies(jiffies +
-					    opt->caps_wanted_delay_min * HZ);
 	ci->i_hold_caps_max = round_jiffies(jiffies +
 					    opt->caps_wanted_delay_max * HZ);
-	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
-	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
+	dout("__cap_set_timeouts %p %lu\n", &ci->vfs_inode,
+	     ci->i_hold_caps_max - jiffies);
 }
 
 /*
@@ -508,10 +505,9 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
  *    -> we take mdsc->cap_delay_lock
  */
 static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
-				struct ceph_inode_info *ci,
-				bool set_timeout)
+				struct ceph_inode_info *ci)
 {
-	dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
+	dout("__cap_delay_requeue %p flags 0x%lx at %lu\n", &ci->vfs_inode,
 	     ci->i_ceph_flags, ci->i_hold_caps_max);
 	if (!mdsc->stopping) {
 		spin_lock(&mdsc->cap_delay_lock);
@@ -520,8 +516,7 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
 				goto no_change;
 			list_del_init(&ci->i_cap_delay_list);
 		}
-		if (set_timeout)
-			__cap_set_timeouts(mdsc, ci);
+		__cap_set_timeouts(mdsc, ci);
 		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
 no_change:
 		spin_unlock(&mdsc->cap_delay_lock);
@@ -561,19 +556,20 @@ static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
 	spin_unlock(&mdsc->cap_delay_lock);
 }
 
-/*
- * Common issue checks for add_cap, handle_cap_grant.
- */
+/* Common issue checks for add_cap, handle_cap_grant. */
 static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 			      unsigned issued)
 {
 	unsigned had = __ceph_caps_issued(ci, NULL);
 
+	lockdep_assert_held(&ci->i_ceph_lock);
+
 	/*
 	 * Each time we receive FILE_CACHE anew, we increment
 	 * i_rdcache_gen.
 	 */
-	if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+	if (S_ISREG(ci->vfs_inode.i_mode) &&
+	    (issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
 	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
 		ci->i_rdcache_gen++;
 	}
@@ -592,6 +588,13 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 			__ceph_dir_clear_complete(ci);
 		}
 	}
+
+	/* Wipe saved layout if we're losing DIR_CREATE caps */
+	if (S_ISDIR(ci->vfs_inode.i_mode) && (had & CEPH_CAP_DIR_CREATE) &&
+		!(issued & CEPH_CAP_DIR_CREATE)) {
+	     ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
+	     memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
+	}
 }
 
 /*
@@ -605,7 +608,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
  */
 void ceph_add_cap(struct inode *inode,
 		  struct ceph_mds_session *session, u64 cap_id,
-		  int fmode, unsigned issued, unsigned wanted,
+		  unsigned issued, unsigned wanted,
 		  unsigned seq, unsigned mseq, u64 realmino, int flags,
 		  struct ceph_cap **new_cap)
 {
@@ -621,13 +624,6 @@ void ceph_add_cap(struct inode *inode,
 	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
 	     session->s_mds, cap_id, ceph_cap_string(issued), seq);
 
-	/*
-	 * If we are opening the file, include file mode wanted bits
-	 * in wanted.
-	 */
-	if (fmode >= 0)
-		wanted |= ceph_caps_for_mode(fmode);
-
 	spin_lock(&session->s_gen_ttl_lock);
 	gen = session->s_cap_gen;
 	spin_unlock(&session->s_gen_ttl_lock);
@@ -725,7 +721,7 @@ void ceph_add_cap(struct inode *inode,
 		dout(" issued %s, mds wanted %s, actual %s, queueing\n",
 		     ceph_cap_string(issued), ceph_cap_string(wanted),
 		     ceph_cap_string(actual_wanted));
-		__cap_delay_requeue(mdsc, ci, true);
+		__cap_delay_requeue(mdsc, ci);
 	}
 
 	if (flags & CEPH_CAP_FLAG_AUTH) {
@@ -752,9 +748,6 @@ void ceph_add_cap(struct inode *inode,
 	cap->issue_seq = seq;
 	cap->mseq = mseq;
 	cap->cap_gen = gen;
-
-	if (fmode >= 0)
-		__ceph_get_fmode(ci, fmode);
 }
 
 /*
@@ -958,29 +951,97 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
 	if (ci->i_rd_ref)
 		used |= CEPH_CAP_FILE_RD;
 	if (ci->i_rdcache_ref ||
-	    (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
+	    (S_ISREG(ci->vfs_inode.i_mode) &&
 	     ci->vfs_inode.i_data.nrpages))
 		used |= CEPH_CAP_FILE_CACHE;
 	if (ci->i_wr_ref)
 		used |= CEPH_CAP_FILE_WR;
 	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
 		used |= CEPH_CAP_FILE_BUFFER;
+	if (ci->i_fx_ref)
+		used |= CEPH_CAP_FILE_EXCL;
 	return used;
 }
 
+#define FMODE_WAIT_BIAS 1000
+
 /*
  * wanted, by virtue of open file modes
  */
 int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
 {
-	int i, bits = 0;
-	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
-		if (ci->i_nr_by_mode[i])
-			bits |= 1 << i;
+	const int PIN_SHIFT = ffs(CEPH_FILE_MODE_PIN);
+	const int RD_SHIFT = ffs(CEPH_FILE_MODE_RD);
+	const int WR_SHIFT = ffs(CEPH_FILE_MODE_WR);
+	const int LAZY_SHIFT = ffs(CEPH_FILE_MODE_LAZY);
+	struct ceph_mount_options *opt =
+		ceph_inode_to_client(&ci->vfs_inode)->mount_options;
+	unsigned long used_cutoff = jiffies - opt->caps_wanted_delay_max * HZ;
+	unsigned long idle_cutoff = jiffies - opt->caps_wanted_delay_min * HZ;
+
+	if (S_ISDIR(ci->vfs_inode.i_mode)) {
+		int want = 0;
+
+		/* use used_cutoff here, to keep dir's wanted caps longer */
+		if (ci->i_nr_by_mode[RD_SHIFT] > 0 ||
+		    time_after(ci->i_last_rd, used_cutoff))
+			want |= CEPH_CAP_ANY_SHARED;
+
+		if (ci->i_nr_by_mode[WR_SHIFT] > 0 ||
+		    time_after(ci->i_last_wr, used_cutoff)) {
+			want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+			if (opt->flags & CEPH_MOUNT_OPT_ASYNC_DIROPS)
+				want |= CEPH_CAP_ANY_DIR_OPS;
+		}
+
+		if (want || ci->i_nr_by_mode[PIN_SHIFT] > 0)
+			want |= CEPH_CAP_PIN;
+
+		return want;
+	} else {
+		int bits = 0;
+
+		if (ci->i_nr_by_mode[RD_SHIFT] > 0) {
+			if (ci->i_nr_by_mode[RD_SHIFT] >= FMODE_WAIT_BIAS ||
+			    time_after(ci->i_last_rd, used_cutoff))
+				bits |= 1 << RD_SHIFT;
+		} else if (time_after(ci->i_last_rd, idle_cutoff)) {
+			bits |= 1 << RD_SHIFT;
+		}
+
+		if (ci->i_nr_by_mode[WR_SHIFT] > 0) {
+			if (ci->i_nr_by_mode[WR_SHIFT] >= FMODE_WAIT_BIAS ||
+			    time_after(ci->i_last_wr, used_cutoff))
+				bits |= 1 << WR_SHIFT;
+		} else if (time_after(ci->i_last_wr, idle_cutoff)) {
+			bits |= 1 << WR_SHIFT;
+		}
+
+		/* check lazyio only when read/write is wanted */
+		if ((bits & (CEPH_FILE_MODE_RDWR << 1)) &&
+		    ci->i_nr_by_mode[LAZY_SHIFT] > 0)
+			bits |= 1 << LAZY_SHIFT;
+
+		return bits ? ceph_caps_for_mode(bits >> 1) : 0;
 	}
-	if (bits == 0)
-		return 0;
-	return ceph_caps_for_mode(bits >> 1);
+}
+
+/*
+ * wanted, by virtue of open file modes AND cap refs (buffered/cached data)
+ */
+int __ceph_caps_wanted(struct ceph_inode_info *ci)
+{
+	int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
+	if (S_ISDIR(ci->vfs_inode.i_mode)) {
+		/* we want EXCL if holding caps of dir ops */
+		if (w & CEPH_CAP_ANY_DIR_OPS)
+			w |= CEPH_CAP_FILE_EXCL;
+	} else {
+		/* we want EXCL if dirty data */
+		if (w & CEPH_CAP_FILE_BUFFER)
+			w |= CEPH_CAP_FILE_EXCL;
+	}
+	return w;
 }
 
 /*
@@ -1004,14 +1065,6 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
 	return mds_wanted;
 }
 
-/*
- * called under i_ceph_lock
- */
-static int __ceph_is_single_caps(struct ceph_inode_info *ci)
-{
-	return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
-}
-
 int ceph_is_any_caps(struct inode *inode)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
@@ -1274,9 +1327,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	struct cap_msg_args arg;
 	int held, revoking;
 	int wake = 0;
-	int delayed = 0;
 	int ret;
 
+	/* Don't send anything if it's still being created. Return delayed */
+	if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE) {
+		spin_unlock(&ci->i_ceph_lock);
+		dout("%s async create in flight for %p\n", __func__, inode);
+		return 1;
+	}
+
 	held = cap->issued | cap->implemented;
 	revoking = cap->implemented & ~cap->issued;
 	retain &= ~revoking;
@@ -1287,28 +1346,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	     ceph_cap_string(revoking));
 	BUG_ON((retain & CEPH_CAP_PIN) == 0);
 
-	arg.session = cap->session;
-
-	/* don't release wanted unless we've waited a bit. */
-	if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
-	    time_before(jiffies, ci->i_hold_caps_min)) {
-		dout(" delaying issued %s -> %s, wanted %s -> %s on send\n",
-		     ceph_cap_string(cap->issued),
-		     ceph_cap_string(cap->issued & retain),
-		     ceph_cap_string(cap->mds_wanted),
-		     ceph_cap_string(want));
-		want |= cap->mds_wanted;
-		retain |= cap->issued;
-		delayed = 1;
-	}
-	ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
-	if (want & ~cap->mds_wanted) {
-		/* user space may open/close single file frequently.
-		 * This avoids droping mds_wanted immediately after
-		 * requesting new mds_wanted.
-		 */
-		__cap_set_timeouts(mdsc, ci);
-	}
+	ci->i_ceph_flags &= ~CEPH_I_FLUSH;
 
 	cap->issued &= retain;  /* drop bits we don't want */
 	if (cap->implemented & ~cap->issued) {
@@ -1323,6 +1361,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	cap->implemented &= cap->issued | used;
 	cap->mds_wanted = want;
 
+	arg.session = cap->session;
 	arg.ino = ceph_vino(inode).ino;
 	arg.cid = cap->cap_id;
 	arg.follows = flushing ? ci->i_head_snapc->seq : 0;
@@ -1332,7 +1371,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	arg.size = inode->i_size;
 	ci->i_reported_size = arg.size;
 	arg.max_size = ci->i_wanted_max_size;
-	ci->i_requested_max_size = arg.max_size;
+	if (cap == ci->i_auth_cap)
+		ci->i_requested_max_size = arg.max_size;
 
 	if (flushing & CEPH_CAP_XATTR_EXCL) {
 		old_blob = __ceph_build_xattrs_blob(ci);
@@ -1383,14 +1423,19 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 
 	ret = send_cap_msg(&arg);
 	if (ret < 0) {
-		dout("error sending cap msg, must requeue %p\n", inode);
-		delayed = 1;
+		pr_err("error sending cap msg, ino (%llx.%llx) "
+		       "flushing %s tid %llu, requeue\n",
+		       ceph_vinop(inode), ceph_cap_string(flushing),
+		       flush_tid);
+		spin_lock(&ci->i_ceph_lock);
+		__cap_delay_requeue(mdsc, ci);
+		spin_unlock(&ci->i_ceph_lock);
 	}
 
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 
-	return delayed;
+	return ret;
 }
 
 static inline int __send_flush_snap(struct inode *inode,
@@ -1617,6 +1662,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
 	int was = ci->i_dirty_caps;
 	int dirty = 0;
 
+	lockdep_assert_held(&ci->i_ceph_lock);
+
 	if (!ci->i_auth_cap) {
 		pr_warn("__mark_dirty_caps %p %llx mask %s, "
 			"but no auth cap (session was closed?)\n",
@@ -1654,7 +1701,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
 	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
 	    (mask & CEPH_CAP_FILE_BUFFER))
 		dirty |= I_DIRTY_DATASYNC;
-	__cap_delay_requeue(mdsc, ci, true);
+	__cap_delay_requeue(mdsc, ci);
 	return dirty;
 }
 
@@ -1726,6 +1773,7 @@ static u64 __mark_caps_flushing(struct inode *inode,
 	struct ceph_cap_flush *cf = NULL;
 	int flushing;
 
+	lockdep_assert_held(&ci->i_ceph_lock);
 	BUG_ON(ci->i_dirty_caps == 0);
 	BUG_ON(list_empty(&ci->i_dirty_item));
 	BUG_ON(!ci->i_prealloc_cap_flush);
@@ -1805,8 +1853,6 @@ bool __ceph_should_report_size(struct ceph_inode_info *ci)
  * versus held caps.  Release, flush, ack revoked caps to mds as
  * appropriate.
  *
- *  CHECK_CAPS_NODELAY - caller is delayed work and we should not delay
- *    cap release further.
  *  CHECK_CAPS_AUTHONLY - we should only check the auth cap
  *  CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
  *    further delay.
@@ -1825,24 +1871,13 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
 			   to avoid an infinite loop on retry */
 	struct rb_node *p;
-	int delayed = 0, sent = 0;
-	bool no_delay = flags & CHECK_CAPS_NODELAY;
 	bool queue_invalidate = false;
 	bool tried_invalidate = false;
 
-	/* if we are unmounting, flush any unused caps immediately. */
-	if (mdsc->stopping)
-		no_delay = true;
-
 	spin_lock(&ci->i_ceph_lock);
-
 	if (ci->i_ceph_flags & CEPH_I_FLUSH)
 		flags |= CHECK_CAPS_FLUSH;
 
-	if (!(flags & CHECK_CAPS_AUTHONLY) ||
-	    (ci->i_auth_cap && __ceph_is_single_caps(ci)))
-		__cap_delay_cancel(mdsc, ci);
-
 	goto retry_locked;
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1866,10 +1901,11 @@ retry_locked:
 			 * revoking the shared cap on every create/unlink
 			 * operation.
 			 */
-			if (IS_RDONLY(inode))
+			if (IS_RDONLY(inode)) {
 				want = CEPH_CAP_ANY_SHARED;
-			else
-				want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+			} else {
+				want |= CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
+			}
 			retain |= want;
 		} else {
 
@@ -1885,14 +1921,13 @@ retry_locked:
 	}
 
 	dout("check_caps %p file_want %s used %s dirty %s flushing %s"
-	     " issued %s revoking %s retain %s %s%s%s\n", inode,
+	     " issued %s revoking %s retain %s %s%s\n", inode,
 	     ceph_cap_string(file_wanted),
 	     ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
 	     ceph_cap_string(ci->i_flushing_caps),
 	     ceph_cap_string(issued), ceph_cap_string(revoking),
 	     ceph_cap_string(retain),
 	     (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
-	     (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
 	     (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");
 
 	/*
@@ -1900,8 +1935,8 @@ retry_locked:
 	 * have cached pages, but don't want them, then try to invalidate.
 	 * If we fail, it's because pages are locked.... try again later.
 	 */
-	if ((!no_delay || mdsc->stopping) &&
-	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
+	if ((!(flags & CHECK_CAPS_NOINVAL) || mdsc->stopping) &&
+	    S_ISREG(inode->i_mode) &&
 	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
 	    inode->i_data.nrpages &&		/* have cached pages */
 	    (revoking & (CEPH_CAP_FILE_CACHE|
@@ -1973,28 +2008,17 @@ retry_locked:
 		}
 
 		/* want more caps from mds? */
-		if (want & ~(cap->mds_wanted | cap->issued))
-			goto ack;
+		if (want & ~cap->mds_wanted) {
+			if (want & ~(cap->mds_wanted | cap->issued))
+				goto ack;
+			if (!__cap_is_valid(cap))
+				goto ack;
+		}
 
 		/* things we might delay */
 		if ((cap->issued & ~retain) == 0)
 			continue;     /* nope, all good */
 
-		if (no_delay)
-			goto ack;
-
-		/* delay? */
-		if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
-		    time_before(jiffies, ci->i_hold_caps_max)) {
-			dout(" delaying issued %s -> %s, wanted %s -> %s\n",
-			     ceph_cap_string(cap->issued),
-			     ceph_cap_string(cap->issued & retain),
-			     ceph_cap_string(cap->mds_wanted),
-			     ceph_cap_string(want));
-			delayed++;
-			continue;
-		}
-
 ack:
 		if (session && session != cap->session) {
 			dout("oops, wrong session %p mutex\n", session);
@@ -2055,18 +2079,20 @@ ack:
 		}
 
 		mds = cap->mds;  /* remember mds, so we don't repeat */
-		sent++;
 
 		/* __send_cap drops i_ceph_lock */
-		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0,
-				cap_used, want, retain, flushing,
-				flush_tid, oldest_flush_tid);
+		__send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, 0, cap_used, want,
+			   retain, flushing, flush_tid, oldest_flush_tid);
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
-	/* Reschedule delayed caps release if we delayed anything */
-	if (delayed)
-		__cap_delay_requeue(mdsc, ci, false);
+	/* periodically re-calculate caps wanted by open files */
+	if (__ceph_is_any_real_caps(ci) &&
+	    list_empty(&ci->i_cap_delay_list) &&
+	    (file_wanted & ~CEPH_CAP_PIN) &&
+	    !(used & (CEPH_CAP_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
+		__cap_delay_requeue(mdsc, ci);
+	}
 
 	spin_unlock(&ci->i_ceph_lock);
 
@@ -2095,7 +2121,6 @@ retry:
 retry_locked:
 	if (ci->i_dirty_caps && ci->i_auth_cap) {
 		struct ceph_cap *cap = ci->i_auth_cap;
-		int delayed;
 
 		if (session != cap->session) {
 			spin_unlock(&ci->i_ceph_lock);
@@ -2124,18 +2149,10 @@ retry_locked:
 						 &oldest_flush_tid);
 
 		/* __send_cap drops i_ceph_lock */
-		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-				     CEPH_CLIENT_CAPS_SYNC,
-				     __ceph_caps_used(ci),
-				     __ceph_caps_wanted(ci),
-				     (cap->issued | cap->implemented),
-				     flushing, flush_tid, oldest_flush_tid);
-
-		if (delayed) {
-			spin_lock(&ci->i_ceph_lock);
-			__cap_delay_requeue(mdsc, ci, true);
-			spin_unlock(&ci->i_ceph_lock);
-		}
+		__send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, CEPH_CLIENT_CAPS_SYNC,
+			   __ceph_caps_used(ci), __ceph_caps_wanted(ci),
+			   (cap->issued | cap->implemented),
+			   flushing, flush_tid, oldest_flush_tid);
 	} else {
 		if (!list_empty(&ci->i_cap_flush_list)) {
 			struct ceph_cap_flush *cf =
@@ -2233,6 +2250,10 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 	if (datasync)
 		goto out;
 
+	ret = ceph_wait_on_async_create(inode);
+	if (ret)
+		goto out;
+
 	dirty = try_flush_caps(inode, &flush_tid);
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
@@ -2335,22 +2356,13 @@ static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
 		if (cf->caps) {
 			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
 			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
-			ci->i_ceph_flags |= CEPH_I_NODELAY;
-
-			ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+			__send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
 					 (cf->tid < last_snap_flush ?
 					  CEPH_CLIENT_CAPS_PENDING_CAPSNAP : 0),
 					  __ceph_caps_used(ci),
 					  __ceph_caps_wanted(ci),
 					  (cap->issued | cap->implemented),
 					  cf->caps, cf->tid, oldest_flush_tid);
-			if (ret) {
-				pr_err("kick_flushing_caps: error sending "
-					"cap flush, ino (%llx.%llx) "
-					"tid %llu flushing %s\n",
-					ceph_vinop(inode), cf->tid,
-					ceph_cap_string(cf->caps));
-			}
 		} else {
 			struct ceph_cap_snap *capsnap =
 					container_of(cf, struct ceph_cap_snap,
@@ -2457,16 +2469,15 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 	}
 }
 
-static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
-				     struct ceph_mds_session *session,
-				     struct inode *inode)
-	__releases(ci->i_ceph_lock)
+void ceph_kick_flushing_inode_caps(struct ceph_mds_session *session,
+				   struct ceph_inode_info *ci)
 {
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_cap *cap;
+	struct ceph_mds_client *mdsc = session->s_mdsc;
+	struct ceph_cap *cap = ci->i_auth_cap;
+
+	lockdep_assert_held(&ci->i_ceph_lock);
 
-	cap = ci->i_auth_cap;
-	dout("kick_flushing_inode_caps %p flushing %s\n", inode,
+	dout("%s %p flushing %s\n", __func__, &ci->vfs_inode,
 	     ceph_cap_string(ci->i_flushing_caps));
 
 	if (!list_empty(&ci->i_cap_flush_list)) {
@@ -2478,9 +2489,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
 		spin_unlock(&mdsc->cap_dirty_lock);
 
 		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
-		spin_unlock(&ci->i_ceph_lock);
-	} else {
-		spin_unlock(&ci->i_ceph_lock);
 	}
 }
 
@@ -2488,18 +2496,20 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
 /*
  * Take references to capabilities we hold, so that we don't release
  * them to the MDS prematurely.
- *
- * Protected by i_ceph_lock.
  */
-static void __take_cap_refs(struct ceph_inode_info *ci, int got,
+void ceph_take_cap_refs(struct ceph_inode_info *ci, int got,
 			    bool snap_rwsem_locked)
 {
+	lockdep_assert_held(&ci->i_ceph_lock);
+
 	if (got & CEPH_CAP_PIN)
 		ci->i_pin_ref++;
 	if (got & CEPH_CAP_FILE_RD)
 		ci->i_rd_ref++;
 	if (got & CEPH_CAP_FILE_CACHE)
 		ci->i_rdcache_ref++;
+	if (got & CEPH_CAP_FILE_EXCL)
+		ci->i_fx_ref++;
 	if (got & CEPH_CAP_FILE_WR) {
 		if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
 			BUG_ON(!snap_rwsem_locked);
@@ -2512,7 +2522,7 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got,
 		if (ci->i_wb_ref == 0)
 			ihold(&ci->vfs_inode);
 		ci->i_wb_ref++;
-		dout("__take_cap_refs %p wb %d -> %d (?)\n",
+		dout("%s %p wb %d -> %d (?)\n", __func__,
 		     &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref);
 	}
 }
@@ -2524,14 +2534,16 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got,
  * Note that caller is responsible for ensuring max_size increases are
  * requested from the MDS.
  *
- * Returns 0 if caps were not able to be acquired (yet), a 1 if they were,
- * or a negative error code.
- *
- * FIXME: how does a 0 return differ from -EAGAIN?
+ * Returns 0 if caps were not able to be acquired (yet), 1 if succeed,
+ * or a negative error code. There are 3 speical error codes:
+ *  -EAGAIN: need to sleep but non-blocking is specified
+ *  -EFBIG:  ask caller to call check_max_size() and try again.
+ *  -ESTALE: ask caller to call ceph_renew_caps() and try again.
  */
 enum {
-	NON_BLOCKING	= 1,
-	CHECK_FILELOCK	= 2,
+	/* first 8 bits are reserved for CEPH_FILE_MODE_FOO */
+	NON_BLOCKING	= (1 << 8),
+	CHECK_FILELOCK	= (1 << 9),
 };
 
 static int try_get_cap_refs(struct inode *inode, int need, int want,
@@ -2541,7 +2553,6 @@ static int try_get_cap_refs(struct inode *inode, int need, int want,
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	int ret = 0;
 	int have, implemented;
-	int file_wanted;
 	bool snap_rwsem_locked = false;
 
 	dout("get_cap_refs %p need %s want %s\n", inode,
@@ -2557,15 +2568,6 @@ again:
 		goto out_unlock;
 	}
 
-	/* make sure file is actually open */
-	file_wanted = __ceph_caps_file_wanted(ci);
-	if ((file_wanted & need) != need) {
-		dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
-		     ceph_cap_string(need), ceph_cap_string(file_wanted));
-		ret = -EBADF;
-		goto out_unlock;
-	}
-
 	/* finish pending truncate */
 	while (ci->i_truncate_pending) {
 		spin_unlock(&ci->i_ceph_lock);
@@ -2584,7 +2586,7 @@ again:
 			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
 			     inode, endoff, ci->i_max_size);
 			if (endoff > ci->i_requested_max_size)
-				ret = -EAGAIN;
+				ret = ci->i_auth_cap ? -EFBIG : -ESTALE;
 			goto out_unlock;
 		}
 		/*
@@ -2630,51 +2632,55 @@ again:
 				}
 				snap_rwsem_locked = true;
 			}
-			*got = need | (have & want);
-			if ((need & CEPH_CAP_FILE_RD) &&
+			if ((have & want) == want)
+				*got = need | want;
+			else
+				*got = need;
+			if (S_ISREG(inode->i_mode) &&
+			    (need & CEPH_CAP_FILE_RD) &&
 			    !(*got & CEPH_CAP_FILE_CACHE))
 				ceph_disable_fscache_readpage(ci);
-			__take_cap_refs(ci, *got, true);
+			ceph_take_cap_refs(ci, *got, true);
 			ret = 1;
 		}
 	} else {
 		int session_readonly = false;
-		if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
+		int mds_wanted;
+		if (ci->i_auth_cap &&
+		    (need & (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_EXCL))) {
 			struct ceph_mds_session *s = ci->i_auth_cap->session;
 			spin_lock(&s->s_cap_lock);
 			session_readonly = s->s_readonly;
 			spin_unlock(&s->s_cap_lock);
 		}
 		if (session_readonly) {
-			dout("get_cap_refs %p needed %s but mds%d readonly\n",
+			dout("get_cap_refs %p need %s but mds%d readonly\n",
 			     inode, ceph_cap_string(need), ci->i_auth_cap->mds);
 			ret = -EROFS;
 			goto out_unlock;
 		}
 
-		if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
-			int mds_wanted;
-			if (READ_ONCE(mdsc->fsc->mount_state) ==
-			    CEPH_MOUNT_SHUTDOWN) {
-				dout("get_cap_refs %p forced umount\n", inode);
-				ret = -EIO;
-				goto out_unlock;
-			}
-			mds_wanted = __ceph_caps_mds_wanted(ci, false);
-			if (need & ~(mds_wanted & need)) {
-				dout("get_cap_refs %p caps were dropped"
-				     " (session killed?)\n", inode);
-				ret = -ESTALE;
-				goto out_unlock;
-			}
-			if (!(file_wanted & ~mds_wanted))
-				ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
+		if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
+			dout("get_cap_refs %p forced umount\n", inode);
+			ret = -EIO;
+			goto out_unlock;
+		}
+		mds_wanted = __ceph_caps_mds_wanted(ci, false);
+		if (need & ~mds_wanted) {
+			dout("get_cap_refs %p need %s > mds_wanted %s\n",
+			     inode, ceph_cap_string(need),
+			     ceph_cap_string(mds_wanted));
+			ret = -ESTALE;
+			goto out_unlock;
 		}
 
-		dout("get_cap_refs %p have %s needed %s\n", inode,
+		dout("get_cap_refs %p have %s need %s\n", inode,
 		     ceph_cap_string(have), ceph_cap_string(need));
 	}
 out_unlock:
+
+	__ceph_touch_fmode(ci, mdsc, flags);
+
 	spin_unlock(&ci->i_ceph_lock);
 	if (snap_rwsem_locked)
 		up_read(&mdsc->snap_rwsem);
@@ -2712,20 +2718,40 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
+static inline int get_used_fmode(int caps)
+{
+	int fmode = 0;
+	if (caps & CEPH_CAP_FILE_RD)
+		fmode |= CEPH_FILE_MODE_RD;
+	if (caps & CEPH_CAP_FILE_WR)
+		fmode |= CEPH_FILE_MODE_WR;
+	return fmode;
+}
+
 int ceph_try_get_caps(struct inode *inode, int need, int want,
 		      bool nonblock, int *got)
 {
-	int ret;
+	int ret, flags;
 
 	BUG_ON(need & ~CEPH_CAP_FILE_RD);
-	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED));
-	ret = ceph_pool_perm_check(inode, need);
-	if (ret < 0)
-		return ret;
+	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO |
+			CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+			CEPH_CAP_ANY_DIR_OPS));
+	if (need) {
+		ret = ceph_pool_perm_check(inode, need);
+		if (ret < 0)
+			return ret;
+	}
+
+	flags = get_used_fmode(need | want);
+	if (nonblock)
+		flags |= NON_BLOCKING;
 
-	ret = try_get_cap_refs(inode, need, want, 0,
-			       (nonblock ? NON_BLOCKING : 0), got);
-	return ret == -EAGAIN ? 0 : ret;
+	ret = try_get_cap_refs(inode, need, want, 0, flags, got);
+	/* three special error codes */
+	if (ret == -EAGAIN || ret == -EFBIG || ret == -EAGAIN)
+		ret = 0;
+	return ret;
 }
 
 /*
@@ -2750,16 +2776,16 @@ int ceph_get_caps(struct file *filp, int need, int want,
 	    fi->filp_gen != READ_ONCE(fsc->filp_gen))
 		return -EBADF;
 
-	while (true) {
-		if (endoff > 0)
-			check_max_size(inode, endoff);
+	flags = get_used_fmode(need | want);
 
-		flags = atomic_read(&fi->num_locks) ? CHECK_FILELOCK : 0;
+	while (true) {
+		flags &= CEPH_FILE_MODE_MASK;
+		if (atomic_read(&fi->num_locks))
+			flags |= CHECK_FILELOCK;
 		_got = 0;
 		ret = try_get_cap_refs(inode, need, want, endoff,
 				       flags, &_got);
-		if (ret == -EAGAIN)
-			continue;
+		WARN_ON_ONCE(ret == -EAGAIN);
 		if (!ret) {
 			struct ceph_mds_client *mdsc = fsc->mdsc;
 			struct cap_wait cw;
@@ -2774,6 +2800,8 @@ int ceph_get_caps(struct file *filp, int need, int want,
 			list_add(&cw.list, &mdsc->cap_wait_list);
 			spin_unlock(&mdsc->caps_list_lock);
 
+			/* make sure used fmode not timeout */
+			ceph_get_fmode(ci, flags, FMODE_WAIT_BIAS);
 			add_wait_queue(&ci->i_cap_wq, &wait);
 
 			flags |= NON_BLOCKING;
@@ -2787,6 +2815,7 @@ int ceph_get_caps(struct file *filp, int need, int want,
 			}
 
 			remove_wait_queue(&ci->i_cap_wq, &wait);
+			ceph_put_fmode(ci, flags, FMODE_WAIT_BIAS);
 
 			spin_lock(&mdsc->caps_list_lock);
 			list_del(&cw.list);
@@ -2804,16 +2833,26 @@ int ceph_get_caps(struct file *filp, int need, int want,
 		}
 
 		if (ret < 0) {
+			if (ret == -EFBIG || ret == -ESTALE) {
+				int ret2 = ceph_wait_on_async_create(inode);
+				if (ret2 < 0)
+					return ret2;
+			}
+			if (ret == -EFBIG) {
+				check_max_size(inode, endoff);
+				continue;
+			}
 			if (ret == -ESTALE) {
 				/* session was killed, try renew caps */
-				ret = ceph_renew_caps(inode);
+				ret = ceph_renew_caps(inode, flags);
 				if (ret == 0)
 					continue;
 			}
 			return ret;
 		}
 
-		if (ci->i_inline_version != CEPH_INLINE_NONE &&
+		if (S_ISREG(ci->vfs_inode.i_mode) &&
+		    ci->i_inline_version != CEPH_INLINE_NONE &&
 		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
 		    i_size_read(inode) > 0) {
 			struct page *page =
@@ -2846,7 +2885,8 @@ int ceph_get_caps(struct file *filp, int need, int want,
 		break;
 	}
 
-	if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
+	if (S_ISREG(ci->vfs_inode.i_mode) &&
+	    (_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
 		ceph_fscache_revalidate_cookie(ci);
 
 	*got = _got;
@@ -2860,7 +2900,7 @@ int ceph_get_caps(struct file *filp, int need, int want,
 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
 {
 	spin_lock(&ci->i_ceph_lock);
-	__take_cap_refs(ci, caps, false);
+	ceph_take_cap_refs(ci, caps, false);
 	spin_unlock(&ci->i_ceph_lock);
 }
 
@@ -2911,6 +2951,9 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 	if (had & CEPH_CAP_FILE_CACHE)
 		if (--ci->i_rdcache_ref == 0)
 			last++;
+	if (had & CEPH_CAP_FILE_EXCL)
+		if (--ci->i_fx_ref == 0)
+			last++;
 	if (had & CEPH_CAP_FILE_BUFFER) {
 		if (--ci->i_wb_ref == 0) {
 			last++;
@@ -2950,7 +2993,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
 	dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
 	     last ? " last" : "", put ? " put" : "");
 
-	if (last && !flushsnaps)
+	if (last)
 		ceph_check_caps(ci, 0, NULL);
 	else if (flushsnaps)
 		ceph_flush_snaps(ci, NULL);
@@ -3032,7 +3075,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 	spin_unlock(&ci->i_ceph_lock);
 
 	if (last) {
-		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
+		ceph_check_caps(ci, 0, NULL);
 	} else if (flush_snaps) {
 		ceph_flush_snaps(ci, NULL);
 	}
@@ -3133,7 +3176,7 @@ static void handle_cap_grant(struct inode *inode,
 	 * try to invalidate (once).  (If there are dirty buffers, we
 	 * will invalidate _after_ writeback.)
 	 */
-	if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
+	if (S_ISREG(inode->i_mode) && /* don't invalidate readdir cache */
 	    ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
 	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
 	    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
@@ -3297,11 +3340,12 @@ static void handle_cap_grant(struct inode *inode,
 		     ceph_cap_string(cap->issued),
 		     ceph_cap_string(newcaps),
 		     ceph_cap_string(revoking));
-		if (revoking & used & CEPH_CAP_FILE_BUFFER)
+		if (S_ISREG(inode->i_mode) &&
+		    (revoking & used & CEPH_CAP_FILE_BUFFER))
 			writeback = true;  /* initiate writeback; will delay ack */
-		else if (revoking == CEPH_CAP_FILE_CACHE &&
-			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
-			 queue_invalidate)
+		else if (queue_invalidate &&
+			 revoking == CEPH_CAP_FILE_CACHE &&
+			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0)
 			; /* do nothing yet, invalidation will be queued */
 		else if (cap == ci->i_auth_cap)
 			check_caps = 1; /* check auth cap only */
@@ -3339,7 +3383,8 @@ static void handle_cap_grant(struct inode *inode,
 	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
 		if (newcaps & ~extra_info->issued)
 			wake = true;
-		kick_flushing_inode_caps(session->s_mdsc, session, inode);
+		ceph_kick_flushing_inode_caps(session, ci);
+		spin_unlock(&ci->i_ceph_lock);
 		up_read(&session->s_mdsc->snap_rwsem);
 	} else {
 		spin_unlock(&ci->i_ceph_lock);
@@ -3367,10 +3412,10 @@ static void handle_cap_grant(struct inode *inode,
 		wake_up_all(&ci->i_cap_wq);
 
 	if (check_caps == 1)
-		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
+		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY | CHECK_CAPS_NOINVAL,
 				session);
 	else if (check_caps == 2)
-		ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
+		ceph_check_caps(ci, CHECK_CAPS_NOINVAL, session);
 	else
 		mutex_unlock(&session->s_mutex);
 }
@@ -3619,8 +3664,6 @@ retry:
 		goto out_unlock;
 
 	if (target < 0) {
-		if (cap->mds_wanted | cap->issued)
-			ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
 		__ceph_remove_cap(cap, false);
 		goto out_unlock;
 	}
@@ -3668,7 +3711,7 @@ retry:
 		/* add placeholder for the export tagert */
 		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
 		tcap = new_cap;
-		ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
+		ceph_add_cap(inode, tsession, t_cap_id, issued, 0,
 			     t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
 
 		if (!list_empty(&ci->i_cap_flush_list) &&
@@ -3773,7 +3816,7 @@ retry:
 	__ceph_caps_issued(ci, &issued);
 	issued |= __ceph_caps_dirty(ci);
 
-	ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
+	ceph_add_cap(inode, session, cap_id, caps, wanted, seq, mseq,
 		     realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
 
 	ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
@@ -4047,7 +4090,6 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 {
 	struct inode *inode;
 	struct ceph_inode_info *ci;
-	int flags = CHECK_CAPS_NODELAY;
 
 	dout("check_delayed_caps\n");
 	while (1) {
@@ -4067,7 +4109,7 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
 
 		if (inode) {
 			dout("check_delayed_caps on %p\n", inode);
-			ceph_check_caps(ci, flags, NULL);
+			ceph_check_caps(ci, 0, NULL);
 			/* avoid calling iput_final() in tick thread */
 			ceph_async_iput(inode);
 		}
@@ -4092,7 +4134,7 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 		ihold(inode);
 		dout("flush_dirty_caps %p\n", inode);
 		spin_unlock(&mdsc->cap_dirty_lock);
-		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
+		ceph_check_caps(ci, CHECK_CAPS_FLUSH, NULL);
 		iput(inode);
 		spin_lock(&mdsc->cap_dirty_lock);
 	}
@@ -4100,14 +4142,31 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 	dout("flush_dirty_caps done\n");
 }
 
-void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
+void __ceph_touch_fmode(struct ceph_inode_info *ci,
+			struct ceph_mds_client *mdsc, int fmode)
+{
+	unsigned long now = jiffies;
+	if (fmode & CEPH_FILE_MODE_RD)
+		ci->i_last_rd = now;
+	if (fmode & CEPH_FILE_MODE_WR)
+		ci->i_last_wr = now;
+	/* queue periodic check */
+	if (fmode &&
+	    __ceph_is_any_real_caps(ci) &&
+	    list_empty(&ci->i_cap_delay_list))
+		__cap_delay_requeue(mdsc, ci);
+}
+
+void ceph_get_fmode(struct ceph_inode_info *ci, int fmode, int count)
 {
 	int i;
 	int bits = (fmode << 1) | 1;
+	spin_lock(&ci->i_ceph_lock);
 	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
 		if (bits & (1 << i))
-			ci->i_nr_by_mode[i]++;
+			ci->i_nr_by_mode[i] += count;
 	}
+	spin_unlock(&ci->i_ceph_lock);
 }
 
 /*
@@ -4115,26 +4174,18 @@ void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
  * we may need to release capabilities to the MDS (or schedule
  * their delayed release).
  */
-void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
+void ceph_put_fmode(struct ceph_inode_info *ci, int fmode, int count)
 {
-	int i, last = 0;
+	int i;
 	int bits = (fmode << 1) | 1;
 	spin_lock(&ci->i_ceph_lock);
 	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
 		if (bits & (1 << i)) {
-			BUG_ON(ci->i_nr_by_mode[i] == 0);
-			if (--ci->i_nr_by_mode[i] == 0)
-				last++;
+			BUG_ON(ci->i_nr_by_mode[i] < count);
+			ci->i_nr_by_mode[i] -= count;
 		}
 	}
-	dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
-	     &ci->vfs_inode, fmode,
-	     ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
-	     ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
 	spin_unlock(&ci->i_ceph_lock);
-
-	if (last && ci->i_vino.snap == CEPH_NOSNAP)
-		ceph_check_caps(ci, 0, NULL);
 }
 
 /*
@@ -4152,7 +4203,6 @@ int ceph_drop_caps_for_unlink(struct inode *inode)
 	if (inode->i_nlink == 1) {
 		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
 
-		ci->i_ceph_flags |= CEPH_I_NODELAY;
 		if (__ceph_caps_dirty(ci)) {
 			struct ceph_mds_client *mdsc =
 				ceph_inode_to_client(inode)->mdsc;
@@ -4208,8 +4258,6 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
 		if (force || (cap->issued & drop)) {
 			if (cap->issued & drop) {
 				int wanted = __ceph_caps_wanted(ci);
-				if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
-					wanted |= cap->mds_wanted;
 				dout("encode_inode_release %p cap %p "
 				     "%s -> %s, wanted %s -> %s\n", inode, cap,
 				     ceph_cap_string(cap->issued),