summary refs log tree commit diff
path: root/fs/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2014-10-15 06:46:01 +0200
committerLinus Torvalds <torvalds@linux-foundation.org>2014-10-15 06:46:01 +0200
commit6b0490816671b2f4126a99998c9bf3c8c0472de2 (patch)
tree016543455c2bdbe47b422fed6a3b4ffb991c97d6 /fs/ceph
parentce9d7f7b45930ed16c512aabcfe651d44f1c8619 (diff)
parent0bc62284ee3f2a228c64902ed818b6ba8e04159b (diff)
downloadlinux-6b0490816671b2f4126a99998c9bf3c8c0472de2.tar.gz
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "There is the long-awaited discard support for RBD (Guangliang Zhao,
  Josh Durgin), a pile of RBD bug fixes that didn't belong in late -rc's
  (Ilya Dryomov, Li RongQing), a pile of fs/ceph bug fixes and
  performance and debugging improvements (Yan, Zheng, John Spray), and a
  smattering of cleanups (Chao Yu, Fabian Frederick, Joe Perches)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
  ceph: fix divide-by-zero in __validate_layout()
  rbd: rbd workqueues need a resque worker
  libceph: ceph-msgr workqueue needs a resque worker
  ceph: fix bool assignments
  libceph: separate multiple ops with commas in debugfs output
  libceph: sync osd op definitions in rados.h
  libceph: remove redundant declaration
  ceph: additional debugfs output
  ceph: export ceph_session_state_name function
  ceph: include the initial ACL in create/mkdir/mknod MDS requests
  ceph: use pagelist to present MDS request data
  libceph: reference counting pagelist
  ceph: fix llistxattr on symlink
  ceph: send client metadata to MDS
  ceph: remove redundant code for max file size verification
  ceph: remove redundant io_iter_advance()
  ceph: move ceph_find_inode() outside the s_mutex
  ceph: request xattrs if xattr_version is zero
  rbd: set the remaining discard properties to enable support
  rbd: use helpers to handle discard for layered images correctly
  ...
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/acl.c125
-rw-r--r--fs/ceph/addr.c9
-rw-r--r--fs/ceph/caps.c37
-rw-r--r--fs/ceph/debugfs.c46
-rw-r--r--fs/ceph/dir.c41
-rw-r--r--fs/ceph/file.c33
-rw-r--r--fs/ceph/inode.c16
-rw-r--r--fs/ceph/ioctl.c6
-rw-r--r--fs/ceph/mds_client.c136
-rw-r--r--fs/ceph/mds_client.h6
-rw-r--r--fs/ceph/super.h27
-rw-r--r--fs/ceph/xattr.c81
12 files changed, 386 insertions, 177 deletions
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index cebf2ebefb55..5bd853ba44ff 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -169,36 +169,109 @@ out:
 	return ret;
 }
 
