summary refs log tree commit diff
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2011-07-26 13:38:50 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2011-07-26 13:38:50 -0700
commitba5b56cb3e3d2cab73d4fee9a022bb69462a8cd9 (patch)
treeeda7ea059a41ae5d68e2ad5a36a87069187ef22a /fs
parent243dd2809a5edd2e0e3e62781083aa44049af37d (diff)
parentd79698da32b317e96216236f265a9b72b78ae568 (diff)
downloadlinux-ba5b56cb3e3d2cab73d4fee9a022bb69462a8cd9.tar.gz
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
  ceph: document unlocked d_parent accesses
  ceph: explicitly reference rename old_dentry parent dir in request
  ceph: document locking for ceph_set_dentry_offset
  ceph: avoid d_parent in ceph_dentry_hash; fix ceph_encode_fh() hashing bug
  ceph: protect d_parent access in ceph_d_revalidate
  ceph: protect access to d_parent
  ceph: handle racing calls to ceph_init_dentry
  ceph: set dir complete frag after adding capability
  rbd: set blk_queue request sizes to object size
  ceph: set up readahead size when rsize is not passed
  rbd: cancel watch request when releasing the device
  ceph: ignore lease mask
  ceph: fix ceph_lookup_open intent usage
  ceph: only link open operations to directory unsafe list if O_CREAT|O_TRUNC
  ceph: fix bad parent_inode calc in ceph_lookup_open
  ceph: avoid carrying Fw cap during write into page cache
  libceph: don't time out osd requests that haven't been received
  ceph: report f_bfree based on kb_avail rather than diffing.
  ceph: only queue capsnap if caps are dirty
  ceph: fix snap writeback when racing with writes
  ...
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/debugfs.c2
-rw-r--r--fs/ceph/dir.c116
-rw-r--r--fs/ceph/export.c24
-rw-r--r--fs/ceph/file.c61
-rw-r--r--fs/ceph/inode.c48
-rw-r--r--fs/ceph/ioctl.c15
-rw-r--r--fs/ceph/ioctl.h1
-rw-r--r--fs/ceph/mds_client.c56
-rw-r--r--fs/ceph/mds_client.h3
-rw-r--r--fs/ceph/snap.c25
-rw-r--r--fs/ceph/super.c7
-rw-r--r--fs/ceph/super.h20
-rw-r--r--fs/ceph/xattr.c8
13 files changed, 249 insertions, 137 deletions
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 0dba6915712b..fb962efdacee 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -102,7 +102,7 @@ static int mdsc_show(struct seq_file *s, void *p)
 				path = NULL;
 			spin_lock(&req->r_old_dentry->d_lock);
 			seq_printf(s, " #%llx/%.*s (%s)",
-			   ceph_ino(req->r_old_dentry->d_parent->d_inode),
+			   ceph_ino(req->r_old_dentry_dir),
 				   req->r_old_dentry->d_name.len,
 				   req->r_old_dentry->d_name.name,
 				   path ? path : "");
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 1065ac779840..382abc9a6a54 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -40,14 +40,6 @@ int ceph_init_dentry(struct dentry *dentry)
 	if (dentry->d_fsdata)
 		return 0;
 
-	if (dentry->d_parent == NULL ||   /* nfs fh_to_dentry */
-	    ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
-		d_set_d_op(dentry, &ceph_dentry_ops);
-	else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
-		d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
-	else
-		d_set_d_op(dentry, &ceph_snap_dentry_ops);
-
 	di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO);
 	if (!di)
 		return -ENOMEM;          /* oh well */
@@ -58,16 +50,42 @@ int ceph_init_dentry(struct dentry *dentry)
 		kmem_cache_free(ceph_dentry_cachep, di);
 		goto out_unlock;
 	}
+
+	if (dentry->d_parent == NULL ||   /* nfs fh_to_dentry */
+	    ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
+		d_set_d_op(dentry, &ceph_dentry_ops);
+	else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
+		d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
+	else
+		d_set_d_op(dentry, &ceph_snap_dentry_ops);
+
 	di->dentry = dentry;
 	di->lease_session = NULL;
-	dentry->d_fsdata = di;
 	dentry->d_time = jiffies;
+	/* avoid reordering d_fsdata setup so that the check above is safe */
+	smp_mb();
+	dentry->d_fsdata = di;
 	ceph_dentry_lru_add(dentry);
 out_unlock:
 	spin_unlock(&dentry->d_lock);
 	return 0;
 }
 
+struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
+{
+	struct inode *inode = NULL;
+
+	if (!dentry)
+		return NULL;
+
+	spin_lock(&dentry->d_lock);
+	if (dentry->d_parent) {
+		inode = dentry->d_parent->d_inode;
+		ihold(inode);
+	}
+	spin_unlock(&dentry->d_lock);
+	return inode;
+}
 
 
 /*
@@ -133,7 +151,7 @@ more:
 		     d_unhashed(dentry) ? "!hashed" : "hashed",
 		     parent->d_subdirs.prev, parent->d_subdirs.next);
 		if (p == &parent->d_subdirs) {
-			fi->at_end = 1;
+			fi->flags |= CEPH_F_ATEND;
 			goto out_unlock;
 		}
 		spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
@@ -234,7 +252,7 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
 	const int max_bytes = fsc->mount_options->max_readdir_bytes;
 
 	dout("readdir %p filp %p frag %u off %u\n", inode, filp, frag, off);
-	if (fi->at_end)
+	if (fi->flags & CEPH_F_ATEND)
 		return 0;
 
 	/* always start with . and .. */
@@ -403,7 +421,7 @@ more:
 		dout("readdir next frag is %x\n", frag);
 		goto more;
 	}
-	fi->at_end = 1;
+	fi->flags |= CEPH_F_ATEND;
 
 	/*
 	 * if dir_release_count still matches the dir, no dentries
@@ -435,7 +453,7 @@ static void reset_readdir(struct ceph_file_info *fi)
 		dput(fi->dentry);
 		fi->dentry = NULL;
 	}
-	fi->at_end = 0;
+	fi->flags &= ~CEPH_F_ATEND;
 }
 
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
@@ -463,7 +481,7 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int origin)
 		if (offset != file->f_pos) {
 			file->f_pos = offset;
 			file->f_version = 0;
-			fi->at_end = 0;
+			fi->flags &= ~CEPH_F_ATEND;
 		}
 		retval = offset;
 
@@ -488,21 +506,13 @@ out:
 }
 
 /*
- * Process result of a lookup/open request.
- *
- * Mainly, make sure we return the final req->r_dentry (if it already
- * existed) in place of the original VFS-provided dentry when they
- * differ.
- *
- * Gracefully handle the case where the MDS replies with -ENOENT and
- * no trace (which it may do, at its discretion, e.g., if it doesn't
- * care to issue a lease on the negative dentry).
+ * Handle lookups for the hidden .snap directory.
  */
-struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
-				  struct dentry *dentry, int err)
+int ceph_handle_snapdir(struct ceph_mds_request *req,
+			struct dentry *dentry, int err)
 {
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
-	struct inode *parent = dentry->d_parent->d_inode;
+	struct inode *parent = dentry->d_parent->d_inode; /* we hold i_mutex */
 
 	/* .snap dir? */
 	if (err == -ENOENT &&
@@ -516,7 +526,23 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
 		d_add(dentry, inode);
 		err = 0;
 	}
+	return err;
+}
 
