summary refs log tree commit diff
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2018-02-08 11:38:59 -0800
committerLinus Torvalds <torvalds@linux-foundation.org>2018-02-08 11:38:59 -0800
commit9e95dae76b53e67b64bb8e8468d2285b1dc34720 (patch)
tree482551f8e837b34486d20b3c2273bf5f89b2000b /fs
parenta8c6db00bff2bdb466d80b7ec7ff25d327071f9b (diff)
parent16515a6d54183349b858b9c05e483afc55fa7948 (diff)
downloadlinux-9e95dae76b53e67b64bb8e8468d2285b1dc34720.tar.gz
Merge tag 'ceph-for-4.16-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "Things have been very quiet on the rbd side, as work continues on the
  big ticket items slated for the next merge window.

  On the CephFS side we have a large number of cap handling
  improvements, a fix for our long-standing abuse of ->journal_info in
  ceph_readpages() and yet another dentry pointer management patch"

* tag 'ceph-for-4.16-rc1' of git://github.com/ceph/ceph-client:
  ceph: improving efficiency of syncfs
  libceph: check kstrndup() return value
  ceph: try to allocate enough memory for reserved caps
  ceph: fix race of queuing delayed caps
  ceph: delete unreachable code in ceph_check_caps()
  ceph: limit rate of cap import/export error messages
  ceph: fix incorrect snaprealm when adding caps
  ceph: fix un-balanced fsc->writeback_count update
  ceph: track read contexts in ceph_file_info
  ceph: avoid dereferencing invalid pointer during cached readdir
  ceph: use atomic_t for ceph_inode_info::i_shared_gen
  ceph: cleanup traceless reply handling for rename
  ceph: voluntarily drop Fx cap for readdir request
  ceph: properly drop caps for setattr request
  ceph: voluntarily drop Lx cap for link/rename requests
  ceph: voluntarily drop Ax cap for requests that create new inode
  rbd: whitelist RBD_FEATURE_OPERATIONS feature bit
  rbd: don't NULL out ->obj_request in rbd_img_obj_parent_read_full()
  rbd: use kmem_cache_zalloc() in rbd_img_request_create()
  rbd: obj_request->completion is unused
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/addr.c28
-rw-r--r--fs/ceph/caps.c170
-rw-r--r--fs/ceph/dir.c79
-rw-r--r--fs/ceph/file.c12
-rw-r--r--fs/ceph/inode.c56
-rw-r--r--fs/ceph/mds_client.c33
-rw-r--r--fs/ceph/mds_client.h3
-rw-r--r--fs/ceph/snap.c8
-rw-r--r--fs/ceph/super.h53
9 files changed, 313 insertions, 129 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index dbf07051aacd..b4336b42ce3b 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -299,7 +299,8 @@ unlock:
  * start an async read(ahead) operation.  return nr_pages we submitted
  * a read for on success, or negative error code.
  */