-int ceph_init_acl(struct dentry *dentry, struct inode *inode, struct inode *dir)
+int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+		       struct ceph_acls_info *info)
 {
-	struct posix_acl *default_acl, *acl;
-	umode_t new_mode = inode->i_mode;
-	int error;
-
-	error = posix_acl_create(dir, &new_mode, &default_acl, &acl);
-	if (error)
-		return error;
-
-	if (!default_acl && !acl) {
-		cache_no_acl(inode);
-		if (new_mode != inode->i_mode) {
-			struct iattr newattrs = {
-				.ia_mode = new_mode,
-				.ia_valid = ATTR_MODE,
-			};
-			error = ceph_setattr(dentry, &newattrs);
+	struct posix_acl *acl, *default_acl;
+	size_t val_size1 = 0, val_size2 = 0;
+	struct ceph_pagelist *pagelist = NULL;
+	void *tmp_buf = NULL;
+	int err;
+
+	err = posix_acl_create(dir, mode, &default_acl, &acl);
+	if (err)
+		return err;
+
+	if (acl) {
+		int ret = posix_acl_equiv_mode(acl, mode);
+		if (ret < 0)
+			goto out_err;
+		if (ret == 0) {
+			posix_acl_release(acl);
+			acl = NULL;
 		}
-		return error;
 	}
 
-	if (default_acl) {
-		error = ceph_set_acl(inode, default_acl, ACL_TYPE_DEFAULT);
-		posix_acl_release(default_acl);
-	}
+	if (!default_acl && !acl)
+		return 0;
+
+	if (acl)
+		val_size1 = posix_acl_xattr_size(acl->a_count);
+	if (default_acl)
+		val_size2 = posix_acl_xattr_size(default_acl->a_count);
+
+	err = -ENOMEM;
+	tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS);
+	if (!tmp_buf)
+		goto out_err;
+	pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS);
+	if (!pagelist)
+		goto out_err;
+	ceph_pagelist_init(pagelist);
+
+	err = ceph_pagelist_reserve(pagelist, PAGE_SIZE);
+	if (err)
+		goto out_err;
+
+	ceph_pagelist_encode_32(pagelist, acl && default_acl ? 2 : 1);
+
 	if (acl) {
-		if (!error)
-			error = ceph_set_acl(inode, acl, ACL_TYPE_ACCESS);
-		posix_acl_release(acl);
+		size_t len = strlen(POSIX_ACL_XATTR_ACCESS);
+		err = ceph_pagelist_reserve(pagelist, len + val_size1 + 8);
+		if (err)
+			goto out_err;
+		ceph_pagelist_encode_string(pagelist, POSIX_ACL_XATTR_ACCESS,
+					    len);
+		err = posix_acl_to_xattr(&init_user_ns, acl,
+					 tmp_buf, val_size1);
+		if (err < 0)
+			goto out_err;
+		ceph_pagelist_encode_32(pagelist, val_size1);
+		ceph_pagelist_append(pagelist, tmp_buf, val_size1);
 	}
-	return error;
+	if (default_acl) {
+		size_t len = strlen(POSIX_ACL_XATTR_DEFAULT);
+		err = ceph_pagelist_reserve(pagelist, len + val_size2 + 8);
+		if (err)
+			goto out_err;
+		err = ceph_pagelist_encode_string(pagelist,
+						  POSIX_ACL_XATTR_DEFAULT, len);
+		err = posix_acl_to_xattr(&init_user_ns, default_acl,
+					 tmp_buf, val_size2);
+		if (err < 0)
+			goto out_err;
+		ceph_pagelist_encode_32(pagelist, val_size2);
+		ceph_pagelist_append(pagelist, tmp_buf, val_size2);
+	}
+
+	kfree(tmp_buf);
+
+	info->acl = acl;
+	info->default_acl = default_acl;
+	info->pagelist = pagelist;
+	return 0;
+
+out_err:
+	posix_acl_release(acl);
+	posix_acl_release(default_acl);
+	kfree(tmp_buf);
+	if (pagelist)
+		ceph_pagelist_release(pagelist);
+	return err;
+}
+
+void ceph_init_inode_acls(struct inode* inode, struct ceph_acls_info *info)
+{
+	if (!inode)
+		return;
+	ceph_set_cached_acl(inode, ACL_TYPE_ACCESS, info->acl);
+	ceph_set_cached_acl(inode, ACL_TYPE_DEFAULT, info->default_acl);
+}
+
+void ceph_release_acls_info(struct ceph_acls_info *info)
+{
+	posix_acl_release(info->acl);
+	posix_acl_release(info->default_acl);
+	if (info->pagelist)
+		ceph_pagelist_release(info->pagelist);
 }
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 90b3954d48ed..18c06bbaf136 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1076,12 +1076,6 @@ retry_locked:
 	/* past end of file? */
 	i_size = inode->i_size;   /* caller holds i_mutex */
 
-	if (i_size + len > inode->i_sb->s_maxbytes) {
-		/* file is too big */
-		r = -EINVAL;
-		goto fail;
-	}
-
 	if (page_off >= i_size ||
 	    (pos_in_page == 0 && (pos+len) >= i_size &&
 	     end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
@@ -1099,9 +1093,6 @@ retry_locked:
 	if (r < 0)
 		goto fail_nosnap;
 	goto retry_locked;
-
-fail:
-	up_read(&mdsc->snap_rwsem);
 fail_nosnap:
 	unlock_page(page);
 	return r;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6d1cd45dca89..659f2ea9e6f7 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2397,12 +2397,12 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	u64 max_size = le64_to_cpu(grant->max_size);
 	struct timespec mtime, atime, ctime;
 	int check_caps = 0;
-	bool wake = 0;
-	bool writeback = 0;
-	bool queue_trunc = 0;
-	bool queue_invalidate = 0;
-	bool queue_revalidate = 0;
-	bool deleted_inode = 0;
+	bool wake = false;
+	bool writeback = false;
+	bool queue_trunc = false;
+	bool queue_invalidate = false;
+	bool queue_revalidate = false;
+	bool deleted_inode = false;
 
 	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
 	     inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2437,7 +2437,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 			/* there were locked pages.. invalidate later
 			   in a separate thread. */
 			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
-				queue_invalidate = 1;
+				queue_invalidate = true;
 				ci->i_rdcache_revoking = ci->i_rdcache_gen;
 			}
 		}
@@ -2466,7 +2466,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 		set_nlink(inode, le32_to_cpu(grant->nlink));
 		if (inode->i_nlink == 0 &&
 		    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
-			deleted_inode = 1;
+			deleted_inode = true;
 	}
 
 	if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
@@ -2487,7 +2487,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	/* Do we need to revalidate our fscache cookie. Don't bother on the
 	 * first cache cap as we already validate at cookie creation time. */
 	if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
-		queue_revalidate = 1;
+		queue_revalidate = true;
 
 	if (newcaps & CEPH_CAP_ANY_RD) {
 		/* ctime/mtime/atime? */
@@ -2516,7 +2516,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 				ci->i_wanted_max_size = 0;  /* reset */
 				ci->i_requested_max_size = 0;
 			}
-			wake = 1;
+			wake = true;
 		}
 	}
 
@@ -2546,7 +2546,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 		     ceph_cap_string(newcaps),
 		     ceph_cap_string(revoking));
 		if (revoking & used & CEPH_CAP_FILE_BUFFER)
