summary refs log tree commit diff
path: root/fs/ceph/inode.c
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-06-12 23:06:23 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2014-06-12 23:06:23 -0700
commit6d87c225f5d82d29243dc124f1ffcbb0e14ec358 (patch)
tree7d72e2e6a77ec0911e86911d2ddae62c1b4161cf /fs/ceph/inode.c
parent338c09a94b14c449dd53227e9bea44816668c6a5 (diff)
parent22001f619f29ddf66582d834223dcff4c0b74595 (diff)
downloadlinux-6d87c225f5d82d29243dc124f1ffcbb0e14ec358.tar.gz
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "This has a mix of bug fixes and cleanups.

  Alex's patch fixes a rare race in RBD.  Ilya's patches fix an ENOENT
  check when a second rbd image is mapped and a couple memory leaks.
  Zheng fixes several issues with fragmented directories and multiple
  MDSs.  Josh fixes a spin/sleep issue, and Josh and Guangliang's
  patches fix setting and unsetting RBD images read-only.

  Naturally there are several other cleanups mixed in for good measure"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
  rbd: only set disk to read-only once
  rbd: move calls that may sleep out of spin lock range
  rbd: add ioctl for rbd
  ceph: use truncate_pagecache() instead of truncate_inode_pages()
  ceph: include time stamp in every MDS request
  rbd: fix ida/idr memory leak
  rbd: use reference counts for image requests
  rbd: fix osd_request memory leak in __rbd_dev_header_watch_sync()
  rbd: make sure we have latest osdmap on 'rbd map'
  libceph: add ceph_monc_wait_osdmap()
  libceph: mon_get_version request infrastructure
  libceph: recognize poolop requests in debugfs
  ceph: refactor readpage_nounlock() to make the logic clearer
  mds: check cap ID when handling cap export message
  ceph: remember subtree root dirfrag's auth MDS
  ceph: introduce ceph_fill_fragtree()
  ceph: handle cap import atomically
  ceph: pre-allocate ceph_cap struct for ceph_add_cap()
  ceph: update inode fields according to issued caps
  rbd: replace IS_ERR and PTR_ERR with PTR_ERR_OR_ZERO
  ...
Diffstat (limited to 'fs/ceph/inode.c')
-rw-r--r--fs/ceph/inode.c247
1 files changed, 154 insertions, 93 deletions
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e4fff9ff1c27..04c89c266cec 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -10,6 +10,7 @@
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
 #include <linux/posix_acl.h>
+#include <linux/random.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -179,9 +180,8 @@ struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, u32 f)
  * specified, copy the frag delegation info to the caller if
  * it is present.
  */
-u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
-		     struct ceph_inode_frag *pfrag,
-		     int *found)
+static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+			      struct ceph_inode_frag *pfrag, int *found)
 {
 	u32 t = ceph_frag_make(0, 0);
 	struct ceph_inode_frag *frag;
@@ -191,7 +191,6 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
 	if (found)
 		*found = 0;
 
-	mutex_lock(&ci->i_fragtree_mutex);
 	while (1) {
 		WARN_ON(!ceph_frag_contains_value(t, v));
 		frag = __ceph_find_frag(ci, t);
@@ -220,10 +219,19 @@ u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
 	}
 	dout("choose_frag(%x) = %x\n", v, t);
 
-	mutex_unlock(&ci->i_fragtree_mutex);
 	return t;
 }
 
+u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+		     struct ceph_inode_frag *pfrag, int *found)
+{
+	u32 ret;
+	mutex_lock(&ci->i_fragtree_mutex);
+	ret = __ceph_choose_frag(ci, v, pfrag, found);
+	mutex_unlock(&ci->i_fragtree_mutex);
+	return ret;
+}
+
 /*
  * Process dirfrag (delegation) info from the mds.  Include leaf
  * fragment in tree ONLY if ndist > 0.  Otherwise, only
@@ -237,11 +245,17 @@ static int ceph_fill_dirfrag(struct inode *inode,
 	u32 id = le32_to_cpu(dirinfo->frag);
 	int mds = le32_to_cpu(dirinfo->auth);
 	int ndist = le32_to_cpu(dirinfo->ndist);
+	int diri_auth = -1;
 	int i;
 	int err = 0;
 
+	spin_lock(&ci->i_ceph_lock);
+	if (ci->i_auth_cap)
+		diri_auth = ci->i_auth_cap->mds;
+	spin_unlock(&ci->i_ceph_lock);
+
 	mutex_lock(&ci->i_fragtree_mutex);
-	if (ndist == 0) {
+	if (ndist == 0 && mds == diri_auth) {
 		/* no delegation info needed. */
 		frag = __ceph_find_frag(ci, id);
 		if (!frag)