-static int start_read(struct inode *inode, struct list_head *page_list, int max)
+static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx,
+		      struct list_head *page_list, int max)
 {
 	struct ceph_osd_client *osdc =
 		&ceph_inode_to_client(inode)->client->osdc;
@@ -316,7 +317,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 	int got = 0;
 	int ret = 0;
 
-	if (!current->journal_info) {
+	if (!rw_ctx) {
 		/* caller of readpages does not hold buffer and read caps
 		 * (fadvise, madvise and readahead cases) */
 		int want = CEPH_CAP_FILE_CACHE;
@@ -437,6 +438,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 {
 	struct inode *inode = file_inode(file);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_file_info *ci = file->private_data;
+	struct ceph_rw_context *rw_ctx;
 	int rc = 0;
 	int max = 0;
 
@@ -449,11 +452,12 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 	if (rc == 0)
 		goto out;
 
+	rw_ctx = ceph_find_rw_context(ci);
 	max = fsc->mount_options->rsize >> PAGE_SHIFT;
-	dout("readpages %p file %p nr_pages %d max %d\n",
-	     inode, file, nr_pages, max);
+	dout("readpages %p file %p ctx %p nr_pages %d max %d\n",
+	     inode, file, rw_ctx, nr_pages, max);
 	while (!list_empty(page_list)) {
-		rc = start_read(inode, page_list, max);
+		rc = start_read(inode, rw_ctx, page_list, max);
 		if (rc < 0)
 			goto out;
 	}
@@ -574,7 +578,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	struct ceph_fs_client *fsc;
 	struct ceph_snap_context *snapc, *oldest;
 	loff_t page_off = page_offset(page);
-	long writeback_stat;
 	int err, len = PAGE_SIZE;
 	struct ceph_writeback_ctl ceph_wbc;
 
@@ -615,8 +618,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
 	     inode, page, page->index, page_off, len, snapc, snapc->seq);
 
-	writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
-	if (writeback_stat >
+	if (atomic_long_inc_return(&fsc->writeback_count) >
 	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
 		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
 
@@ -651,6 +653,11 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	end_page_writeback(page);
 	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
 	ceph_put_snap_context(snapc);  /* page's reference */
+
+	if (atomic_long_dec_return(&fsc->writeback_count) <
+	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
+		clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
+
 	return err;
 }
 
@@ -1450,9 +1457,10 @@ static int ceph_filemap_fault(struct vm_fault *vmf)
 
 	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
 	    ci->i_inline_version == CEPH_INLINE_NONE) {
-		current->journal_info = vma->vm_file;
+		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
+		ceph_add_rw_context(fi, &rw_ctx);
 		ret = filemap_fault(vmf);
-		current->journal_info = NULL;
+		ceph_del_rw_context(fi, &rw_ctx);
 	} else
 		ret = -EAGAIN;
 
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index a14b2c974c9e..6582c4507e6c 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -154,13 +154,19 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
 	spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_reserve_caps(struct ceph_mds_client *mdsc,
+/*
+ * Called under mdsc->mutex.
+ */
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 		      struct ceph_cap_reservation *ctx, int need)
 {
-	int i;
+	int i, j;
 	struct ceph_cap *cap;
 	int have;
 	int alloc = 0;
+	int max_caps;
+	bool trimmed = false;
+	struct ceph_mds_session *s;
 	LIST_HEAD(newcaps);
 
 	dout("reserve caps ctx=%p need=%d\n", ctx, need);
@@ -179,16 +185,37 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc,
 	spin_unlock(&mdsc->caps_list_lock);
 
 	for (i = have; i < need; i++) {
+retry:
 		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
-		if (!cap)
-			break;
+		if (!cap) {
+			if (!trimmed) {
+				for (j = 0; j < mdsc->max_sessions; j++) {
+					s = __ceph_lookup_mds_session(mdsc, j);
+					if (!s)
+						continue;
+					mutex_unlock(&mdsc->mutex);
+
+					mutex_lock(&s->s_mutex);
+					max_caps = s->s_nr_caps - (need - i);
+					ceph_trim_caps(mdsc, s, max_caps);
+					mutex_unlock(&s->s_mutex);
+
+					ceph_put_mds_session(s);
+					mutex_lock(&mdsc->mutex);
+				}
+				trimmed = true;
+				goto retry;
+			} else {
+				pr_warn("reserve caps ctx=%p ENOMEM "
+					"need=%d got=%d\n",
+					ctx, need, have + alloc);
+				goto out_nomem;
+			}
+		}
 		list_add(&cap->caps_item, &newcaps);
 		alloc++;
 	}
-	/* we didn't manage to reserve as much as we needed */
-	if (have + alloc != need)
-		pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
-			ctx, need, have + alloc);
+	BUG_ON(have + alloc != need);
 
 	spin_lock(&mdsc->caps_list_lock);
 	mdsc->caps_total_count += alloc;
@@ -204,6 +231,24 @@ void ceph_reserve_caps(struct ceph_mds_client *mdsc,
 	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
 	     ctx, mdsc->caps_total_count, mdsc->caps_use_count,
 	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
+	return 0;
+
+out_nomem:
+	while (!list_empty(&newcaps)) {
+		cap = list_first_entry(&newcaps,
+				struct ceph_cap, caps_item);
+		list_del(&cap->caps_item);
+		kmem_cache_free(ceph_cap_cachep, cap);
+	}
+
+	spin_lock(&mdsc->caps_list_lock);
+	mdsc->caps_avail_count += have;
+	mdsc->caps_reserve_count -= have;
+	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+					 mdsc->caps_reserve_count +
+					 mdsc->caps_avail_count);
+	spin_unlock(&mdsc->caps_list_lock);
+	return -ENOMEM;
 }
 
 int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
@@ -498,7 +543,7 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 	 */
 	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
 		if (issued & CEPH_CAP_FILE_SHARED)
-			ci->i_shared_gen++;
+			atomic_inc(&ci->i_shared_gen);
 		if (S_ISDIR(ci->vfs_inode.i_mode)) {
 			dout(" marking %p NOT complete\n", &ci->vfs_inode);
 			__ceph_dir_clear_complete(ci);
@@ -577,18 +622,30 @@ void ceph_add_cap(struct inode *inode,
 		}
 	}
 
-	if (!ci->i_snap_realm) {
+	if (!ci->i_snap_realm ||
+	    ((flags & CEPH_CAP_FLAG_AUTH) &&
+	     realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
 		/*
 		 * add this inode to the appropriate snap realm
 		 */
 		struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
 							       realmino);
 		if (realm) {
+			struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
+			if (oldrealm) {
+				spin_lock(&oldrealm->inodes_with_caps_lock);
+				list_del_init(&ci->i_snap_realm_item);
+				spin_unlock(&oldrealm->inodes_with_caps_lock);
+			}
+
 			spin_lock(&realm->inodes_with_caps_lock);
 			ci->i_snap_realm = realm;
 			list_add(&ci->i_snap_realm_item,
 				 &realm->inodes_with_caps);
 			spin_unlock(&realm->inodes_with_caps_lock);
+
+			if (oldrealm)
+				ceph_put_snap_realm(mdsc, oldrealm);
 		} else {
 			pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
 			       realmino);
@@ -890,6 +947,11 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
 /*
  * called under i_ceph_lock
  */
+static int __ceph_is_single_caps(struct ceph_inode_info *ci)
+{
+	return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
+}
+
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
 	return !RB_EMPTY_ROOT(&ci->i_caps);
@@ -1703,21 +1765,24 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
 			   to avoid an infinite loop on retry */
 	struct rb_node *p;
-	int delayed = 0, sent = 0, num;
-	bool is_delayed = flags & CHECK_CAPS_NODELAY;
+	int delayed = 0, sent = 0;
+	bool no_delay = flags & CHECK_CAPS_NODELAY;
 	bool queue_invalidate = false;
-	bool force_requeue = false;
 	bool tried_invalidate = false;
 
 	/* if we are unmounting, flush any unused caps immediately. */
 	if (mdsc->stopping)
-		is_delayed = true;
+		no_delay = true;
 
 	spin_lock(&ci->i_ceph_lock);
 
 	if (ci->i_ceph_flags & CEPH_I_FLUSH)
 		flags |= CHECK_CAPS_FLUSH;
 
+	if (!(flags & CHECK_CAPS_AUTHONLY) ||
+	    (ci->i_auth_cap && __ceph_is_single_caps(ci)))
+		__cap_delay_cancel(mdsc, ci);
+
 	goto retry_locked;
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1772,7 +1837,7 @@ retry_locked:
 	 * have cached pages, but don't want them, then try to invalidate.
 	 * If we fail, it's because pages are locked.... try again later.
 	 */
-	if ((!is_delayed || mdsc->stopping) &&
+	if ((!no_delay || mdsc->stopping) &&
 	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
 	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
 	    inode->i_data.nrpages &&		/* have cached pages */
@@ -1781,27 +1846,16 @@ retry_locked:
 	    !tried_invalidate) {
 		dout("check_caps trying to invalidate on %p\n", inode);
 		if (try_nonblocking_invalidate(inode) < 0) {
-			if (revoking & (CEPH_CAP_FILE_CACHE|
-					CEPH_CAP_FILE_LAZYIO)) {
-				dout("check_caps queuing invalidate\n");
-				queue_invalidate = true;
-				ci->i_rdcache_revoking = ci->i_rdcache_gen;
-			} else {
-				dout("check_caps failed to invalidate pages\n");
-				/* we failed to invalidate pages.  check these
-				   caps again later. */
-				force_requeue = true;
-				__cap_set_timeouts(mdsc, ci);
-			}
+			dout("check_caps queuing invalidate\n");
+			queue_invalidate = true;
+			ci->i_rdcache_revoking = ci->i_rdcache_gen;
 		}
 		tried_invalidate = true;
 		goto retry_locked;
 	}
 
-	num = 0;
 	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
 		cap = rb_entry(p, struct ceph_cap, ci_node);
-		num++;
 
 		/* avoid looping forever */
 		if (mds >= cap->mds ||
@@ -1864,7 +1918,7 @@ retry_locked:
 		    cap->mds_wanted == want)
 			continue;     /* nope, all good */
 
-		if (is_delayed)
+		if (no_delay)
 			goto ack;
 
 		/* delay? */
@@ -1955,15 +2009,8 @@ ack:
 		goto retry; /* retake i_ceph_lock and restart our cap scan. */
 	}
 
-	/*
-	 * Reschedule delayed caps release if we delayed anything,
-	 * otherwise cancel.
-	 */
-	if (delayed && is_delayed)
-		force_requeue = true;   /* __send_cap delayed release; requeue */
-	if (!delayed && !is_delayed)
-		__cap_delay_cancel(mdsc, ci);
-	else if (!is_delayed || force_requeue)
+	/* Reschedule delayed caps release if we delayed anything */
+	if (delayed)
 		__cap_delay_requeue(mdsc, ci);
 
 	spin_unlock(&ci->i_ceph_lock);
@@ -2160,7 +2207,7 @@ int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 	u64 flush_tid;
 	int err = 0;
 	int dirty;
-	int wait = wbc->sync_mode == WB_SYNC_ALL;
+	int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);
 
 	dout("write_inode %p wait=%d\n", inode, wait);
 	if (wait) {
@@ -3426,7 +3473,14 @@ retry:
 	 */
 
 	issued = cap->issued;
-	WARN_ON(issued != cap->implemented);
+	if (issued != cap->implemented)
+		pr_err_ratelimited("handle_cap_export: issued != implemented: "
+				"ino (%llx.%llx) mds%d seq %d mseq %d "
+				"issued %s implemented %s\n",
+				ceph_vinop(inode), mds, cap->seq, cap->mseq,
+				ceph_cap_string(issued),
+				ceph_cap_string(cap->implemented));
+
 
 	tcap = __get_cap_for_mds(ci, target);
 	if (tcap) {
@@ -3572,12 +3626,13 @@ retry:
 		if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
 		    (ocap->seq != le32_to_cpu(ph->seq) ||
 		     ocap->mseq != le32_to_cpu(ph->mseq))) {
-			pr_err("handle_cap_import: mismatched seq/mseq: "
-			       "ino (%llx.%llx) mds%d seq %d mseq %d "
-			       "importer mds%d has peer seq %d mseq %d\n",
-			       ceph_vinop(inode), peer, ocap->seq,
-			       ocap->mseq, mds, le32_to_cpu(ph->seq),
-			       le32_to_cpu(ph->mseq));
+			pr_err_ratelimited("handle_cap_import: "
+					"mismatched seq/mseq: ino (%llx.%llx) "
+					"mds%d seq %d mseq %d importer mds%d "
+					"has peer seq %d mseq %d\n",
+					ceph_vinop(inode), peer, ocap->seq,
+					ocap->mseq, mds, le32_to_cpu(ph->seq),
+					le32_to_cpu(ph->mseq));
 		}
 		__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
 	}
@@ -3939,11 +3994,20 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
 
 	cap = __get_cap_for_mds(ci, mds);
 	if (cap && __cap_is_valid(cap)) {
-		if (force ||
-		    ((cap->issued & drop) &&
-		     (cap->issued & unless) == 0)) {
-			if ((cap->issued & drop) &&
-			    (cap->issued & unless) == 0) {
+		unless &= cap->issued;
+		if (unless) {
+			if (unless & CEPH_CAP_AUTH_EXCL)
+				drop &= ~CEPH_CAP_AUTH_SHARED;
+			if (unless & CEPH_CAP_LINK_EXCL)
+				drop &= ~CEPH_CAP_LINK_SHARED;
+			if (unless & CEPH_CAP_XATTR_EXCL)
+				drop &= ~CEPH_CAP_XATTR_SHARED;
+			if (unless & CEPH_CAP_FILE_EXCL)
+				drop &= ~CEPH_CAP_FILE_SHARED;
+		}
+
+		if (force || (cap->issued & drop)) {
+			if (cap->issued & drop) {
 				int wanted = __ceph_caps_wanted(ci);
 				if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
 					wanted |= cap->mds_wanted;
@@ -3975,7 +4039,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
 			*p += sizeof(*rel);
 			ret = 1;
 		} else {
-			dout("encode_inode_release %p cap %p %s\n",
+			dout("encode_inode_release %p cap %p %s (noop)\n",
 			     inode, cap, ceph_cap_string(cap->issued));
 		}
 	}
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 8a5266699b67..0c4346806e17 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -173,7 +173,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx,
  * the MDS if/when the directory is modified).
  */
 static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
-			    u32 shared_gen)
+			    int shared_gen)
 {
 	struct ceph_file_info *fi = file->private_data;
 	struct dentry *parent = file->f_path.dentry;
@@ -184,7 +184,7 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
 	u64 idx = 0;
 	int err = 0;
 
-	dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
+	dout("__dcache_readdir %p v%u at %llx\n", dir, (unsigned)shared_gen, ctx->pos);
 
 	/* search start position */
 	if (ctx->pos > 2) {
@@ -231,11 +231,17 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,
 			goto out;
 		}
 
-		di = ceph_dentry(dentry);
 		spin_lock(&dentry->d_lock);
-		if (di->lease_shared_gen == shared_gen &&
-		    d_really_is_positive(dentry) &&
-		    fpos_cmp(ctx->pos, di->offset) <= 0) {
+		di = ceph_dentry(dentry);
+		if (d_unhashed(dentry) ||
+		    d_really_is_negative(dentry) ||
+		    di->lease_shared_gen != shared_gen) {
+			spin_unlock(&dentry->d_lock);
+			dput(dentry);
+			err = -EAGAIN;
+			goto out;
+		}
+		if (fpos_cmp(ctx->pos, di->offset) <= 0) {
 			emit_dentry = true;
 		}
 		spin_unlock(&dentry->d_lock);
@@ -333,7 +339,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
 	    __ceph_dir_is_complete_ordered(ci) &&
 	    __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
-		u32 shared_gen = ci->i_shared_gen;
+		int shared_gen = atomic_read(&ci->i_shared_gen);
 		spin_unlock(&ci->i_ceph_lock);
 		err = __dcache_readdir(file, ctx, shared_gen);
 		if (err != -EAGAIN)
@@ -381,6 +387,7 @@ more:
 		if (op == CEPH_MDS_OP_READDIR) {
 			req->r_direct_hash = ceph_frag_value(frag);
 			__set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
+			req->r_inode_drop = CEPH_CAP_FILE_EXCL;
 		}
 		if (fi->last_name) {
 			req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
@@ -750,7 +757,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 			spin_unlock(&ci->i_ceph_lock);
 			dout(" dir %p complete, -ENOENT\n", dir);
 			d_add(dentry, NULL);
-			di->lease_shared_gen = ci->i_shared_gen;
+			di->lease_shared_gen = atomic_read(&ci->i_shared_gen);
 			return NULL;
 		}
 		spin_unlock(&ci->i_ceph_lock);
@@ -835,7 +842,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_args.mknod.mode = cpu_to_le32(mode);
 	req->r_args.mknod.rdev = cpu_to_le32(rdev);
-	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
+	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 	if (acls.pagelist) {
 		req->r_pagelist = acls.pagelist;
@@ -887,7 +894,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
-	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
+	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 	err = ceph_mdsc_do_request(mdsc, dir, req);
 	if (!err && !req->r_reply_info.head->is_dentry)
@@ -936,7 +943,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
 	req->r_parent = dir;
 	set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
 	req->r_args.mkdir.mode = cpu_to_le32(mode);
-	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
+	req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 	if (acls.pagelist) {
 		req->r_pagelist = acls.pagelist;
@@ -983,7 +990,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 	/* release LINK_SHARED on source inode (mds will lock it) */
-	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
+	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
 	err = ceph_mdsc_do_request(mdsc, dir, req);
 	if (err) {
 		d_drop(dentry);
@@ -1096,7 +1103,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 	/* release LINK_RDCACHE on source inode (mds will lock it) */
-	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
+	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
 	if (d_really_is_positive(new_dentry))
 		req->r_inode_drop = drop_caps_for_unlink(d_inode(new_dentry));
 	err = ceph_mdsc_do_request(mdsc, old_dir, req);
@@ -1106,16 +1113,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 		 * do_request, above).  If there is no trace, we need
 		 * to do it here.
 		 */
-
-		/* d_move screws up sibling dentries' offsets */
-		ceph_dir_clear_complete(old_dir);
-		ceph_dir_clear_complete(new_dir);
-
 		d_move(old_dentry, new_dentry);
-
-		/* ensure target dentry is invalidated, despite
-		   rehashing bug in vfs_rename_dir */
-		ceph_invalidate_dentry_lease(new_dentry);
 	}
 	ceph_mdsc_put_request(req);
 	return err;
@@ -1199,12 +1197,12 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
 	int valid = 0;
 
 	spin_lock(&ci->i_ceph_lock);
-	if (ci->i_shared_gen == di->lease_shared_gen)
+	if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen)
 		valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
 	spin_unlock(&ci->i_ceph_lock);
 	dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n",
-	     dir, (unsigned)ci->i_shared_gen, dentry,
-	     (unsigned)di->lease_shared_gen, valid);
+	     dir, (unsigned)atomic_read(&ci->i_shared_gen),
+	     dentry, (unsigned)di->lease_shared_gen, valid);
 	return valid;
 }
 
@@ -1332,24 +1330,37 @@ static void ceph_d_release(struct dentry *dentry)
  */
 static void ceph_d_prune(struct dentry *dentry)
 {
-	dout("ceph_d_prune %p\n", dentry);
+	struct ceph_inode_info *dir_ci;
+	struct ceph_dentry_info *di;
+
+	dout("ceph_d_prune %pd %p\n", dentry, dentry);
 
 	/* do we have a valid parent? */
 	if (IS_ROOT(dentry))
 		return;
 
-	/* if we are not hashed, we don't affect dir's completeness */
-	if (d_unhashed(dentry))
+	/* we hold d_lock, so d_parent is stable */
+	dir_ci = ceph_inode(d_inode(dentry->d_parent));
+	if (dir_ci->i_vino.snap == CEPH_SNAPDIR)
 		return;
 
-	if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_SNAPDIR)
+	/* who calls d_delete() should also disable dcache readdir */
+	if (d_really_is_negative(dentry))
 		return;
 
-	/*
-	 * we hold d_lock, so d_parent is stable, and d_fsdata is never
-	 * cleared until d_release
-	 */
-	ceph_dir_clear_complete(d_inode(dentry->d_parent));
+	/* d_fsdata does not get cleared until d_release */
+	if (!d_unhashed(dentry)) {
+		__ceph_dir_clear_complete(dir_ci);
+		return;
+	}
+
+	/* Disable dcache readdir just in case that someone called d_drop()
+	 * or d_invalidate(), but MDS didn't revoke CEPH_CAP_FILE_SHARED
+	 * properly (dcache readdir is still enabled) */
+	di = ceph_dentry(dentry);
+	if (di->offset > 0 &&
+	    di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen))
+		__ceph_dir_clear_ordered(dir_ci);
 }
 
 /*
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 5c17125f45c7..6639926eed4e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -181,6 +181,10 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
 			return -ENOMEM;
 		}
 		cf->fmode = fmode;
+
+		spin_lock_init(&cf->rw_contexts_lock);
+		INIT_LIST_HEAD(&cf->rw_contexts);
+
 		cf->next_offset = 2;
 		cf->readdir_cache_idx = -1;
 		file->private_data = cf;
@@ -396,7 +400,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
 	if (flags & O_CREAT) {
-		req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
+		req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL;
 		req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 		if (acls.pagelist) {
 			req->r_pagelist = acls.pagelist;
@@ -464,6 +468,7 @@ int ceph_release(struct inode *inode, struct file *file)
 		ceph_mdsc_put_request(cf->last_readdir);
 	kfree(cf->last_name);
 	kfree(cf->dir_info);
+	WARN_ON(!list_empty(&cf->rw_contexts));
 	kmem_cache_free(ceph_file_cachep, cf);
 
 	/* wake up anyone waiting for caps on this inode */
@@ -1199,12 +1204,13 @@ again:
 			retry_op = READ_INLINE;
 		}
 	} else {
+		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
 		dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
 		     ceph_cap_string(got));