-			writeback = 1;  /* initiate writeback; will delay ack */
+			writeback = true;  /* initiate writeback; will delay ack */
 		else if (revoking == CEPH_CAP_FILE_CACHE &&
 			 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
 			 queue_invalidate)
@@ -2572,7 +2572,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 		cap->implemented |= newcaps; /* add bits only, to
 					      * avoid stepping on a
 					      * pending revocation */
-		wake = 1;
+		wake = true;
 	}
 	BUG_ON(cap->issued & ~cap->implemented);
 
@@ -2586,7 +2586,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 		kick_flushing_inode_caps(mdsc, session, inode);
 		up_read(&mdsc->snap_rwsem);
 		if (newcaps & ~issued)
-			wake = 1;
+			wake = true;
 	}
 
 	if (queue_trunc) {
@@ -3045,6 +3045,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		}
 	}
 
+	/* lookup ino */
+	inode = ceph_find_inode(sb, vino);
+	ci = ceph_inode(inode);
+	dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
+	     vino.snap, inode);
+
 	mutex_lock(&session->s_mutex);
 	session->s_seq++;
 	dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
@@ -3053,11 +3059,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	if (op == CEPH_CAP_OP_IMPORT)
 		ceph_add_cap_releases(mdsc, session);
 
-	/* lookup ino */
-	inode = ceph_find_inode(sb, vino);
-	ci = ceph_inode(inode);
-	dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
-	     vino.snap, inode);
 	if (!inode) {
 		dout(" i don't have ino %llx\n", vino.ino);
 
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 5a743ac141ab..5d5a4c8c8496 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -158,10 +158,47 @@ static int dentry_lru_show(struct seq_file *s, void *ptr)
 	return 0;
 }
 
+static int mds_sessions_show(struct seq_file *s, void *ptr)
+{
+	struct ceph_fs_client *fsc = s->private;
+	struct ceph_mds_client *mdsc = fsc->mdsc;
+	struct ceph_auth_client *ac = fsc->client->monc.auth;
+	struct ceph_options *opt = fsc->client->options;
+	int mds = -1;
+
+	mutex_lock(&mdsc->mutex);
+
+	/* The 'num' portion of an 'entity name' */
+	seq_printf(s, "global_id %llu\n", ac->global_id);
+
+	/* The -o name mount argument */
+	seq_printf(s, "name \"%s\"\n", opt->name ? opt->name : "");
+
+	/* The list of MDS session rank+state */
+	for (mds = 0; mds < mdsc->max_sessions; mds++) {
+		struct ceph_mds_session *session =
+			__ceph_lookup_mds_session(mdsc, mds);
+		if (!session) {
+			continue;
+		}
+		mutex_unlock(&mdsc->mutex);
+		seq_printf(s, "mds.%d %s\n",
+				session->s_mds,
+				ceph_session_state_name(session->s_state));
+
+		ceph_put_mds_session(session);
+		mutex_lock(&mdsc->mutex);
+	}
+	mutex_unlock(&mdsc->mutex);
+
+	return 0;
+}
+
 CEPH_DEFINE_SHOW_FUNC(mdsmap_show)
 CEPH_DEFINE_SHOW_FUNC(mdsc_show)
 CEPH_DEFINE_SHOW_FUNC(caps_show)
 CEPH_DEFINE_SHOW_FUNC(dentry_lru_show)