@@ -286,6 +300,75 @@ out:
 	return err;
 }
 
+static int ceph_fill_fragtree(struct inode *inode,
+			      struct ceph_frag_tree_head *fragtree,
+			      struct ceph_mds_reply_dirfrag *dirinfo)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_inode_frag *frag;
+	struct rb_node *rb_node;
+	int i;
+	u32 id, nsplits;
+	bool update = false;
+
+	mutex_lock(&ci->i_fragtree_mutex);
+	nsplits = le32_to_cpu(fragtree->nsplits);
+	if (nsplits) {
+		i = prandom_u32() % nsplits;
+		id = le32_to_cpu(fragtree->splits[i].frag);
+		if (!__ceph_find_frag(ci, id))
+			update = true;
+	} else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
+		rb_node = rb_first(&ci->i_fragtree);
+		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+		if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
+			update = true;
+	}
+	if (!update && dirinfo) {
+		id = le32_to_cpu(dirinfo->frag);
+		if (id != __ceph_choose_frag(ci, id, NULL, NULL))
+			update = true;
+	}
+	if (!update)
+		goto out_unlock;
+
+	dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
+	rb_node = rb_first(&ci->i_fragtree);
+	for (i = 0; i < nsplits; i++) {
+		id = le32_to_cpu(fragtree->splits[i].frag);
+		frag = NULL;
+		while (rb_node) {
+			frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+			if (ceph_frag_compare(frag->frag, id) >= 0) {
+				if (frag->frag != id)
+					frag = NULL;
+				else
+					rb_node = rb_next(rb_node);
+				break;
+			}
+			rb_node = rb_next(rb_node);
+			rb_erase(&frag->node, &ci->i_fragtree);
+			kfree(frag);
+			frag = NULL;
+		}
+		if (!frag) {
+			frag = __get_or_create_frag(ci, id);
+			if (IS_ERR(frag))
+				continue;
+		}
+		frag->split_by = le32_to_cpu(fragtree->splits[i].by);
+		dout(" frag %x split by %d\n", frag->frag, frag->split_by);
+	}
+	while (rb_node) {
+		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+		rb_node = rb_next(rb_node);
+		rb_erase(&frag->node, &ci->i_fragtree);
+		kfree(frag);
+	}
+out_unlock:
+	mutex_unlock(&ci->i_fragtree_mutex);
+	return 0;
+}
 
 /*
  * initialize a newly allocated inode.
@@ -341,7 +424,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
 	INIT_LIST_HEAD(&ci->i_cap_snaps);
 	ci->i_head_snapc = NULL;
 	ci->i_snap_caps = 0;
-	ci->i_cap_exporting_issued = 0;
 
 	for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
 		ci->i_nr_by_mode[i] = 0;
@@ -407,7 +489,7 @@ void ceph_destroy_inode(struct inode *inode)
 
 	/*
 	 * we may still have a snap_realm reference if there are stray
-	 * caps in i_cap_exporting_issued or i_snap_caps.
+	 * caps in i_snap_caps.
 	 */
 	if (ci->i_snap_realm) {
 		struct ceph_mds_client *mdsc =
@@ -582,22 +664,26 @@ static int fill_inode(struct inode *inode,
 		      unsigned long ttl_from, int cap_fmode,
 		      struct ceph_cap_reservation *caps_reservation)
 {
+	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	struct ceph_mds_reply_inode *info = iinfo->in;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int i;
-	int issued = 0, implemented;
+	int issued = 0, implemented, new_issued;
 	struct timespec mtime, atime, ctime;
-	u32 nsplits;
-	struct ceph_inode_frag *frag;
-	struct rb_node *rb_node;
 	struct ceph_buffer *xattr_blob = NULL;
+	struct ceph_cap *new_cap = NULL;
 	int err = 0;
-	int queue_trunc = 0;
+	bool wake = false;
+	bool queue_trunc = false;
+	bool new_version = false;
 
 	dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
 	     inode, ceph_vinop(inode), le64_to_cpu(info->version),
 	     ci->i_version);
 
+	/* prealloc new cap struct */
+	if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
+		new_cap = ceph_get_cap(mdsc, caps_reservation);
+
 	/*
 	 * prealloc xattr data, if it looks like we'll need it.  only
 	 * if len > 4 (meaning there are actually xattrs; the first 4
@@ -623,19 +709,23 @@ static int fill_inode(struct inode *inode,
 	 *   3    2     skip
 	 *   3    3     update
 	 */
-	if (le64_to_cpu(info->version) > 0 &&
-	    (ci->i_version & ~1) >= le64_to_cpu(info->version))
-		goto no_change;
-	
+	if (ci->i_version == 0 ||
+	    ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+	     le64_to_cpu(info->version) > (ci->i_version & ~1)))
+		new_version = true;
+
 	issued = __ceph_caps_issued(ci, &implemented);
 	issued |= implemented | __ceph_caps_dirty(ci);