-		current->journal_info = filp;
+		ceph_add_rw_context(fi, &rw_ctx);
 		ret = generic_file_read_iter(iocb, to);
-		current->journal_info = NULL;
+		ceph_del_rw_context(fi, &rw_ctx);
 	}
 	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
 	     inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ab81652198c4..c6ec5aa46100 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -494,7 +494,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 	ci->i_wrbuffer_ref = 0;
 	ci->i_wrbuffer_ref_head = 0;
 	atomic_set(&ci->i_filelock_ref, 0);
-	ci->i_shared_gen = 0;
+	atomic_set(&ci->i_shared_gen, 0);
 	ci->i_rdcache_gen = 0;
 	ci->i_rdcache_revoking = 0;
 
@@ -1041,7 +1041,7 @@ static void update_dentry_lease(struct dentry *dentry,
 	if (ceph_snap(dir) != CEPH_NOSNAP)
 		goto out_unlock;
 
-	di->lease_shared_gen = ceph_inode(dir)->i_shared_gen;
+	di->lease_shared_gen = atomic_read(&ceph_inode(dir)->i_shared_gen);
 
 	if (duration == 0)
 		goto out_unlock;
@@ -1080,6 +1080,27 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
 
 	BUG_ON(d_inode(dn));
 
+	if (S_ISDIR(in->i_mode)) {
+		/* If inode is directory, d_splice_alias() below will remove
+		 * 'realdn' from its origin parent. We need to ensure that
+		 * origin parent's readdir cache will not reference 'realdn'
+		 */
+		realdn = d_find_any_alias(in);
+		if (realdn) {
+			struct ceph_dentry_info *di = ceph_dentry(realdn);
+			spin_lock(&realdn->d_lock);
+
+			realdn->d_op->d_prune(realdn);
+
+			di->time = jiffies;
+			di->lease_shared_gen = 0;
+			di->offset = 0;
+
+			spin_unlock(&realdn->d_lock);
+			dput(realdn);
+		}
+	}
+
 	/* dn must be unhashed */
 	if (!d_unhashed(dn))
 		d_drop(dn);
@@ -1295,8 +1316,8 @@ retry_lookup:
 		if (!rinfo->head->is_target) {
 			dout("fill_trace null dentry\n");
 			if (d_really_is_positive(dn)) {
-				ceph_dir_clear_ordered(dir);
 				dout("d_delete %p\n", dn);
+				ceph_dir_clear_ordered(dir);
 				d_delete(dn);
 			} else if (have_lease) {
 				if (d_unhashed(dn))
@@ -1323,7 +1344,6 @@ retry_lookup:
 			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
 			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
 			     ceph_vinop(in));
-			ceph_dir_clear_ordered(dir);
 			d_invalidate(dn);
 			have_lease = false;
 		}
@@ -1573,9 +1593,19 @@ retry_lookup:
 		} else if (d_really_is_positive(dn) &&
 			   (ceph_ino(d_inode(dn)) != tvino.ino ||
 			    ceph_snap(d_inode(dn)) != tvino.snap)) {
+			struct ceph_dentry_info *di = ceph_dentry(dn);
 			dout(" dn %p points to wrong inode %p\n",
 			     dn, d_inode(dn));
-			__ceph_dir_clear_ordered(ci);
+
+			spin_lock(&dn->d_lock);
+			if (di->offset > 0 &&
+			    di->lease_shared_gen ==
+			    atomic_read(&ci->i_shared_gen)) {
+				__ceph_dir_clear_ordered(ci);
+				di->offset = 0;
+			}
+			spin_unlock(&dn->d_lock);
+
 			d_delete(dn);
 			dput(dn);
 			goto retry_lookup;
@@ -1600,9 +1630,7 @@ retry_lookup:
 				 &req->r_caps_reservation);
 		if (ret < 0) {
 			pr_err("fill_inode badness on %p\n", in);
-			if (d_really_is_positive(dn))
-				__ceph_dir_clear_ordered(ci);
-			else
+			if (d_really_is_negative(dn))
 				iput(in);
 			d_drop(dn);
 			err = ret;
@@ -2000,8 +2028,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
 			ceph_encode_timespec(&req->r_args.setattr.atime,
 					     &attr->ia_atime);
 			mask |= CEPH_SETATTR_ATIME;
-			release |= CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_RD |
-				CEPH_CAP_FILE_WR;
+			release |= CEPH_CAP_FILE_SHARED |
+				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
 		}
 	}
 	if (ia_valid & ATTR_MTIME) {
@@ -2022,8 +2050,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
 			ceph_encode_timespec(&req->r_args.setattr.mtime,
 					     &attr->ia_mtime);
 			mask |= CEPH_SETATTR_MTIME;
-			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
-				CEPH_CAP_FILE_WR;
+			release |= CEPH_CAP_FILE_SHARED |
+				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
 		}
 	}
 	if (ia_valid & ATTR_SIZE) {
@@ -2041,8 +2069,8 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)
 			req->r_args.setattr.old_size =
 				cpu_to_le64(inode->i_size);
 			mask |= CEPH_SETATTR_SIZE;