+CEPH_DEFINE_SHOW_FUNC(mds_sessions_show)
 
 
 /*
@@ -193,6 +230,7 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
 	debugfs_remove(fsc->debugfs_bdi);
 	debugfs_remove(fsc->debugfs_congestion_kb);
 	debugfs_remove(fsc->debugfs_mdsmap);
+	debugfs_remove(fsc->debugfs_mds_sessions);
 	debugfs_remove(fsc->debugfs_caps);
 	debugfs_remove(fsc->debugfs_mdsc);
 	debugfs_remove(fsc->debugfs_dentry_lru);
@@ -231,6 +269,14 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
 	if (!fsc->debugfs_mdsmap)
 		goto out;
 
+	fsc->debugfs_mds_sessions = debugfs_create_file("mds_sessions",
+					0600,
+					fsc->client->debugfs_dir,
+					fsc,
+					&mds_sessions_show_fops);
+	if (!fsc->debugfs_mds_sessions)
+		goto out;
+
 	fsc->debugfs_mdsc = debugfs_create_file("mdsc",
 						0600,
 						fsc->client->debugfs_dir,
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index b6c59eaa4f64..e6d63f8f98c0 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -682,17 +682,22 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
+	struct ceph_acls_info acls = {};
 	int err;
 
 	if (ceph_snap(dir) != CEPH_NOSNAP)
 		return -EROFS;
 
+	err = ceph_pre_init_acls(dir, &mode, &acls);
+	if (err < 0)
+		return err;
+
 	dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
 	     dir, dentry, mode, rdev);
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
 	if (IS_ERR(req)) {
-		d_drop(dentry);
-		return PTR_ERR(req);
+		err = PTR_ERR(req);
+		goto out;
 	}
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
@@ -701,15 +706,20 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
 	req->r_args.mknod.rdev = cpu_to_le32(rdev);
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+	if (acls.pagelist) {
+		req->r_pagelist = acls.pagelist;
+		acls.pagelist = NULL;
+	}
 	err = ceph_mdsc_do_request(mdsc, dir, req);
 	if (!err && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
 	ceph_mdsc_put_request(req);
-
+out:
 	if (!err)
-		ceph_init_acl(dentry, dentry->d_inode, dir);
+		ceph_init_inode_acls(dentry->d_inode, &acls);
 	else
 		d_drop(dentry);
+	ceph_release_acls_info(&acls);
 	return err;
 }
 
@@ -733,8 +743,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
 	dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
 	if (IS_ERR(req)) {
-		d_drop(dentry);
-		return PTR_ERR(req);
+		err = PTR_ERR(req);
+		goto out;
 	}
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
@@ -746,9 +756,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
 	if (!err && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
 	ceph_mdsc_put_request(req);
-	if (!err)
-		ceph_init_acl(dentry, dentry->d_inode, dir);
-	else
+out:
+	if (err)
 		d_drop(dentry);
 	return err;
 }
@@ -758,6 +767,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
 	struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
+	struct ceph_acls_info acls = {};
 	int err = -EROFS;
 	int op;
 
@@ -772,6 +782,12 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
 	} else {
 		goto out;
 	}
+
+	mode |= S_IFDIR;
+	err = ceph_pre_init_acls(dir, &mode, &acls);
+	if (err < 0)
+		goto out;
+
 	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
@@ -784,15 +800,20 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
 	req->r_args.mkdir.mode = cpu_to_le32(mode);
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+	if (acls.pagelist) {
+		req->r_pagelist = acls.pagelist;
+		acls.pagelist = NULL;
+	}
 	err = ceph_mdsc_do_request(mdsc, dir, req);
 	if (!err && !req->r_reply_info.head->is_dentry)
 		err = ceph_handle_notrace_create(dir, dentry);
 	ceph_mdsc_put_request(req);
 out:
 	if (!err)
-		ceph_init_acl(dentry, dentry->d_inode, dir);
+		ceph_init_inode_acls(dentry->d_inode, &acls);
 	else
 		d_drop(dentry);
+	ceph_release_acls_info(&acls);
 	return err;
 }
 
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 2eb02f80a0ab..d7e0da8366e6 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -235,6 +235,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
 	struct dentry *dn;
+	struct ceph_acls_info acls = {};
 	int err;
 
 	dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n",
@@ -248,22 +249,34 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	if (err < 0)
 		return err;
 
+	if (flags & O_CREAT) {
+		err = ceph_pre_init_acls(dir, &mode, &acls);
+		if (err < 0)
+			return err;
+	}
+
 	/* do the open */
 	req = prepare_open_request(dir->i_sb, flags, mode);
-	if (IS_ERR(req))
-		return PTR_ERR(req);
+	if (IS_ERR(req)) {
+		err = PTR_ERR(req);
+		goto out_acl;
+	}
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
 	if (flags & O_CREAT) {
 		req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 		req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+		if (acls.pagelist) {
+			req->r_pagelist = acls.pagelist;
+			acls.pagelist = NULL;
+		}
 	}
 	req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
 				   req);
 	if (err)
-		goto out_err;
+		goto out_req;
 
 	err = ceph_handle_snapdir(req, dentry, err);
 	if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
@@ -278,7 +291,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 		dn = NULL;
 	}
 	if (err)
-		goto out_err;
+		goto out_req;
 	if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) {
 		/* make vfs retry on splice, ENOENT, or symlink */
 		dout("atomic_open finish_no_open on dn %p\n", dn);
@@ -286,15 +299,17 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	} else {
 		dout("atomic_open finish_open on dn %p\n", dn);
 		if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) {
-			ceph_init_acl(dentry, dentry->d_inode, dir);
+			ceph_init_inode_acls(dentry->d_inode, &acls);
 			*opened |= FILE_CREATED;
 		}
 		err = finish_open(file, dentry, ceph_open, opened);
 	}
-out_err:
+out_req:
 	if (!req->r_err && req->r_target_inode)
 		ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode);
 	ceph_mdsc_put_request(req);
+out_acl:
+	ceph_release_acls_info(&acls);
 	dout("atomic_open result=%d\n", err);
 	return err;
 }