+	new_issued = ~issued & le32_to_cpu(info->cap.caps);
 
 	/* update inode */
 	ci->i_version = le64_to_cpu(info->version);
 	inode->i_version++;
 	inode->i_rdev = le32_to_cpu(info->rdev);
+	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
-	if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+	if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
+	    (issued & CEPH_CAP_AUTH_EXCL) == 0) {
 		inode->i_mode = le32_to_cpu(info->mode);
 		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
 		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
@@ -644,23 +734,35 @@ static int fill_inode(struct inode *inode,
 		     from_kgid(&init_user_ns, inode->i_gid));
 	}
 
-	if ((issued & CEPH_CAP_LINK_EXCL) == 0)
+	if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
+	    (issued & CEPH_CAP_LINK_EXCL) == 0)
 		set_nlink(inode, le32_to_cpu(info->nlink));
 
-	/* be careful with mtime, atime, size */
-	ceph_decode_timespec(&atime, &info->atime);
-	ceph_decode_timespec(&mtime, &info->mtime);
-	ceph_decode_timespec(&ctime, &info->ctime);
-	queue_trunc = ceph_fill_file_size(inode, issued,
-					  le32_to_cpu(info->truncate_seq),
-					  le64_to_cpu(info->truncate_size),
-					  le64_to_cpu(info->size));
-	ceph_fill_file_time(inode, issued,
-			    le32_to_cpu(info->time_warp_seq),
-			    &ctime, &mtime, &atime);
-
-	ci->i_layout = info->layout;
-	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+	if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
+		/* be careful with mtime, atime, size */
+		ceph_decode_timespec(&atime, &info->atime);
+		ceph_decode_timespec(&mtime, &info->mtime);
+		ceph_decode_timespec(&ctime, &info->ctime);
+		ceph_fill_file_time(inode, issued,
+				le32_to_cpu(info->time_warp_seq),
+				&ctime, &mtime, &atime);
+	}
+
+	if (new_version ||
+	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
+		ci->i_layout = info->layout;
+		queue_trunc = ceph_fill_file_size(inode, issued,
+					le32_to_cpu(info->truncate_seq),
+					le64_to_cpu(info->truncate_size),
+					le64_to_cpu(info->size));
+		/* only update max_size on auth cap */
+		if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+		    ci->i_max_size != le64_to_cpu(info->max_size)) {
+			dout("max_size %lld -> %llu\n", ci->i_max_size,
+					le64_to_cpu(info->max_size));
+			ci->i_max_size = le64_to_cpu(info->max_size);
+		}
+	}
 
 	/* xattrs */
 	/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
@@ -745,58 +847,6 @@ static int fill_inode(struct inode *inode,
 		dout(" marking %p complete (empty)\n", inode);
 		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
 	}