+/*
+ * Figure out final result of a lookup/open request.
+ *
+ * Mainly, make sure we return the final req->r_dentry (if it already
+ * existed) in place of the original VFS-provided dentry when they
+ * differ.
+ *
+ * Gracefully handle the case where the MDS replies with -ENOENT and
+ * no trace (which it may do, at its discretion, e.g., if it doesn't
+ * care to issue a lease on the negative dentry).
+ */
+struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
+				  struct dentry *dentry, int err)
+{
 	if (err == -ENOENT) {
 		/* no trace? */
 		err = 0;
@@ -610,6 +636,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
 	req->r_locked_dir = dir;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
+	err = ceph_handle_snapdir(req, dentry, err);
 	dentry = ceph_finish_lookup(req, dentry, err);
 	ceph_mdsc_put_request(req);  /* will dput(dentry) */
 	dout("lookup result=%p\n", dentry);
@@ -789,6 +816,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
 	req->r_old_dentry = dget(old_dentry); /* or inode? hrm. */
+	req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
 	req->r_locked_dir = dir;
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -887,6 +915,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 	req->r_dentry = dget(new_dentry);
 	req->r_num_caps = 2;
 	req->r_old_dentry = dget(old_dentry);
+	req->r_old_dentry_dir = ceph_get_dentry_parent_inode(old_dentry);
 	req->r_locked_dir = new_dir;
 	req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -1002,36 +1031,38 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
  */
 static int ceph_d_revalidate(struct dentry *dentry, struct nameidata *nd)
 {
+	int valid = 0;
 	struct inode *dir;
 
 	if (nd && nd->flags & LOOKUP_RCU)
 		return -ECHILD;
 
-	dir = dentry->d_parent->d_inode;
-
 	dout("d_revalidate %p '%.*s' inode %p offset %lld\n", dentry,
 	     dentry->d_name.len, dentry->d_name.name, dentry->d_inode,
 	     ceph_dentry(dentry)->offset);
 
+	dir = ceph_get_dentry_parent_inode(dentry);
+
 	/* always trust cached snapped dentries, snapdir dentry */
 	if (ceph_snap(dir) != CEPH_NOSNAP) {
 		dout("d_revalidate %p '%.*s' inode %p is SNAPPED\n", dentry,
 		     dentry->d_name.len, dentry->d_name.name, dentry->d_inode);
-		goto out_touch;
+		valid = 1;
+	} else if (dentry->d_inode &&
+		   ceph_snap(dentry->d_inode) == CEPH_SNAPDIR) {
+		valid = 1;
+	} else if (dentry_lease_is_valid(dentry) ||
+		   dir_lease_is_valid(dir, dentry)) {
+		valid = 1;
 	}
-	if (dentry->d_inode && ceph_snap(dentry->d_inode) == CEPH_SNAPDIR)
-		goto out_touch;
-
-	if (dentry_lease_is_valid(dentry) ||
-	    dir_lease_is_valid(dir, dentry))
-		goto out_touch;
 
-	dout("d_revalidate %p invalid\n", dentry);
-	d_drop(dentry);
-	return 0;
-out_touch:
-	ceph_dentry_lru_touch(dentry);
-	return 1;
+	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
+	if (valid)
+		ceph_dentry_lru_touch(dentry);
+	else
+		d_drop(dentry);
+	iput(dir);
+	return valid;
 }
 
 /*
@@ -1228,9 +1259,8 @@ void ceph_dentry_lru_del(struct dentry *dn)
  * Return name hash for a given dentry.  This is dependent on
  * the parent directory's hash function.
  */
-unsigned ceph_dentry_hash(struct dentry *dn)
+unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn)
 {
-	struct inode *dir = dn->d_parent->d_inode;
 	struct ceph_inode_info *dci = ceph_inode(dir);
 
 	switch (dci->i_dir_layout.dl_dir_hash) {
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index f67b687550de..9fbcdecaaccd 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -46,7 +46,7 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len,
 	int type;
 	struct ceph_nfs_fh *fh = (void *)rawfh;
 	struct ceph_nfs_confh *cfh = (void *)rawfh;
-	struct dentry *parent = dentry->d_parent;
+	struct dentry *parent;
 	struct inode *inode = dentry->d_inode;
 	int connected_handle_length = sizeof(*cfh)/4;
 	int handle_length = sizeof(*fh)/4;
@@ -55,26 +55,33 @@ static int ceph_encode_fh(struct dentry *dentry, u32 *rawfh, int *max_len,
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EINVAL;
 
+	spin_lock(&dentry->d_lock);
+	parent = dget(dentry->d_parent);
+	spin_unlock(&dentry->d_lock);
+
 	if (*max_len >= connected_handle_length) {
 		dout("encode_fh %p connectable\n", dentry);
 		cfh->ino = ceph_ino(dentry->d_inode);
 		cfh->parent_ino = ceph_ino(parent->d_inode);
-		cfh->parent_name_hash = ceph_dentry_hash(parent);
+		cfh->parent_name_hash = ceph_dentry_hash(parent->d_inode,
+							 dentry);
 		*max_len = connected_handle_length;
 		type = 2;
 	} else if (*max_len >= handle_length) {
 		if (connectable) {
 			*max_len = connected_handle_length;
-			return 255;
+			type = 255;
+		} else {
+			dout("encode_fh %p\n", dentry);
+			fh->ino = ceph_ino(dentry->d_inode);
+			*max_len = handle_length;
+			type = 1;
 		}
-		dout("encode_fh %p\n", dentry);
-		fh->ino = ceph_ino(dentry->d_inode);
-		*max_len = handle_length;
-		type = 1;
 	} else {
 		*max_len = handle_length;
-		return 255;
+		type = 255;
 	}
+	dput(parent);
 	return type;
 }
 
@@ -123,7 +130,6 @@ static struct dentry *__fh_to_dentry(struct super_block *sb,
 		return dentry;
 	}
 	err = ceph_init_dentry(dentry);
-
 	if (err < 0) {
 		iput(inode);
 		return ERR_PTR(err);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 0d0eae05598f..ce549d31eeb7 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -122,7 +122,7 @@ int ceph_open(struct inode *inode, struct file *file)
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
 	struct ceph_file_info *cf = file->private_data;
-	struct inode *parent_inode = file->f_dentry->d_parent->d_inode;
+	struct inode *parent_inode = NULL;
 	int err;
 	int flags, fmode, wanted;
 
@@ -194,7 +194,10 @@ int ceph_open(struct inode *inode, struct file *file)
 	req->r_inode = inode;
 	ihold(inode);
 	req->r_num_caps = 1;
+	if (flags & (O_CREAT|O_TRUNC))
+		parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
 	err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+	iput(parent_inode);
 	if (!err)
 		err = ceph_init_file(inode, file, req->r_fmode);
 	ceph_mdsc_put_request(req);
@@ -222,9 +225,9 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
 {
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
-	struct file *file = nd->intent.open.file;
-	struct inode *parent_inode = get_dentry_parent_inode(file->f_dentry);
+	struct file *file;
 	struct ceph_mds_request *req;
+	struct dentry *ret;
 	int err;
 	int flags = nd->intent.open.flags;
 
@@ -242,16 +245,24 @@ struct dentry *ceph_lookup_open(struct inode *dir, struct dentry *dentry,
 		req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
 	}
 	req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
-	err = ceph_mdsc_do_request(mdsc, parent_inode, req);
-	dentry = ceph_finish_lookup(req, dentry, err);
-	if (!err && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
+	err = ceph_mdsc_do_request(mdsc,
+				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
+				   req);
+	err = ceph_handle_snapdir(req, dentry, err);
+	if (err)
+		goto out;
+	if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
-	if (!err)
-		err = ceph_init_file(req->r_dentry->d_inode, file,
-				     req->r_fmode);
+	if (err)
+		goto out;
+	file = lookup_instantiate_filp(nd, req->r_dentry, ceph_open);
+	if (IS_ERR(file))
+		err = PTR_ERR(file);
+out:
+	ret = ceph_finish_lookup(req, dentry, err);
 	ceph_mdsc_put_request(req);
-	dout("ceph_lookup_open result=%p\n", dentry);
-	return dentry;
+	dout("ceph_lookup_open result=%p\n", ret);
+	return ret;
 }
 
 int ceph_release(struct inode *inode, struct file *file)
@@ -643,7 +654,8 @@ again:
 
 	if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	    (iocb->ki_filp->f_flags & O_DIRECT) ||
-	    (inode->i_sb->s_flags & MS_SYNCHRONOUS))
+	    (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
+	    (fi->flags & CEPH_F_SYNC))
 		/* hmm, this isn't really async... */
 		ret = ceph_sync_read(filp, base, len, ppos, &checkeof);
 	else
@@ -712,7 +724,7 @@ retry_snap:
 		want = CEPH_CAP_FILE_BUFFER;
 	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
 	if (ret < 0)
-		goto out;
+		goto out_put;
 
 	dout("aio_write %p %llx.%llx %llu~%u  got cap refs on %s\n",
 	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
@@ -720,12 +732,23 @@ retry_snap:
 
 	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	    (iocb->ki_filp->f_flags & O_DIRECT) ||
-	    (inode->i_sb->s_flags & MS_SYNCHRONOUS)) {
+	    (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
+	    (fi->flags & CEPH_F_SYNC)) {
 		ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
 			&iocb->ki_pos);
 	} else {
-		ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
+		/*
+		 * buffered write; drop Fw early to avoid slow
+		 * revocation if we get stuck on balance_dirty_pages
+		 */
+		int dirty;
 
+		spin_lock(&inode->i_lock);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		spin_unlock(&inode->i_lock);
+		ceph_put_cap_refs(ci, got);
+
+		ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
 		if ((ret >= 0 || ret == -EIOCBQUEUED) &&
 		    ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
 		     || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
@@ -733,7 +756,12 @@ retry_snap:
 			if (err < 0)
 				ret = err;
 		}
+
+		if (dirty)
+			__mark_inode_dirty(inode, dirty);
+		goto out;
 	}
+
 	if (ret >= 0) {
 		int dirty;
 		spin_lock(&inode->i_lock);
@@ -743,12 +771,13 @@ retry_snap:
 			__mark_inode_dirty(inode, dirty);
 	}
 
-out:
+out_put:
 	dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
 	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
 	     ceph_cap_string(got));
 	ceph_put_cap_refs(ci, got);
 
+out:
 	if (ret == -EOLDSNAPC) {
 		dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
 		     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index dfb2831d8d85..095799ba9dd1 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -560,7 +560,8 @@ static int fill_inode(struct inode *inode,
 	struct ceph_mds_reply_inode *info = iinfo->in;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int i;
-	int issued, implemented;
+	int issued = 0, implemented;
+	int updating_inode = 0;
 	struct timespec mtime, atime, ctime;
 	u32 nsplits;
 	struct ceph_buffer *xattr_blob = NULL;
@@ -599,7 +600,8 @@ static int fill_inode(struct inode *inode,
 	if (le64_to_cpu(info->version) > 0 &&
 	    (ci->i_version & ~1) >= le64_to_cpu(info->version))
 		goto no_change;
-
+	
+	updating_inode = 1;
 	issued = __ceph_caps_issued(ci, &implemented);
 	issued |= implemented | __ceph_caps_dirty(ci);
 
@@ -707,17 +709,6 @@ static int fill_inode(struct inode *inode,
 		ci->i_rfiles = le64_to_cpu(info->rfiles);
 		ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
 		ceph_decode_timespec(&ci->i_rctime, &info->rctime);
-
-		/* set dir completion flag? */
-		if (ci->i_files == 0 && ci->i_subdirs == 0 &&
-		    ceph_snap(inode) == CEPH_NOSNAP &&
-		    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
-		    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
-		    (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
-			dout(" marking %p complete (empty)\n", inode);
-			/* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
-			ci->i_max_offset = 2;
-		}
 		break;
 	default:
 		pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
@@ -774,6 +765,19 @@ no_change:
 		__ceph_get_fmode(ci, cap_fmode);
 	}
 
+	/* set dir completion flag? */
+	if (S_ISDIR(inode->i_mode) &&
+	    updating_inode &&                 /* didn't jump to no_change */
+	    ci->i_files == 0 && ci->i_subdirs == 0 &&
+	    ceph_snap(inode) == CEPH_NOSNAP &&
+	    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
+	    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+	    (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
+		dout(" marking %p complete (empty)\n", inode);
+		/* ci->i_ceph_flags |= CEPH_I_COMPLETE; */
+		ci->i_max_offset = 2;
+	}
+
 	/* update delegation info? */
 	if (dirinfo)
 		ceph_fill_dirfrag(inode, dirinfo);
@@ -805,14 +809,14 @@ static void update_dentry_lease(struct dentry *dentry,
 		return;
 
 	spin_lock(&dentry->d_lock);
-	dout("update_dentry_lease %p mask %d duration %lu ms ttl %lu\n",
-	     dentry, le16_to_cpu(lease->mask), duration, ttl);
+	dout("update_dentry_lease %p duration %lu ms ttl %lu\n",
+	     dentry, duration, ttl);
 
 	/* make lease_rdcache_gen match directory */
 	dir = dentry->d_parent->d_inode;
 	di->lease_shared_gen = ceph_inode(dir)->i_shared_gen;
 
-	if (lease->mask == 0)
+	if (duration == 0)
 		goto out_unlock;
 
 	if (di->lease_gen == session->s_cap_gen &&
@@ -839,11 +843,13 @@ out_unlock:
 /*
  * Set dentry's directory position based on the current dir's max, and
  * order it in d_subdirs, so that dcache_readdir behaves.
+ *
+ * Always called under directory's i_mutex.
  */
 static void ceph_set_dentry_offset(struct dentry *dn)
 {
 	struct dentry *dir = dn->d_parent;
-	struct inode *inode = dn->d_parent->d_inode;
+	struct inode *inode = dir->d_inode;
 	struct ceph_dentry_info *di;
 
 	BUG_ON(!inode);
@@ -1022,9 +1028,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 
 		/* do we have a dn lease? */
 		have_lease = have_dir_cap ||
-			(le16_to_cpu(rinfo->dlease->mask) &
-			 CEPH_LOCK_DN);
-
+			le32_to_cpu(rinfo->dlease->duration_ms);
 		if (!have_lease)
 			dout("fill_trace  no dentry lease or dir cap\n");
 
@@ -1560,7 +1564,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 {
 	struct inode *inode = dentry->d_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct inode *parent_inode = dentry->d_parent->d_inode;
+	struct inode *parent_inode;
 	const unsigned int ia_valid = attr->ia_valid;
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
@@ -1743,7 +1747,9 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 		req->r_inode_drop = release;
 		req->r_args.setattr.mask = cpu_to_le32(mask);
 		req->r_num_caps = 1;
+		parent_inode = ceph_get_dentry_parent_inode(dentry);
 		err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+		iput(parent_inode);
 	}
 	dout("setattr %p result=%d (%s locally, %d remote)\n", inode, err,
 	     ceph_cap_string(dirtied), mask);
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index ef0b5f48e13a..3b256b50f7d8 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -38,7 +38,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
 static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
 {
 	struct inode *inode = file->f_dentry->d_inode;
-	struct inode *parent_inode = file->f_dentry->d_parent->d_inode;
+	struct inode *parent_inode;
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_mds_request *req;
 	struct ceph_ioctl_layout l;
@@ -87,7 +87,9 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
 	req->r_args.setlayout.layout.fl_pg_preferred =
 		cpu_to_le32(l.preferred_osd);
 
+	parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
 	err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+	iput(parent_inode);
 	ceph_mdsc_put_request(req);
 	return err;
 }
@@ -231,6 +233,14 @@ static long ceph_ioctl_lazyio(struct file *file)
 	return 0;
 }
 
+static long ceph_ioctl_syncio(struct file *file)
+{
+	struct ceph_file_info *fi = file->private_data;
+
+	fi->flags |= CEPH_F_SYNC;
+	return 0;
+}
+
 long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 {
 	dout("ioctl file %p cmd %u arg %lu\n", file, cmd, arg);
@@ -249,6 +259,9 @@ long ceph_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 
 	case CEPH_IOC_LAZYIO:
 		return ceph_ioctl_lazyio(file);
+
+	case CEPH_IOC_SYNCIO:
+		return ceph_ioctl_syncio(file);
 	}
 
 	return -ENOTTY;
diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h
index 52e8fd74d450..0c5167e43180 100644
--- a/fs/ceph/ioctl.h
+++ b/fs/ceph/ioctl.h
@@ -40,5 +40,6 @@ struct ceph_ioctl_dataloc {
 				   struct ceph_ioctl_dataloc)
 
 #define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
+#define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5)
 
 #endif
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 0c1d91756528..fee028b5332e 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -483,22 +483,26 @@ void ceph_mdsc_release_request(struct kref *kref)
 		destroy_reply_info(&req->r_reply_info);
 	}
 	if (req->r_inode) {
-		ceph_put_cap_refs(ceph_inode(req->r_inode),
-				  CEPH_CAP_PIN);
+		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
 		iput(req->r_inode);
 	}
 	if (req->r_locked_dir)
-		ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
-				  CEPH_CAP_PIN);
+		ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
 	if (req->r_target_inode)
 		iput(req->r_target_inode);
 	if (req->r_dentry)
 		dput(req->r_dentry);
 	if (req->r_old_dentry) {
-		ceph_put_cap_refs(
-			ceph_inode(req->r_old_dentry->d_parent->d_inode),
-			CEPH_CAP_PIN);
+		/*
+		 * track (and drop pins for) r_old_dentry_dir
+		 * separately, since r_old_dentry's d_parent may have
+		 * changed between the dir mutex being dropped and
+		 * this request being freed.
+		 */
+		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
+				  CEPH_CAP_PIN);
 		dput(req->r_old_dentry);
+		iput(req->r_old_dentry_dir);
 	}
 	kfree(req->r_path1);
 	kfree(req->r_path2);
@@ -617,6 +621,12 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
  */
 struct dentry *get_nonsnap_parent(struct dentry *dentry)
 {
+	/*
+	 * we don't need to worry about protecting the d_parent access
+	 * here because we never renaming inside the snapped namespace
+	 * except to resplice to another snapdir, and either the old or new
+	 * result is a valid result.
+	 */
 	while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
 		dentry = dentry->d_parent;
 	return dentry;
@@ -652,7 +662,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 	if (req->r_inode) {
 		inode = req->r_inode;
 	} else if (req->r_dentry) {
-		struct inode *dir = req->r_dentry->d_parent->d_inode;
+		/* ignore race with rename; old or new d_parent is okay */
+		struct dentry *parent = req->r_dentry->d_parent;
+		struct inode *dir = parent->d_inode;
 
 		if (dir->i_sb != mdsc->fsc->sb) {
 			/* not this fs! */
@@ -660,8 +672,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
 			/* direct snapped/virtual snapdir requests
 			 * based on parent dir inode */
-			struct dentry *dn =
-				get_nonsnap_parent(req->r_dentry->d_parent);
+			struct dentry *dn = get_nonsnap_parent(parent);
 			inode = dn->d_inode;
 			dout("__choose_mds using nonsnap parent %p\n", inode);
 		} else if (req->r_dentry->d_inode) {
@@ -670,7 +681,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 		} else {
 			/* dir + name */
 			inode = dir;
-			hash = ceph_dentry_hash(req->r_dentry);
+			hash = ceph_dentry_hash(dir, req->r_dentry);
 			is_hash = true;
 		}
 	}
@@ -1931,9 +1942,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
 	if (req->r_locked_dir)
 		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
 	if (req->r_old_dentry)
-		ceph_get_cap_refs(
-			ceph_inode(req->r_old_dentry->d_parent->d_inode),
-			CEPH_CAP_PIN);
+		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
+				  CEPH_CAP_PIN);
 
 	/* issue */
 	mutex_lock(&mdsc->mutex);
@@ -2714,7 +2724,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 	struct ceph_mds_lease *h = msg->front.iov_base;
 	u32 seq;
 	struct ceph_vino vino;
-	int mask;
 	struct qstr dname;
 	int release = 0;
 
@@ -2725,7 +2734,6 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 		goto bad;
 	vino.ino = le64_to_cpu(h->ino);
 	vino.snap = CEPH_NOSNAP;
-	mask = le16_to_cpu(h->mask);
 	seq = le32_to_cpu(h->seq);
 	dname.name = (void *)h + sizeof(*h) + sizeof(u32);
 	dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
@@ -2737,8 +2745,8 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 
 	/* lookup inode */
 	inode = ceph_find_inode(sb, vino);
-	dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
-	     ceph_lease_op_name(h->action), mask, vino.ino, inode,
+	dout("handle_lease %s, ino %llx %p %.*s\n",
+	     ceph_lease_op_name(h->action), vino.ino, inode,
 	     dname.len, dname.name);
 	if (inode == NULL) {
 		dout("handle_lease no inode %llx\n", vino.ino);
@@ -2828,7 +2836,6 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
 		return;
 	lease = msg->front.iov_base;
 	lease->action = action;
-	lease->mask = cpu_to_le16(1);
 	lease->ino = cpu_to_le64(ceph_vino(inode).ino);
 	lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
 	lease->seq = cpu_to_le32(seq);
@@ -2850,7 +2857,7 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
  * Pass @inode always, @dentry is optional.
  */
 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
-			     struct dentry *dentry, int mask)
+			     struct dentry *dentry)
 {
 	struct ceph_dentry_info *di;
 	struct ceph_mds_session *session;
@@ -2858,7 +2865,6 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
 
 	BUG_ON(inode == NULL);
 	BUG_ON(dentry == NULL);
-	BUG_ON(mask == 0);
 
 	/* is dentry lease valid? */
 	spin_lock(&dentry->d_lock);
@@ -2868,8 +2874,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
 	    di->lease_gen != di->lease_session->s_cap_gen ||
 	    !time_before(jiffies, dentry->d_time)) {
 		dout("lease_release inode %p dentry %p -- "
-		     "no lease on %d\n",
-		     inode, dentry, mask);
+		     "no lease\n",
+		     inode, dentry);
 		spin_unlock(&dentry->d_lock);
 		return;
 	}
@@ -2880,8 +2886,8 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
 	__ceph_mdsc_drop_dentry_lease(dentry);
 	spin_unlock(&dentry->d_lock);
 
-	dout("lease_release inode %p dentry %p mask %d to mds%d\n",
-	     inode, dentry, mask, session->s_mds);
+	dout("lease_release inode %p dentry %p to mds%d\n",
+	     inode, dentry, session->s_mds);
 	ceph_mdsc_lease_send_msg(session, inode, dentry,
 				 CEPH_MDS_LEASE_RELEASE, seq);
 	ceph_put_mds_session(session);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 7d8a0d662d56..4bb239921dbd 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -171,6 +171,7 @@ struct ceph_mds_request {
 	struct inode *r_inode;              /* arg1 */
 	struct dentry *r_dentry;            /* arg1 */
 	struct dentry *r_old_dentry;        /* arg2: rename from or link from */
+	struct inode *r_old_dentry_dir;     /* arg2: old dentry's parent dir */
 	char *r_path1, *r_path2;
 	struct ceph_vino r_ino1, r_ino2;
 
@@ -333,7 +334,7 @@ extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
 
 extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
 				    struct inode *inode,
-				    struct dentry *dn, int mask);
+				    struct dentry *dn);
 
 extern void ceph_invalidate_dir_request(struct ceph_mds_request *req);
 
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 54b14de2e729..e26437191333 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -449,6 +449,15 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 	spin_lock(&inode->i_lock);
 	used = __ceph_caps_used(ci);
 	dirty = __ceph_caps_dirty(ci);
+
+	/*
+	 * If there is a write in progress, treat that as a dirty Fw,
+	 * even though it hasn't completed yet; by the time we finish
+	 * up this capsnap it will be.
+	 */
+	if (used & CEPH_CAP_FILE_WR)
+		dirty |= CEPH_CAP_FILE_WR;
+
 	if (__ceph_have_pending_cap_snap(ci)) {
 		/* there is no point in queuing multiple "pending" cap_snaps,
 		   as no new writes are allowed to start when pending, so any
@@ -456,13 +465,19 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 		   cap_snap.  lucky us. */
 		dout("queue_cap_snap %p already pending\n", inode);
 		kfree(capsnap);
-	} else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR) ||
-		   (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
-			     CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
+	} else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
+			    CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
 		struct ceph_snap_context *snapc = ci->i_head_snapc;
 
-		dout("queue_cap_snap %p cap_snap %p queuing under %p\n", inode,
-		     capsnap, snapc);
+		/*
+		 * if we are a sync write, we may need to go to the snaprealm
+		 * to get the current snapc.
+		 */
+		if (!snapc)
+			snapc = ci->i_snap_realm->cached_context;
+
+		dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
+		     inode, capsnap, snapc, ceph_cap_string(dirty));
 		ihold(inode);
 
 		atomic_set(&capsnap->nref, 1);
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index f2f77fd3c14c..d47c5ec7fb1f 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -73,8 +73,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 	 */
 	buf->f_bsize = 1 << CEPH_BLOCK_SHIFT;
 	buf->f_blocks = le64_to_cpu(st.kb) >> (CEPH_BLOCK_SHIFT-10);
-	buf->f_bfree = (le64_to_cpu(st.kb) - le64_to_cpu(st.kb_used)) >>
-		(CEPH_BLOCK_SHIFT-10);
+	buf->f_bfree = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
 	buf->f_bavail = le64_to_cpu(st.kb_avail) >> (CEPH_BLOCK_SHIFT-10);
 
 	buf->f_files = le64_to_cpu(st.num_objects);
@@ -780,6 +779,10 @@ static int ceph_register_bdi(struct super_block *sb,
 		fsc->backing_dev_info.ra_pages =
 			(fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
 			>> PAGE_SHIFT;
+	else
+		fsc->backing_dev_info.ra_pages =
+			default_backing_dev_info.ra_pages;
+
 	err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d",
 			   atomic_long_inc_return(&bdi_seq));
 	if (!err)
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 30446b144e3d..a23eed526f05 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -543,13 +543,16 @@ extern void ceph_reservation_status(struct ceph_fs_client *client,
 /*
  * we keep buffered readdir results attached to file->private_data
  */
+#define CEPH_F_SYNC     1
+#define CEPH_F_ATEND    2
+
 struct ceph_file_info {
-	int fmode;     /* initialized on open */
+	short fmode;     /* initialized on open */
+	short flags;     /* CEPH_F_* */
 
 	/* readdir: position within the dir */
 	u32 frag;
 	struct ceph_mds_request *last_readdir;
-	int at_end;
 
 	/* readdir: position within a frag */
 	unsigned offset;       /* offset of last chunk, adjusted for . and .. */
@@ -789,6 +792,8 @@ extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,
 	ceph_snapdir_dentry_ops;
 
 extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);
+extern int ceph_handle_snapdir(struct ceph_mds_request *req,
+			       struct dentry *dentry, int err);
 extern struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,
 					 struct dentry *dentry, int err);
 
@@ -796,7 +801,8 @@ extern void ceph_dentry_lru_add(struct dentry *dn);
 extern void ceph_dentry_lru_touch(struct dentry *dn);
 extern void ceph_dentry_lru_del(struct dentry *dn);
 extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
-extern unsigned ceph_dentry_hash(struct dentry *dn);
+extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
+extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
 
 /*
  * our d_ops vary depending on whether the inode is live,
@@ -819,14 +825,6 @@ extern int ceph_encode_locks(struct inode *i, struct ceph_pagelist *p,
 			     int p_locks, int f_locks);
 extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c);
 
-static inline struct inode *get_dentry_parent_inode(struct dentry *dentry)
-{
-	if (dentry && dentry->d_parent)
-		return dentry->d_parent->d_inode;
-
-	return NULL;
-}
-
 /* debugfs.c */
 extern int ceph_fs_debugfs_init(struct ceph_fs_client *client);
 extern void ceph_fs_debugfs_cleanup(struct ceph_fs_client *client);
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index f42d730f1b66..96c6739a0280 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -629,7 +629,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
 	struct inode *inode = dentry->d_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct inode *parent_inode = dentry->d_parent->d_inode;
+	struct inode *parent_inode;
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	int err;
@@ -677,7 +677,9 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	req->r_data_len = size;
 
 	dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
+	parent_inode = ceph_get_dentry_parent_inode(dentry);
 	err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+	iput(parent_inode);
 	ceph_mdsc_put_request(req);
 	dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
 
@@ -788,7 +790,7 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct inode *inode = dentry->d_inode;
-	struct inode *parent_inode = dentry->d_parent->d_inode;
+	struct inode *parent_inode;
 	struct ceph_mds_request *req;
 	int err;
 
@@ -802,7 +804,9 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
 	req->r_num_caps = 1;
 	req->r_path2 = kstrdup(name, GFP_NOFS);
 
+	parent_inode = ceph_get_dentry_parent_inode(dentry);
 	err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+	iput(parent_inode);
 	ceph_mdsc_put_request(req);
 	return err;
 }