@@ -826,8 +841,7 @@ again:
 	ceph_put_cap_refs(ci, got);
 
 	if (checkeof && ret >= 0) {
-		int statret = ceph_do_getattr(inode,
-					      CEPH_STAT_CAP_SIZE);
+		int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
 
 		/* hit EOF or hole? */
 		if (statret == 0 && iocb->ki_pos < inode->i_size &&
@@ -836,7 +850,6 @@ again:
 			     ", reading more\n", iocb->ki_pos,
 			     inode->i_size);
 
-			iov_iter_advance(to, ret);
 			read += ret;
 			len -= ret;
 			checkeof = 0;
@@ -995,7 +1008,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
 	mutex_lock(&inode->i_mutex);
 
 	if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
-		ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
+		ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
 		if (ret < 0) {
 			offset = ret;
 			goto out;
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 04c89c266cec..7b6139004401 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -766,7 +766,7 @@ static int fill_inode(struct inode *inode,
 
 	/* xattrs */
 	/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
-	if ((issued & CEPH_CAP_XATTR_EXCL) == 0 &&
+	if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))  &&
 	    le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
 		if (ci->i_xattrs.blob)
 			ceph_buffer_put(ci->i_xattrs.blob);
@@ -1813,10 +1813,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	if (ia_valid & ATTR_SIZE) {
 		dout("setattr %p size %lld -> %lld\n", inode,
 		     inode->i_size, attr->ia_size);
-		if (attr->ia_size > inode->i_sb->s_maxbytes) {
-			err = -EINVAL;
-			goto out;
-		}
 		if ((issued & CEPH_CAP_FILE_EXCL) &&
 		    attr->ia_size > inode->i_size) {
 			inode->i_size = attr->ia_size;
@@ -1896,8 +1892,6 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	if (mask & CEPH_SETATTR_SIZE)
 		__ceph_do_pending_vmtruncate(inode);
 	return err;
-out:
-	spin_unlock(&ci->i_ceph_lock);
 out_put:
 	ceph_mdsc_put_request(req);
 	return err;
@@ -1907,7 +1901,7 @@ out_put:
  * Verify that we have a lease on the given mask.  If not,
  * do a getattr against an mds.
  */
-int ceph_do_getattr(struct inode *inode, int mask)
+int ceph_do_getattr(struct inode *inode, int mask, bool force)
 {
 	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1920,7 +1914,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
 	}
 
 	dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
-	if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
+	if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
 		return 0;
 
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
@@ -1948,7 +1942,7 @@ int ceph_permission(struct inode *inode, int mask)
 	if (mask & MAY_NOT_BLOCK)
 		return -ECHILD;
 
-	err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED);
+	err = ceph_do_getattr(inode, CEPH_CAP_AUTH_SHARED, false);
 
 	if (!err)
 		err = generic_permission(inode, mask);
@@ -1966,7 +1960,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int err;
 
-	err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL);
+	err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL, false);
 	if (!err) {
 		generic_fillattr(inode, stat);
 		stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index a822a6e58290..f851d8d70158 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -19,7 +19,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg)
 	struct ceph_ioctl_layout l;
 	int err;
 
-	err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT);
+	err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
 	if (!err) {
 		l.stripe_unit = ceph_file_layout_su(ci->i_layout);
 		l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
@@ -41,7 +41,7 @@ static long __validate_layout(struct ceph_mds_client *mdsc,
 	/* validate striping parameters */
 	if ((l->object_size & ~PAGE_MASK) ||
 	    (l->stripe_unit & ~PAGE_MASK) ||
-	    (l->stripe_unit != 0 &&
+	    ((unsigned)l->stripe_unit != 0 &&
 	     ((unsigned)l->object_size % (unsigned)l->stripe_unit)))
 		return -EINVAL;
 
@@ -74,7 +74,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
 		return -EFAULT;
 
 	/* validate changed params against current layout */
-	err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT);
+	err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
 	if (err)
 		return err;
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index bad07c09f91e..a92d3f5c6c12 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -7,6 +7,7 @@
 #include <linux/sched.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
+#include <linux/utsname.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -334,7 +335,7 @@ static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
 /*
  * sessions
  */
-static const char *session_state_name(int s)
+const char *ceph_session_state_name(int s)
 {
 	switch (s) {
 	case CEPH_MDS_SESSION_NEW: return "new";
@@ -542,6 +543,8 @@ void ceph_mdsc_release_request(struct kref *kref)
 	}
 	kfree(req->r_path1);
 	kfree(req->r_path2);
+	if (req->r_pagelist)
+		ceph_pagelist_release(req->r_pagelist);
 	put_request_session(req);
 	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
 	kfree(req);
@@ -812,6 +815,74 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
 	h = msg->front.iov_base;
 	h->op = cpu_to_le32(op);
 	h->seq = cpu_to_le64(seq);
+
+	return msg;
+}
+
+/*
+ * session message, specialization for CEPH_SESSION_REQUEST_OPEN
+ * to include additional client metadata fields.
+ */
+static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
+{
+	struct ceph_msg *msg;
+	struct ceph_mds_session_head *h;
+	int i = -1;
+	int metadata_bytes = 0;
+	int metadata_key_count = 0;
+	struct ceph_options *opt = mdsc->fsc->client->options;
+	void *p;
+
+	const char* metadata[3][2] = {
+		{"hostname", utsname()->nodename},
+		{"entity_id", opt->name ? opt->name : ""},
+		{NULL, NULL}
+	};
+
+	/* Calculate serialized length of metadata */
+	metadata_bytes = 4;  /* map length */
+	for (i = 0; metadata[i][0] != NULL; ++i) {
+		metadata_bytes += 8 + strlen(metadata[i][0]) +
+			strlen(metadata[i][1]);
+		metadata_key_count++;
+	}
+
+	/* Allocate the message */
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes,
+			   GFP_NOFS, false);
+	if (!msg) {
+		pr_err("create_session_msg ENOMEM creating msg\n");
+		return NULL;
+	}
+	h = msg->front.iov_base;
+	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
+	h->seq = cpu_to_le64(seq);
+
+	/*
+	 * Serialize client metadata into waiting buffer space, using
+	 * the format that userspace expects for map<string, string>
+	 */
+	msg->hdr.version = 2;  /* ClientSession messages with metadata are v2 */
+
+	/* The write pointer, following the session_head structure */
+	p = msg->front.iov_base + sizeof(*h);
+
+	/* Number of entries in the map */
+	ceph_encode_32(&p, metadata_key_count);
+
+	/* Two length-prefixed strings for each entry in the map */
+	for (i = 0; metadata[i][0] != NULL; ++i) {
+		size_t const key_len = strlen(metadata[i][0]);
+		size_t const val_len = strlen(metadata[i][1]);
+
+		ceph_encode_32(&p, key_len);
+		memcpy(p, metadata[i][0], key_len);
+		p += key_len;
+		ceph_encode_32(&p, val_len);
+		memcpy(p, metadata[i][1], val_len);
+		p += val_len;
+	}
+
 	return msg;
 }
 
@@ -835,7 +906,7 @@ static int __open_session(struct ceph_mds_client *mdsc,
 	session->s_renew_requested = jiffies;
 
 	/* send connect message */
-	msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
+	msg = create_session_open_msg(mdsc, session->s_seq);
 	if (!msg)
 		return -ENOMEM;
 	ceph_con_send(&session->s_con, msg);
@@ -1164,7 +1235,7 @@ static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
 	struct ceph_msg *msg;
 
 	dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
-	     session->s_mds, session_state_name(session->s_state), seq);
+	     session->s_mds, ceph_session_state_name(session->s_state), seq);
 	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
 	if (!msg)
 		return -ENOMEM;