-			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_RD |
-				CEPH_CAP_FILE_WR;
+			release |= CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL |
+				   CEPH_CAP_FILE_RD | CEPH_CAP_FILE_WR;
 		}
 	}
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 1b468250e947..2e8f90f96540 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -604,10 +604,20 @@ static void __register_request(struct ceph_mds_client *mdsc,
 			       struct ceph_mds_request *req,
 			       struct inode *dir)
 {
+	int ret = 0;
+
 	req->r_tid = ++mdsc->last_tid;
-	if (req->r_num_caps)
-		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
-				  req->r_num_caps);
+	if (req->r_num_caps) {
+		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+					req->r_num_caps);
+		if (ret < 0) {
+			pr_err("__register_request %p "
+			       "failed to reserve caps: %d\n", req, ret);
+			/* set req->r_err to fail early from __do_request */
+			req->r_err = ret;
+			return;
+		}
+	}
 	dout("__register_request %p tid %lld\n", req, req->r_tid);
 	ceph_mdsc_get_request(req);
 	insert_request(&mdsc->request_tree, req);
@@ -1545,9 +1555,9 @@ out:
 /*
  * Trim session cap count down to some max number.
  */
-static int trim_caps(struct ceph_mds_client *mdsc,
-		     struct ceph_mds_session *session,
-		     int max_caps)
+int ceph_trim_caps(struct ceph_mds_client *mdsc,
+		   struct ceph_mds_session *session,
+		   int max_caps)
 {
 	int trim_caps = session->s_nr_caps - max_caps;
 
@@ -2438,11 +2448,14 @@ out:
  */
 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
 {
-	struct inode *inode = req->r_parent;
+	struct inode *dir = req->r_parent;
+	struct inode *old_dir = req->r_old_dentry_dir;
 
-	dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
+	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
 
-	ceph_dir_clear_complete(inode);
+	ceph_dir_clear_complete(dir);
+	if (old_dir)
+		ceph_dir_clear_complete(old_dir);
 	if (req->r_dentry)
 		ceph_invalidate_dentry_lease(req->r_dentry);
 	if (req->r_old_dentry)
@@ -2773,7 +2786,7 @@ static void handle_session(struct ceph_mds_session *session,
 		break;
 
 	case CEPH_SESSION_RECALL_STATE:
-		trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
+		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
 		break;
 
 	case CEPH_SESSION_FLUSHMSG:
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 837ac4b087a0..71e3b783ee6f 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -444,4 +444,7 @@ ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
 extern void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
 					  struct ceph_mds_session *session);
 
+extern int ceph_trim_caps(struct ceph_mds_client *mdsc,
+			  struct ceph_mds_session *session,
+			  int max_caps);
 #endif
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 8a2ca41e4b97..07cf95e6413d 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -922,13 +922,17 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
 			/*
 			 * Move the inode to the new realm
 			 */
-			spin_lock(&realm->inodes_with_caps_lock);
+			oldrealm = ci->i_snap_realm;
+			spin_lock(&oldrealm->inodes_with_caps_lock);
 			list_del_init(&ci->i_snap_realm_item);
+			spin_unlock(&oldrealm->inodes_with_caps_lock);
+
+			spin_lock(&realm->inodes_with_caps_lock);
 			list_add(&ci->i_snap_realm_item,
 				 &realm->inodes_with_caps);
-			oldrealm = ci->i_snap_realm;
 			ci->i_snap_realm = realm;
 			spin_unlock(&realm->inodes_with_caps_lock);
+
 			spin_unlock(&ci->i_ceph_lock);
 
 			ceph_get_snap_realm(mdsc, realm);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 2beeec07fa76..21b2e5b004eb 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -256,7 +256,8 @@ struct ceph_inode_xattr {
  */
 struct ceph_dentry_info {
 	struct ceph_mds_session *lease_session;
-	u32 lease_gen, lease_shared_gen;
+	int lease_shared_gen;
+	u32 lease_gen;
 	u32 lease_seq;
 	unsigned long lease_renew_after, lease_renew_from;
 	struct list_head lru;
@@ -353,7 +354,7 @@ struct ceph_inode_info {
 	int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref;
 	int i_wrbuffer_ref, i_wrbuffer_ref_head;
 	atomic_t i_filelock_ref;
-	u32 i_shared_gen;       /* increment each time we get FILE_SHARED */
+	atomic_t i_shared_gen;       /* increment each time we get FILE_SHARED */
 	u32 i_rdcache_gen;      /* incremented each time we get FILE_CACHE. */
 	u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */
 
@@ -648,7 +649,7 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check);
 extern void ceph_caps_init(struct ceph_mds_client *mdsc);
 extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
 extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
-extern void ceph_reserve_caps(struct ceph_mds_client *mdsc,
+extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 			     struct ceph_cap_reservation *ctx, int need);
 extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
 			       struct ceph_cap_reservation *ctx);
@@ -668,6 +669,9 @@ struct ceph_file_info {
 	short fmode;     /* initialized on open */
 	short flags;     /* CEPH_F_* */
 
+	spinlock_t rw_contexts_lock;
+	struct list_head rw_contexts;
+
 	/* readdir: position within the dir */
 	u32 frag;
 	struct ceph_mds_request *last_readdir;
@@ -684,6 +688,49 @@ struct ceph_file_info {
 	int dir_info_len;
 };
 
+struct ceph_rw_context {
+	struct list_head list;
+	struct task_struct *thread;
+	int caps;
+};
+
+#define CEPH_DEFINE_RW_CONTEXT(_name, _caps)	\
+	struct ceph_rw_context _name = {	\
+		.thread = current,		\
+		.caps = _caps,			\
+	}
+
+static inline void ceph_add_rw_context(struct ceph_file_info *cf,
+				       struct ceph_rw_context *ctx)
+{
+	spin_lock(&cf->rw_contexts_lock);
+	list_add(&ctx->list, &cf->rw_contexts);
+	spin_unlock(&cf->rw_contexts_lock);
+}
+
+static inline void ceph_del_rw_context(struct ceph_file_info *cf,
+				       struct ceph_rw_context *ctx)
+{
+	spin_lock(&cf->rw_contexts_lock);
+	list_del(&ctx->list);
+	spin_unlock(&cf->rw_contexts_lock);
+}
+
+static inline struct ceph_rw_context*
+ceph_find_rw_context(struct ceph_file_info *cf)
+{
+	struct ceph_rw_context *ctx, *found = NULL;
+	spin_lock(&cf->rw_contexts_lock);
+	list_for_each_entry(ctx, &cf->rw_contexts, list) {
+		if (ctx->thread == current) {
+			found = ctx;
+			break;
+		}
+	}
+	spin_unlock(&cf->rw_contexts_lock);
+	return found;
+}
+
 struct ceph_readdir_cache_control {
 	struct page  *page;
 	struct dentry **dentries;