-no_change:
-	/* only update max_size on auth cap */
-	if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
-	    ci->i_max_size != le64_to_cpu(info->max_size)) {
-		dout("max_size %lld -> %llu\n", ci->i_max_size,
-		     le64_to_cpu(info->max_size));
-		ci->i_max_size = le64_to_cpu(info->max_size);
-	}
-
-	spin_unlock(&ci->i_ceph_lock);
-
-	/* queue truncate if we saw i_size decrease */
-	if (queue_trunc)
-		ceph_queue_vmtruncate(inode);
-
-	/* populate frag tree */
-	/* FIXME: move me up, if/when version reflects fragtree changes */
-	nsplits = le32_to_cpu(info->fragtree.nsplits);
-	mutex_lock(&ci->i_fragtree_mutex);
-	rb_node = rb_first(&ci->i_fragtree);
-	for (i = 0; i < nsplits; i++) {
-		u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-		frag = NULL;
-		while (rb_node) {
-			frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-			if (ceph_frag_compare(frag->frag, id) >= 0) {
-				if (frag->frag != id)
-					frag = NULL;
-				else
-					rb_node = rb_next(rb_node);
-				break;
-			}
-			rb_node = rb_next(rb_node);
-			rb_erase(&frag->node, &ci->i_fragtree);
-			kfree(frag);
-			frag = NULL;
-		}
-		if (!frag) {
-			frag = __get_or_create_frag(ci, id);
-			if (IS_ERR(frag))
-				continue;
-		}
-		frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
-		dout(" frag %x split by %d\n", frag->frag, frag->split_by);
-	}
-	while (rb_node) {
-		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-		rb_node = rb_next(rb_node);
-		rb_erase(&frag->node, &ci->i_fragtree);
-		kfree(frag);
-	}
-	mutex_unlock(&ci->i_fragtree_mutex);
 
 	/* were we issued a capability? */
 	if (info->cap.caps) {
@@ -809,30 +859,41 @@ no_change:
 				     le32_to_cpu(info->cap.seq),
 				     le32_to_cpu(info->cap.mseq),
 				     le64_to_cpu(info->cap.realm),
-				     info->cap.flags,
-				     caps_reservation);
+				     info->cap.flags, &new_cap);
+			wake = true;
 		} else {
-			spin_lock(&ci->i_ceph_lock);
 			dout(" %p got snap_caps %s\n", inode,
 			     ceph_cap_string(le32_to_cpu(info->cap.caps)));
 			ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
 			if (cap_fmode >= 0)
 				__ceph_get_fmode(ci, cap_fmode);
-			spin_unlock(&ci->i_ceph_lock);
 		}
 	} else if (cap_fmode >= 0) {
 		pr_warn("mds issued no caps on %llx.%llx\n",
 			   ceph_vinop(inode));
 		__ceph_get_fmode(ci, cap_fmode);
 	}
+	spin_unlock(&ci->i_ceph_lock);
+
+	if (wake)
+		wake_up_all(&ci->i_cap_wq);
+
+	/* queue truncate if we saw i_size decrease */
+	if (queue_trunc)
+		ceph_queue_vmtruncate(inode);
+
+	/* populate frag tree */
+	if (S_ISDIR(inode->i_mode))
+		ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
 
 	/* update delegation info? */
 	if (dirinfo)
 		ceph_fill_dirfrag(inode, dirinfo);
 
 	err = 0;
-
 out:
+	if (new_cap)
+		ceph_put_cap(mdsc, new_cap);
 	if (xattr_blob)
 		ceph_buffer_put(xattr_blob);
 	return err;
@@ -1485,7 +1546,7 @@ static void ceph_invalidate_work(struct work_struct *work)
 	orig_gen = ci->i_rdcache_gen;
 	spin_unlock(&ci->i_ceph_lock);
 
-	truncate_inode_pages(inode->i_mapping, 0);
+	truncate_pagecache(inode, 0);
 
 	spin_lock(&ci->i_ceph_lock);
 	if (orig_gen == ci->i_rdcache_gen &&
@@ -1588,7 +1649,7 @@ retry:
 	     ci->i_truncate_pending, to);
 	spin_unlock(&ci->i_ceph_lock);
 
-	truncate_inode_pages(inode->i_mapping, to);
+	truncate_pagecache(inode, to);
 
 	spin_lock(&ci->i_ceph_lock);
 	if (to == ci->i_truncate_size) {