@@ -1216,7 +1287,7 @@ static int request_close_session(struct ceph_mds_client *mdsc,
 	struct ceph_msg *msg;
 
 	dout("request_close_session mds%d state %s seq %lld\n",
-	     session->s_mds, session_state_name(session->s_state),
+	     session->s_mds, ceph_session_state_name(session->s_state),
 	     session->s_seq);
 	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
 	if (!msg)
@@ -1847,13 +1918,15 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 
-	if (req->r_data_len) {
-		/* outbound data set only by ceph_sync_setxattr() */
-		BUG_ON(!req->r_pages);
-		ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
+	if (req->r_pagelist) {
+		struct ceph_pagelist *pagelist = req->r_pagelist;
+		atomic_inc(&pagelist->refcnt);
+		ceph_msg_data_add_pagelist(msg, pagelist);
+		msg->hdr.data_len = cpu_to_le32(pagelist->length);
+	} else {
+		msg->hdr.data_len = 0;
 	}
 
-	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
 	msg->hdr.data_off = cpu_to_le16(0);
 
 out_free2:
@@ -2007,7 +2080,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
 	req->r_session = get_session(session);
 
 	dout("do_request mds%d session %p state %s\n", mds, session,
-	     session_state_name(session->s_state));
+	     ceph_session_state_name(session->s_state));
 	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
 	    session->s_state != CEPH_MDS_SESSION_HUNG) {
 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
@@ -2078,6 +2151,7 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
 		if (req->r_session &&
 		    req->r_session->s_mds == mds) {
 			dout(" kicking tid %llu\n", req->r_tid);
+			list_del_init(&req->r_wait);
 			__do_request(mdsc, req);
 		}
 	}
@@ -2444,7 +2518,7 @@ static void handle_session(struct ceph_mds_session *session,
 
 	dout("handle_session mds%d %s %p state %s seq %llu\n",
 	     mds, ceph_session_op_name(op), session,
-	     session_state_name(session->s_state), seq);
+	     ceph_session_state_name(session->s_state), seq);
 
 	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
 		session->s_state = CEPH_MDS_SESSION_OPEN;
@@ -2471,9 +2545,8 @@ static void handle_session(struct ceph_mds_session *session,
 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
 			pr_info("mds%d reconnect denied\n", session->s_mds);
 		remove_session_caps(session);
-		wake = 1; /* for good measure */
+		wake = 2; /* for good measure */
 		wake_up_all(&mdsc->session_close_wq);
-		kick_requests(mdsc, mds);
 		break;
 
 	case CEPH_SESSION_STALE:
@@ -2503,6 +2576,8 @@ static void handle_session(struct ceph_mds_session *session,
 	if (wake) {
 		mutex_lock(&mdsc->mutex);
 		__wake_requests(mdsc, &session->s_waiting);
+		if (wake == 2)
+			kick_requests(mdsc, mds);
 		mutex_unlock(&mdsc->mutex);
 	}
 	return;
@@ -2695,18 +2770,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
 	session->s_seq = 0;
 
-	ceph_con_close(&session->s_con);
-	ceph_con_open(&session->s_con,
-		      CEPH_ENTITY_TYPE_MDS, mds,
-		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
-
-	/* replay unsafe requests */
-	replay_unsafe_requests(mdsc, session);
-
-	down_read(&mdsc->snap_rwsem);
-
 	dout("session %p state %s\n", session,
-	     session_state_name(session->s_state));
+	     ceph_session_state_name(session->s_state));
 
 	spin_lock(&session->s_gen_ttl_lock);
 	session->s_cap_gen++;
@@ -2723,6 +2788,19 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 	discard_cap_releases(mdsc, session);
 	spin_unlock(&session->s_cap_lock);
 
+	/* trim unused caps to reduce MDS's cache rejoin time */
+	shrink_dcache_parent(mdsc->fsc->sb->s_root);
+
+	ceph_con_close(&session->s_con);
+	ceph_con_open(&session->s_con,
+		      CEPH_ENTITY_TYPE_MDS, mds,
+		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
+
+	/* replay unsafe requests */
+	replay_unsafe_requests(mdsc, session);
+
+	down_read(&mdsc->snap_rwsem);
+
 	/* traverse this session's caps */
 	s_nr_caps = session->s_nr_caps;
 	err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
@@ -2791,7 +2869,6 @@ fail:
 	mutex_unlock(&session->s_mutex);
 fail_nomsg:
 	ceph_pagelist_release(pagelist);
-	kfree(pagelist);
 fail_nopagelist:
 	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
 	return;
@@ -2827,7 +2904,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
 		     ceph_mds_state_name(newstate),
 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
-		     session_state_name(s->s_state));
+		     ceph_session_state_name(s->s_state));
 
 		if (i >= newmap->m_max_mds ||
 		    memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2939,14 +3016,15 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 	if (dname.len != get_unaligned_le32(h+1))
 		goto bad;
 
-	mutex_lock(&session->s_mutex);
-	session->s_seq++;
-
 	/* lookup inode */
 	inode = ceph_find_inode(sb, vino);
 	dout("handle_lease %s, ino %llx %p %.*s\n",
 	     ceph_lease_op_name(h->action), vino.ino, inode,
 	     dname.len, dname.name);
+
+	mutex_lock(&session->s_mutex);
+	session->s_seq++;
+
 	if (inode == NULL) {
 		dout("handle_lease no inode %llx\n", vino.ino);
 		goto release;
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e00737cf523c..3288359353e9 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -202,9 +202,7 @@ struct ceph_mds_request {
 	bool r_direct_is_hash;  /* true if r_direct_hash is valid */
 
 	/* data payload is used for xattr ops */
-	struct page **r_pages;
-	int r_num_pages;
-	int r_data_len;
+	struct ceph_pagelist *r_pagelist;
 
 	/* what caps shall we drop? */
 	int r_inode_drop, r_inode_unless;
@@ -332,6 +330,8 @@ ceph_get_mds_session(struct ceph_mds_session *s)
 	return s;
 }
 
+extern const char *ceph_session_state_name(int s);
+
 extern void ceph_put_mds_session(struct ceph_mds_session *s);
 
 extern int ceph_send_msg_mds(struct ceph_mds_client *mdsc,
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 12b20744e386..b82f507979b8 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -95,6 +95,7 @@ struct ceph_fs_client {
 	struct dentry *debugfs_congestion_kb;
 	struct dentry *debugfs_bdi;
 	struct dentry *debugfs_mdsc, *debugfs_mdsmap;
+	struct dentry *debugfs_mds_sessions;
 #endif
 
 #ifdef CONFIG_CEPH_FSCACHE
@@ -714,7 +715,7 @@ extern void ceph_queue_vmtruncate(struct inode *inode);
 extern void ceph_queue_invalidate(struct inode *inode);
 extern void ceph_queue_writeback(struct inode *inode);
 
-extern int ceph_do_getattr(struct inode *inode, int mask);
+extern int ceph_do_getattr(struct inode *inode, int mask, bool force);
 extern int ceph_permission(struct inode *inode, int mask);
 extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
 extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
@@ -733,15 +734,23 @@ extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
 extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
 extern void __init ceph_xattr_init(void);
 extern void ceph_xattr_exit(void);
+extern const struct xattr_handler *ceph_xattr_handlers[];
 
 /* acl.c */
-extern const struct xattr_handler *ceph_xattr_handlers[];
+struct ceph_acls_info {
+	void *default_acl;
+	void *acl;
+	struct ceph_pagelist *pagelist;
+};
 
 #ifdef CONFIG_CEPH_FS_POSIX_ACL
 
 struct posix_acl *ceph_get_acl(struct inode *, int);
 int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type);
-int ceph_init_acl(struct dentry *, struct inode *, struct inode *);
+int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+		       struct ceph_acls_info *info);
+void ceph_init_inode_acls(struct inode *inode, struct ceph_acls_info *info);
+void ceph_release_acls_info(struct ceph_acls_info *info);
 
 static inline void ceph_forget_all_cached_acls(struct inode *inode)
 {
@@ -753,12 +762,18 @@ static inline void ceph_forget_all_cached_acls(struct inode *inode)
 #define ceph_get_acl NULL
 #define ceph_set_acl NULL
 
-static inline int ceph_init_acl(struct dentry *dentry, struct inode *inode,
-				struct inode *dir)
+static inline int ceph_pre_init_acls(struct inode *dir, umode_t *mode,
+				     struct ceph_acls_info *info)
 {
 	return 0;
 }
-
+static inline void ceph_init_inode_acls(struct inode *inode,
+					struct ceph_acls_info *info)
+{
+}
+static inline void ceph_release_acls_info(struct ceph_acls_info *info)
+{
+}
 static inline int ceph_acl_chmod(struct dentry *dentry, struct inode *inode)
 {
 	return 0;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 12f58d22e017..678b0d2bbbc4 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -1,4 +1,5 @@
 #include <linux/ceph/ceph_debug.h>
+#include <linux/ceph/pagelist.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -284,8 +285,7 @@ static size_t ceph_vxattrs_name_size(struct ceph_vxattr *vxattrs)
 		return ceph_dir_vxattrs_name_size;
 	if (vxattrs == ceph_file_vxattrs)
 		return ceph_file_vxattrs_name_size;
-	BUG();
-
+	BUG_ON(vxattrs);
 	return 0;
 }
 
@@ -736,24 +736,20 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 	dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);
 
-	if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
-	    (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
-		goto get_xattr;
-	} else {
+	if (ci->i_xattrs.version == 0 ||
+	    !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
 		spin_unlock(&ci->i_ceph_lock);
 		/* get xattrs from mds (if we don't already have them) */
-		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
+		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
 		if (err)
 			return err;
+		spin_lock(&ci->i_ceph_lock);
 	}
 
-	spin_lock(&ci->i_ceph_lock);
-
 	err = __build_xattrs(inode);
 	if (err < 0)
 		goto out;
 
-get_xattr:
 	err = -ENODATA;  /* == ENOATTR */
 	xattr = __get_xattr(ci, name);
 	if (!xattr)
@@ -798,23 +794,18 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
 	dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);
 
-	if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
-	    (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
-		goto list_xattr;
-	} else {
+	if (ci->i_xattrs.version == 0 ||
+	    !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
 		spin_unlock(&ci->i_ceph_lock);
-		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
+		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
 		if (err)
 			return err;
+		spin_lock(&ci->i_ceph_lock);
 	}
 
-	spin_lock(&ci->i_ceph_lock);
-
 	err = __build_xattrs(inode);
 	if (err < 0)
 		goto out;
-
-list_xattr:
 	/*
 	 * Start with virtual dir xattr names (if any) (including
 	 * terminating '\0' characters for each).
@@ -860,35 +851,25 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_request *req;
 	struct ceph_mds_client *mdsc = fsc->mdsc;
+	struct ceph_pagelist *pagelist = NULL;
 	int err;
-	int i, nr_pages;
-	struct page **pages = NULL;
-	void *kaddr;
-
-	/* copy value into some pages */
-	nr_pages = calc_pages_for(0, size);
-	if (nr_pages) {
-		pages = kmalloc(sizeof(pages[0])*nr_pages, GFP_NOFS);
-		if (!pages)
+
+	if (value) {
+		/* copy value into pagelist */
+		pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+		if (!pagelist)
 			return -ENOMEM;
-		err = -ENOMEM;
-		for (i = 0; i < nr_pages; i++) {
-			pages[i] = __page_cache_alloc(GFP_NOFS);
-			if (!pages[i]) {
-				nr_pages = i;
-				goto out;
-			}
-			kaddr = kmap(pages[i]);
-			memcpy(kaddr, value + i*PAGE_CACHE_SIZE,
-			       min(PAGE_CACHE_SIZE, size-i*PAGE_CACHE_SIZE));
-		}
+
+		ceph_pagelist_init(pagelist);
+		err = ceph_pagelist_append(pagelist, value, size);
+		if (err)
+			goto out;
+	} else {
+		flags |= CEPH_XATTR_REMOVE;
 	}
 
 	dout("setxattr value=%.*s\n", (int)size, value);
 
-	if (!value)
-		flags |= CEPH_XATTR_REMOVE;
-
 	/* do request */
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR,
 				       USE_AUTH_MDS);
@@ -903,9 +884,8 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	req->r_args.setxattr.flags = cpu_to_le32(flags);
 	req->r_path2 = kstrdup(name, GFP_NOFS);
 
-	req->r_pages = pages;
-	req->r_num_pages = nr_pages;
-	req->r_data_len = size;
+	req->r_pagelist = pagelist;
+	pagelist = NULL;
 
 	dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -913,11 +893,8 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
 	dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
 
 out:
-	if (pages) {
-		for (i = 0; i < nr_pages; i++)
-			__free_page(pages[i]);
-		kfree(pages);
-	}
+	if (pagelist)
+		ceph_pagelist_release(pagelist);
 	return err;
 }
 
@@ -968,7 +945,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
 retry:
 	issued = __ceph_caps_issued(ci, NULL);
 	dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
-	if (!(issued & CEPH_CAP_XATTR_EXCL))
+	if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
 		goto do_sync;
 	__build_xattrs(inode);
 
@@ -1077,7 +1054,7 @@ retry:
 	issued = __ceph_caps_issued(ci, NULL);
 	dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
 
-	if (!(issued & CEPH_CAP_XATTR_EXCL))
+	if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
 		goto do_sync;
 	__build_xattrs(inode);