summary refs log tree commit diff
path: root/fs/ceph
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2018-06-15 07:24:58 +0900
committerLinus Torvalds <torvalds@linux-foundation.org>2018-06-15 07:24:58 +0900
commitdc594c39f7a9dcdfd5dbb1a446ac6d06182e2472 (patch)
tree1296214ff63762d72e46acb1e8090e99608da746 /fs/ceph
parente7655d2b25466c534ed1f539367dae595bb0bd20 (diff)
parent23edca864951250af845a11da86bb3ea63522ed2 (diff)
downloadlinux-dc594c39f7a9dcdfd5dbb1a446ac6d06182e2472.tar.gz
Merge tag 'ceph-for-4.18-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The main piece is a set of libceph changes that revamps how OSD
  requests are aborted, improving CephFS ENOSPC handling and making
  "umount -f" actually work (Zheng and myself).

  The rest is mostly mount option handling cleanups from Chengguang and
  assorted fixes from Zheng, Luis and Dongsheng.

* tag 'ceph-for-4.18-rc1' of git://github.com/ceph/ceph-client: (31 commits)
  rbd: flush rbd_dev->watch_dwork after watch is unregistered
  ceph: update description of some mount options
  ceph: show ino32 if the value is different with default
  ceph: strengthen rsize/wsize/readdir_max_bytes validation
  ceph: fix alignment of rasize
  ceph: fix use-after-free in ceph_statfs()
  ceph: prevent i_version from going back
  ceph: fix wrong check for the case of updating link count
  libceph: allocate the locator string with GFP_NOFAIL
  libceph: make abort_on_full a per-osdc setting
  libceph: don't abort reads in ceph_osdc_abort_on_full()
  libceph: avoid a use-after-free during map check
  libceph: don't warn if req->r_abort_on_full is set
  libceph: use for_each_request() in ceph_osdc_abort_on_full()
  libceph: defer __complete_request() to a workqueue
  libceph: move more code into __complete_request()
  libceph: no need to call flush_workqueue() before destruction
  ceph: flush pending works before shutdown super
  ceph: abort osd requests on force umount
  libceph: introduce ceph_osdc_abort_requests()
  ...
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/addr.c1
-rw-r--r--fs/ceph/caps.c160
-rw-r--r--fs/ceph/dir.c2
-rw-r--r--fs/ceph/file.c1
-rw-r--r--fs/ceph/inode.c67
-rw-r--r--fs/ceph/super.c35
-rw-r--r--fs/ceph/xattr.c60
7 files changed, 202 insertions, 124 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index c9cb2f33a6d6..afcc59ed7090 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1936,7 +1936,6 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
 	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);
 
 	wr_req->r_mtime = ci->vfs_inode.i_mtime;
-	wr_req->r_abort_on_full = true;
 	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);
 
 	if (!err)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 23dbfae16156..0ae41854d676 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -69,6 +69,8 @@ static char *gcap_string(char *s, int c)
 		*s++ = 'w';
 	if (c & CEPH_CAP_GBUFFER)
 		*s++ = 'b';
+	if (c & CEPH_CAP_GWREXTEND)
+		*s++ = 'a';
 	if (c & CEPH_CAP_GLAZYIO)
 		*s++ = 'l';
 	return s;
@@ -3022,30 +3024,41 @@ static void invalidate_aliases(struct inode *inode)
 		dput(prev);
 }
 
+struct cap_extra_info {
+	struct ceph_string *pool_ns;
+	/* inline data */
+	u64 inline_version;
+	void *inline_data;
+	u32 inline_len;
+	/* dirstat */
+	bool dirstat_valid;
+	u64 nfiles;
+	u64 nsubdirs;
+	/* currently issued */
+	int issued;
+};
+
 /*
  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
  * actually be a revocation if it specifies a smaller cap set.)
  *
  * caller holds s_mutex and i_ceph_lock, we drop both.
  */
-static void handle_cap_grant(struct ceph_mds_client *mdsc,
-			     struct inode *inode, struct ceph_mds_caps *grant,
-			     struct ceph_string **pns, u64 inline_version,
-			     void *inline_data, u32 inline_len,
-			     struct ceph_buffer *xattr_buf,
+static void handle_cap_grant(struct inode *inode,
 			     struct ceph_mds_session *session,
-			     struct ceph_cap *cap, int issued)
+			     struct ceph_cap *cap,
+			     struct ceph_mds_caps *grant,
+			     struct ceph_buffer *xattr_buf,
+			     struct cap_extra_info *extra_info)
 	__releases(ci->i_ceph_lock)
-	__releases(mdsc->snap_rwsem)
+	__releases(session->s_mdsc->snap_rwsem)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int mds = session->s_mds;
 	int seq = le32_to_cpu(grant->seq);
 	int newcaps = le32_to_cpu(grant->caps);
 	int used, wanted, dirty;
 	u64 size = le64_to_cpu(grant->size);
 	u64 max_size = le64_to_cpu(grant->max_size);
-	struct timespec mtime, atime, ctime;
 	int check_caps = 0;
 	bool wake = false;
 	bool writeback = false;
@@ -3055,7 +3068,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	bool fill_inline = false;
 
 	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
-	     inode, cap, mds, seq, ceph_cap_string(newcaps));
+	     inode, cap, session->s_mds, seq, ceph_cap_string(newcaps));
 	dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
 		inode->i_size);
 
@@ -3101,7 +3114,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	__check_cap_issue(ci, cap, newcaps);
 
 	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
-	    (issued & CEPH_CAP_AUTH_EXCL) == 0) {
+	    (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
 		inode->i_mode = le32_to_cpu(grant->mode);
 		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
 		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
@@ -3110,15 +3123,16 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 		     from_kgid(&init_user_ns, inode->i_gid));
 	}
 
-	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
-	    (issued & CEPH_CAP_LINK_EXCL) == 0) {
+	if ((newcaps & CEPH_CAP_LINK_SHARED) &&
+	    (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
 		set_nlink(inode, le32_to_cpu(grant->nlink));
 		if (inode->i_nlink == 0 &&
 		    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
 			deleted_inode = true;
 	}
 
-	if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
+	if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
+	    grant->xattr_len) {
 		int len = le32_to_cpu(grant->xattr_len);
 		u64 version = le64_to_cpu(grant->xattr_version);
 
@@ -3134,15 +3148,21 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	}
 
 	if (newcaps & CEPH_CAP_ANY_RD) {
+		struct timespec mtime, atime, ctime;
 		/* ctime/mtime/atime? */
 		ceph_decode_timespec(&mtime, &grant->mtime);
 		ceph_decode_timespec(&atime, &grant->atime);
 		ceph_decode_timespec(&ctime, &grant->ctime);
-		ceph_fill_file_time(inode, issued,
+		ceph_fill_file_time(inode, extra_info->issued,
 				    le32_to_cpu(grant->time_warp_seq),
 				    &ctime, &mtime, &atime);
 	}
 
+	if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) {
+		ci->i_files = extra_info->nfiles;
+		ci->i_subdirs = extra_info->nsubdirs;
+	}
+
 	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
 		/* file layout may have changed */
 		s64 old_pool = ci->i_layout.pool_id;
@@ -3151,15 +3171,16 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 		ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
 		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
 					lockdep_is_held(&ci->i_ceph_lock));
-		rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
+		rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);
 
-		if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
+		if (ci->i_layout.pool_id != old_pool ||
+		    extra_info->pool_ns != old_ns)
 			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
 
-		*pns = old_ns;
+		extra_info->pool_ns = old_ns;
 
 		/* size/truncate_seq? */
-		queue_trunc = ceph_fill_file_size(inode, issued,
+		queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
 					le32_to_cpu(grant->truncate_seq),
 					le64_to_cpu(grant->truncate_size),
 					size);
@@ -3238,24 +3259,26 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
 	}
 	BUG_ON(cap->issued & ~cap->implemented);
 
-	if (inline_version > 0 && inline_version >= ci->i_inline_version) {
-		ci->i_inline_version = inline_version;
+	if (extra_info->inline_version > 0 &&
+	    extra_info->inline_version >= ci->i_inline_version) {
+		ci->i_inline_version = extra_info->inline_version;
 		if (ci->i_inline_version != CEPH_INLINE_NONE &&
 		    (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
 			fill_inline = true;
 	}
 
 	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
-		if (newcaps & ~issued)
+		if (newcaps & ~extra_info->issued)
 			wake = true;
-		kick_flushing_inode_caps(mdsc, session, inode);
-		up_read(&mdsc->snap_rwsem);
+		kick_flushing_inode_caps(session->s_mdsc, session, inode);
+		up_read(&session->s_mdsc->snap_rwsem);
 	} else {
 		spin_unlock(&ci->i_ceph_lock);
 	}
 
 	if (fill_inline)
-		ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
+		ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
+				      extra_info->inline_len);
 
 	if (queue_trunc)
 		ceph_queue_vmtruncate(inode);
@@ -3720,31 +3743,25 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		      struct ceph_msg *msg)
 {
 	struct ceph_mds_client *mdsc = session->s_mdsc;
-	struct super_block *sb = mdsc->fsc->sb;
 	struct inode *inode;
 	struct ceph_inode_info *ci;
 	struct ceph_cap *cap;
 	struct ceph_mds_caps *h;
 	struct ceph_mds_cap_peer *peer = NULL;
 	struct ceph_snap_realm *realm = NULL;
-	struct ceph_string *pool_ns = NULL;
-	int mds = session->s_mds;
-	int op, issued;
+	int op;
+	int msg_version = le16_to_cpu(msg->hdr.version);
 	u32 seq, mseq;
 	struct ceph_vino vino;
-	u64 tid;
-	u64 inline_version = 0;
-	void *inline_data = NULL;
-	u32  inline_len = 0;
 	void *snaptrace;
 	size_t snaptrace_len;
 	void *p, *end;
+	struct cap_extra_info extra_info = {};
 
-	dout("handle_caps from mds%d\n", mds);
+	dout("handle_caps from mds%d\n", session->s_mds);
 
 	/* decode */
 	end = msg->front.iov_base + msg->front.iov_len;
-	tid = le64_to_cpu(msg->hdr.tid);
 	if (msg->front.iov_len < sizeof(*h))
 		goto bad;
 	h = msg->front.iov_base;
@@ -3758,7 +3775,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	snaptrace_len = le32_to_cpu(h->snap_trace_len);
 	p = snaptrace + snaptrace_len;
 
-	if (le16_to_cpu(msg->hdr.version) >= 2) {
+	if (msg_version >= 2) {
 		u32 flock_len;
 		ceph_decode_32_safe(&p, end, flock_len, bad);
 		if (p + flock_len > end)
@@ -3766,7 +3783,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		p += flock_len;
 	}
 
-	if (le16_to_cpu(msg->hdr.version) >= 3) {
+	if (msg_version >= 3) {
 		if (op == CEPH_CAP_OP_IMPORT) {
 			if (p + sizeof(*peer) > end)
 				goto bad;
@@ -3778,16 +3795,16 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		}
 	}
 
-	if (le16_to_cpu(msg->hdr.version) >= 4) {
-		ceph_decode_64_safe(&p, end, inline_version, bad);
-		ceph_decode_32_safe(&p, end, inline_len, bad);
-		if (p + inline_len > end)
+	if (msg_version >= 4) {
+		ceph_decode_64_safe(&p, end, extra_info.inline_version, bad);
+		ceph_decode_32_safe(&p, end, extra_info.inline_len, bad);
+		if (p + extra_info.inline_len > end)
 			goto bad;
-		inline_data = p;
-		p += inline_len;
+		extra_info.inline_data = p;
+		p += extra_info.inline_len;
 	}
 
-	if (le16_to_cpu(msg->hdr.version) >= 5) {
+	if (msg_version >= 5) {
 		struct ceph_osd_client	*osdc = &mdsc->fsc->client->osdc;
 		u32			epoch_barrier;
 
@@ -3795,7 +3812,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
 	}
 
-	if (le16_to_cpu(msg->hdr.version) >= 8) {
+	if (msg_version >= 8) {
 		u64 flush_tid;
 		u32 caller_uid, caller_gid;
 		u32 pool_ns_len;
@@ -3809,13 +3826,33 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 		ceph_decode_32_safe(&p, end, pool_ns_len, bad);
 		if (pool_ns_len > 0) {
 			ceph_decode_need(&p, end, pool_ns_len, bad);
-			pool_ns = ceph_find_or_create_string(p, pool_ns_len);
+			extra_info.pool_ns =
+				ceph_find_or_create_string(p, pool_ns_len);
 			p += pool_ns_len;
 		}
 	}
 
+	if (msg_version >= 11) {
+		struct ceph_timespec *btime;
+		u64 change_attr;
+		u32 flags;
+
+		/* version >= 9 */
+		if (p + sizeof(*btime) > end)
+			goto bad;
+		btime = p;
+		p += sizeof(*btime);
+		ceph_decode_64_safe(&p, end, change_attr, bad);
+		/* version >= 10 */
+		ceph_decode_32_safe(&p, end, flags, bad);
+		/* version >= 11 */
+		extra_info.dirstat_valid = true;
+		ceph_decode_64_safe(&p, end, extra_info.nfiles, bad);
+		ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad);
+	}
+
 	/* lookup ino */
-	inode = ceph_find_inode(sb, vino);
+	inode = ceph_find_inode(mdsc->fsc->sb, vino);
 	ci = ceph_inode(inode);
 	dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
 	     vino.snap, inode);
@@ -3848,7 +3885,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	/* these will work even if we don't have a cap yet */
 	switch (op) {
 	case CEPH_CAP_OP_FLUSHSNAP_ACK:
-		handle_cap_flushsnap_ack(inode, tid, h, session);
+		handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
+					 h, session);
 		goto done;
 
 	case CEPH_CAP_OP_EXPORT:
@@ -3867,10 +3905,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 			down_read(&mdsc->snap_rwsem);
 		}
 		handle_cap_import(mdsc, inode, h, peer, session,
-				  &cap, &issued);
-		handle_cap_grant(mdsc, inode, h, &pool_ns,
-				 inline_version, inline_data, inline_len,
-				 msg->middle, session, cap, issued);
+				  &cap, &extra_info.issued);
+		handle_cap_grant(inode, session, cap,
+				 h, msg->middle, &extra_info);
 		if (realm)
 			ceph_put_snap_realm(mdsc, realm);
 		goto done_unlocked;
@@ -3878,10 +3915,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
 	/* the rest require a cap */
 	spin_lock(&ci->i_ceph_lock);
-	cap = __get_cap_for_mds(ceph_inode(inode), mds);
+	cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
 	if (!cap) {
 		dout(" no cap on %p ino %llx.%llx from mds%d\n",
-		     inode, ceph_ino(inode), ceph_snap(inode), mds);
+		     inode, ceph_ino(inode), ceph_snap(inode),
+		     session->s_mds);
 		spin_unlock(&ci->i_ceph_lock);
 		goto flush_cap_releases;
 	}
@@ -3890,15 +3928,15 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 	switch (op) {
 	case CEPH_CAP_OP_REVOKE:
 	case CEPH_CAP_OP_GRANT:
-		__ceph_caps_issued(ci, &issued);
-		issued |= __ceph_caps_dirty(ci);
-		handle_cap_grant(mdsc, inode, h, &pool_ns,
-				 inline_version, inline_data, inline_len,
-				 msg->middle, session, cap, issued);
+		__ceph_caps_issued(ci, &extra_info.issued);
+		extra_info.issued |= __ceph_caps_dirty(ci);
+		handle_cap_grant(inode, session, cap,
+				 h, msg->middle, &extra_info);
 		goto done_unlocked;
 
 	case CEPH_CAP_OP_FLUSH_ACK:
-		handle_cap_flush_ack(inode, tid, h, session, cap);
+		handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
+				     h, session, cap);
 		break;
 
 	case CEPH_CAP_OP_TRUNC:
@@ -3925,7 +3963,7 @@ done:
 	mutex_unlock(&session->s_mutex);
 done_unlocked:
 	iput(inode);
-	ceph_put_string(pool_ns);
+	ceph_put_string(extra_info.pool_ns);
 	return;
 
 bad:
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 1a78dd6f8bf2..036ac0f3a393 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1486,6 +1486,8 @@ const struct file_operations ceph_dir_fops = {
 	.release = ceph_release,
 	.unlocked_ioctl = ceph_ioctl,
 	.fsync = ceph_fsync,
+	.lock = ceph_lock,
+	.flock = ceph_flock,
 };
 
 const struct file_operations ceph_snapdir_fops = {
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index cf0e45b10121..6b9f7f3cd237 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -895,7 +895,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	req->r_callback = ceph_aio_complete_req;
 	req->r_inode = inode;
 	req->r_priv = aio_req;
-	req->r_abort_on_full = true;
 
 	ret = ceph_osdc_start_request(req->r_osdc, req, false);
 out:
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ae056927080d..4fda7a9d4c9d 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -739,7 +739,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	struct ceph_mds_reply_inode *info = iinfo->in;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int issued = 0, implemented, new_issued;
+	int issued, new_issued, info_caps;
 	struct timespec mtime, atime, ctime;
 	struct ceph_buffer *xattr_blob = NULL;
 	struct ceph_string *pool_ns = NULL;
@@ -754,8 +754,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 	     inode, ceph_vinop(inode), le64_to_cpu(info->version),
 	     ci->i_version);
 
+	info_caps = le32_to_cpu(info->cap.caps);
+
 	/* prealloc new cap struct */
-	if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
+	if (info_caps && ceph_snap(inode) == CEPH_NOSNAP)
 		new_cap = ceph_get_cap(mdsc, caps_reservation);
 
 	/*
@@ -792,9 +794,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 	     le64_to_cpu(info->version) > (ci->i_version & ~1)))
 		new_version = true;
 
-	issued = __ceph_caps_issued(ci, &implemented);
-	issued |= implemented | __ceph_caps_dirty(ci);
-	new_issued = ~issued & le32_to_cpu(info->cap.caps);
+	__ceph_caps_issued(ci, &issued);
+	issued |= __ceph_caps_dirty(ci);
+	new_issued = ~issued & info_caps;
 
 	/* update inode */
 	inode->i_rdev = le32_to_cpu(info->rdev);
@@ -826,6 +828,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 				&ctime, &mtime, &atime);
 	}
 
+	if (new_version || (info_caps & CEPH_CAP_FILE_SHARED)) {
+		ci->i_files = le64_to_cpu(info->files);
+		ci->i_subdirs = le64_to_cpu(info->subdirs);
+	}
+
 	if (new_version ||
 	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
 		s64 old_pool = ci->i_layout.pool_id;
@@ -854,6 +861,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 		}
 	}
 
+	/* layout and rstat are not tracked by capability, update them if
+	 * the inode info is from auth mds */
+	if (new_version || (info->cap.flags & CEPH_CAP_FLAG_AUTH)) {
+		if (S_ISDIR(inode->i_mode)) {
+			ci->i_dir_layout = iinfo->dir_layout;
+			ci->i_rbytes = le64_to_cpu(info->rbytes);
+			ci->i_rfiles = le64_to_cpu(info->rfiles);
+			ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
+			ceph_decode_timespec(&ci->i_rctime, &info->rctime);
+		}
+	}
+
 	/* xattrs */
 	/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
 	if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))  &&
@@ -870,7 +889,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 	}
 
 	/* finally update i_version */
-	ci->i_version = le64_to_cpu(info->version);
+	if (le64_to_cpu(info->version) > ci->i_version)
+		ci->i_version = le64_to_cpu(info->version);
 
 	inode->i_mapping->a_ops = &ceph_aops;
 
@@ -918,15 +938,6 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 	case S_IFDIR:
 		inode->i_op = &ceph_dir_iops;
 		inode->i_fop = &ceph_dir_fops;
-
-		ci->i_dir_layout = iinfo->dir_layout;
-
-		ci->i_files = le64_to_cpu(info->files);
-		ci->i_subdirs = le64_to_cpu(info->subdirs);
-		ci->i_rbytes = le64_to_cpu(info->rbytes);
-		ci->i_rfiles = le64_to_cpu(info->rfiles);
-		ci->i_rsubdirs = le64_to_cpu(info->rsubdirs);
-		ceph_decode_timespec(&ci->i_rctime, &info->rctime);
 		break;
 	default:
 		pr_err("fill_inode %llx.%llx BAD mode 0%o\n",
@@ -934,12 +945,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 	}
 
 	/* were we issued a capability? */
-	if (info->cap.caps) {
+	if (info_caps) {
 		if (ceph_snap(inode) == CEPH_NOSNAP) {
-			unsigned caps = le32_to_cpu(info->cap.caps);
 			ceph_add_cap(inode, session,
 				     le64_to_cpu(info->cap.cap_id),
-				     cap_fmode, caps,
+				     cap_fmode, info_caps,
 				     le32_to_cpu(info->cap.wanted),
 				     le32_to_cpu(info->cap.seq),
 				     le32_to_cpu(info->cap.mseq),
@@ -949,7 +959,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 			/* set dir completion flag? */
 			if (S_ISDIR(inode->i_mode) &&
 			    ci->i_files == 0 && ci->i_subdirs == 0 &&
-			    (caps & CEPH_CAP_FILE_SHARED) &&
+			    (info_caps & CEPH_CAP_FILE_SHARED) &&
 			    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
 			    !__ceph_dir_is_complete(ci)) {
 				dout(" marking %p complete (empty)\n", inode);
@@ -962,8 +972,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 			wake = true;
 		} else {
 			dout(" %p got snap_caps %s\n", inode,
-			     ceph_cap_string(le32_to_cpu(info->cap.caps)));
-			ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
+			     ceph_cap_string(info_caps));
+			ci->i_snap_caps |= info_caps;
 			if (cap_fmode >= 0)
 				__ceph_get_fmode(ci, cap_fmode);
 		}
@@ -978,8 +988,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 		int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
 		ci->i_inline_version = iinfo->inline_version;
 		if (ci->i_inline_version != CEPH_INLINE_NONE &&
-		    (locked_page ||
-		     (le32_to_cpu(info->cap.caps) & cache_caps)))
+		    (locked_page || (info_caps & cache_caps)))
 			fill_inline = true;
 	}
 
@@ -2178,6 +2187,7 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
 	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
+	int mode;
 	int err;
 
 	if (ceph_snap(inode) == CEPH_SNAPDIR) {
@@ -2190,7 +2200,8 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
 	if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
 		return 0;
 
-	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
+	mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
+	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
 	if (IS_ERR(req))
 		return PTR_ERR(req);
 	req->r_inode = inode;
@@ -2261,6 +2272,14 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
 				stat->size = ci->i_files + ci->i_subdirs;
 			stat->blocks = 0;
 			stat->blksize = 65536;
+			/*
+			 * Some applications rely on the number of st_nlink
+			 * value on directories to be either 0 (if unlinked)
+			 * or 2 + number of subdirectories.
+			 */
+			if (stat->nlink == 1)
+				/* '.' + '..' + subdirs */
+				stat->nlink = 1 + 1 + ci->i_subdirs;
 		}
 	}
 	return err;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index b33082e6878f..95a3b3ac9b6e 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -45,7 +45,7 @@ static void ceph_put_super(struct super_block *s)
 static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 {
 	struct ceph_fs_client *fsc = ceph_inode_to_client(d_inode(dentry));
-	struct ceph_monmap *monmap = fsc->client->monc.monmap;
+	struct ceph_mon_client *monc = &fsc->client->monc;
 	struct ceph_statfs st;
 	u64 fsid;
 	int err;
@@ -58,7 +58,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 	}
 
 	dout("statfs\n");
-	err = ceph_monc_do_statfs(&fsc->client->monc, data_pool, &st);
+	err = ceph_monc_do_statfs(monc, data_pool, &st);
 	if (err < 0)
 		return err;
 
@@ -94,8 +94,11 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 	buf->f_namelen = NAME_MAX;
 
 	/* Must convert the fsid, for consistent values across arches */
-	fsid = le64_to_cpu(*(__le64 *)(&monmap->fsid)) ^
-	       le64_to_cpu(*((__le64 *)&monmap->fsid + 1));
+	mutex_lock(&monc->mutex);
+	fsid = le64_to_cpu(*(__le64 *)(&monc->monmap->fsid)) ^
+	       le64_to_cpu(*((__le64 *)&monc->monmap->fsid + 1));
+	mutex_unlock(&monc->mutex);
+
 	buf->f_fsid.val[0] = fsid & 0xffffffff;
 	buf->f_fsid.val[1] = fsid >> 32;
 
@@ -256,19 +259,19 @@ static int parse_fsopt_token(char *c, void *private)
 		break;
 		/* misc */
 	case Opt_wsize:
-		if (intval < PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE)
+		if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE)
 			return -EINVAL;
 		fsopt->wsize = ALIGN(intval, PAGE_SIZE);
 		break;
 	case Opt_rsize:
-		if (intval < PAGE_SIZE || intval > CEPH_MAX_READ_SIZE)
+		if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_READ_SIZE)
 			return -EINVAL;
 		fsopt->rsize = ALIGN(intval, PAGE_SIZE);
 		break;
 	case Opt_rasize:
 		if (intval < 0)
 			return -EINVAL;
-		fsopt->rasize = ALIGN(intval + PAGE_SIZE - 1, PAGE_SIZE);
+		fsopt->rasize = ALIGN(intval, PAGE_SIZE);
 		break;
 	case Opt_caps_wanted_delay_min:
 		if (intval < 1)
@@ -286,7 +289,7 @@ static int parse_fsopt_token(char *c, void *private)
 		fsopt->max_readdir = intval;
 		break;
 	case Opt_readdir_max_bytes:
-		if (intval < PAGE_SIZE && intval != 0)
+		if (intval < (int)PAGE_SIZE && intval != 0)
 			return -EINVAL;
 		fsopt->max_readdir_bytes = intval;
 		break;
@@ -534,6 +537,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_puts(m, ",noasyncreaddir");
 	if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
 		seq_puts(m, ",nodcache");
+	if (fsopt->flags & CEPH_MOUNT_OPT_INO32)
+		seq_puts(m, ",ino32");
 	if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) {
 		seq_show_option(m, "fsc", fsopt->fscache_uniq);
 	}
@@ -551,7 +556,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 
 	if (fsopt->mds_namespace)
 		seq_show_option(m, "mds_namespace", fsopt->mds_namespace);
-	if (fsopt->wsize)
+	if (fsopt->wsize != CEPH_MAX_WRITE_SIZE)
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
 	if (fsopt->rsize != CEPH_MAX_READ_SIZE)
 		seq_printf(m, ",rsize=%d", fsopt->rsize);
@@ -616,7 +621,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 		err = PTR_ERR(fsc->client);
 		goto fail;
 	}
+
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
+	fsc->client->osdc.abort_on_full = true;
 
 	if (!fsopt->mds_namespace) {
 		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
@@ -674,6 +681,13 @@ fail:
 	return ERR_PTR(err);
 }
 
+static void flush_fs_workqueues(struct ceph_fs_client *fsc)
+{
+	flush_workqueue(fsc->wb_wq);
+	flush_workqueue(fsc->pg_inv_wq);
+	flush_workqueue(fsc->trunc_wq);
+}
+
 static void destroy_fs_client(struct ceph_fs_client *fsc)
 {
 	dout("destroy_fs_client %p\n", fsc);
@@ -793,6 +807,7 @@ static void ceph_umount_begin(struct super_block *sb)
 	if (!fsc)
 		return;
 	fsc->mount_state = CEPH_MOUNT_SHUTDOWN;
+	ceph_osdc_abort_requests(&fsc->client->osdc, -EIO);
 	ceph_mdsc_force_umount(fsc->mdsc);
 	return;
 }
@@ -1088,6 +1103,8 @@ static void ceph_kill_sb(struct super_block *s)
 	dout("kill_sb %p\n", s);
 
 	ceph_mdsc_pre_umount(fsc->mdsc);
+	flush_fs_workqueues(fsc);
+
 	generic_shutdown_super(s);
 
 	fsc->client->extra_mon_dispatch = NULL;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 315f7e63e7cc..5bc8edb4c2a6 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -50,10 +50,14 @@ struct ceph_vxattr {
 	size_t name_size;	/* strlen(name) + 1 (for '\0') */
 	size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
 			      size_t size);
-	bool readonly, hidden;
 	bool (*exists_cb)(struct ceph_inode_info *ci);
+	unsigned int flags;
 };
 
+#define VXATTR_FLAG_READONLY		(1<<0)
+#define VXATTR_FLAG_HIDDEN		(1<<1)
+#define VXATTR_FLAG_RSTAT		(1<<2)
+
 /* layouts */
 
 static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
@@ -262,32 +266,31 @@ static size_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci,
 #define CEPH_XATTR_NAME2(_type, _name, _name2)	\
 	XATTR_CEPH_PREFIX #_type "." #_name "." #_name2
 
-#define XATTR_NAME_CEPH(_type, _name)					\
+#define XATTR_NAME_CEPH(_type, _name, _flags)				\
 	{								\
 		.name = CEPH_XATTR_NAME(_type, _name),			\
 		.name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
 		.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
-		.readonly = true,				\
-		.hidden = false,				\
-		.exists_cb = NULL,			\
+		.exists_cb = NULL,					\
+		.flags = (VXATTR_FLAG_READONLY | _flags),		\
 	}
+#define XATTR_RSTAT_FIELD(_type, _name)			\
+	XATTR_NAME_CEPH(_type, _name, VXATTR_FLAG_RSTAT)
 #define XATTR_LAYOUT_FIELD(_type, _name, _field)			\
 	{								\
 		.name = CEPH_XATTR_NAME2(_type, _name, _field),	\
 		.name_size = sizeof (CEPH_XATTR_NAME2(_type, _name, _field)), \
 		.getxattr_cb = ceph_vxattrcb_ ## _name ## _ ## _field, \
-		.readonly = false,				\
-		.hidden = true,			\
 		.exists_cb = ceph_vxattrcb_layout_exists,	\
+		.flags = VXATTR_FLAG_HIDDEN,			\
 	}
 #define XATTR_QUOTA_FIELD(_type, _name)					\
 	{								\
 		.name = CEPH_XATTR_NAME(_type, _name),			\
 		.name_size = sizeof(CEPH_XATTR_NAME(_type, _name)),	\
 		.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name,	\
-		.readonly = false,					\
-		.hidden = true,						\
 		.exists_cb = ceph_vxattrcb_quota_exists,		\
+		.flags = VXATTR_FLAG_HIDDEN,				\
 	}
 
 static struct ceph_vxattr ceph_dir_vxattrs[] = {
@@ -295,30 +298,28 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
 		.name = "ceph.dir.layout",
 		.name_size = sizeof("ceph.dir.layout"),
 		.getxattr_cb = ceph_vxattrcb_layout,
-		.readonly = false,
-		.hidden = true,
 		.exists_cb = ceph_vxattrcb_layout_exists,
+		.flags = VXATTR_FLAG_HIDDEN,
 	},
 	XATTR_LAYOUT_FIELD(dir, layout, stripe_unit),
 	XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
 	XATTR_LAYOUT_FIELD(dir, layout, object_size),
 	XATTR_LAYOUT_FIELD(dir, layout, pool),
 	XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
-	XATTR_NAME_CEPH(dir, entries),
-	XATTR_NAME_CEPH(dir, files),
-	XATTR_NAME_CEPH(dir, subdirs),
-	XATTR_NAME_CEPH(dir, rentries),
-	XATTR_NAME_CEPH(dir, rfiles),
-	XATTR_NAME_CEPH(dir, rsubdirs),
-	XATTR_NAME_CEPH(dir, rbytes),
-	XATTR_NAME_CEPH(dir, rctime),
+	XATTR_NAME_CEPH(dir, entries, 0),
+	XATTR_NAME_CEPH(dir, files, 0),
+	XATTR_NAME_CEPH(dir, subdirs, 0),
+	XATTR_RSTAT_FIELD(dir, rentries),
+	XATTR_RSTAT_FIELD(dir, rfiles),
+	XATTR_RSTAT_FIELD(dir, rsubdirs),
+	XATTR_RSTAT_FIELD(dir, rbytes),
+	XATTR_RSTAT_FIELD(dir, rctime),
 	{
 		.name = "ceph.quota",
 		.name_size = sizeof("ceph.quota"),
 		.getxattr_cb = ceph_vxattrcb_quota,
-		.readonly = false,
-		.hidden = true,
 		.exists_cb = ceph_vxattrcb_quota_exists,
+		.flags = VXATTR_FLAG_HIDDEN,
 	},
 	XATTR_QUOTA_FIELD(quota, max_bytes),
 	XATTR_QUOTA_FIELD(quota, max_files),
@@ -333,9 +334,8 @@ static struct ceph_vxattr ceph_file_vxattrs[] = {
 		.name = "ceph.file.layout",
 		.name_size = sizeof("ceph.file.layout"),
 		.getxattr_cb = ceph_vxattrcb_layout,
-		.readonly = false,
-		.hidden = true,
 		.exists_cb = ceph_vxattrcb_layout_exists,
+		.flags = VXATTR_FLAG_HIDDEN,
 	},
 	XATTR_LAYOUT_FIELD(file, layout, stripe_unit),
 	XATTR_LAYOUT_FIELD(file, layout, stripe_count),
@@ -374,9 +374,10 @@ static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs)
 	struct ceph_vxattr *vxattr;
 	size_t size = 0;
 
-	for (vxattr = vxattrs; vxattr->name; vxattr++)
-		if (!vxattr->hidden)
+	for (vxattr = vxattrs; vxattr->name; vxattr++) {
+		if (!(vxattr->flags & VXATTR_FLAG_HIDDEN))
 			size += vxattr->name_size;
+	}
 
 	return size;
 }
@@ -809,7 +810,10 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 	/* let's see if a virtual xattr was requested */
 	vxattr = ceph_match_vxattr(inode, name);
 	if (vxattr) {
-		err = ceph_do_getattr(inode, 0, true);
+		int mask = 0;
+		if (vxattr->flags & VXATTR_FLAG_RSTAT)
+			mask |= CEPH_STAT_RSTAT;
+		err = ceph_do_getattr(inode, mask, true);
 		if (err)
 			return err;
 		err = -ENODATA;
@@ -919,7 +923,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
 	err = namelen;
 	if (vxattrs) {
 		for (i = 0; vxattrs[i].name; i++) {
-			if (!vxattrs[i].hidden &&
+			if (!(vxattrs[i].flags & VXATTR_FLAG_HIDDEN) &&
 			    !(vxattrs[i].exists_cb &&
 			      !vxattrs[i].exists_cb(ci))) {
 				len = sprintf(names, "%s", vxattrs[i].name);
@@ -1024,7 +1028,7 @@ int __ceph_setxattr(struct inode *inode, const char *name,
 
 	vxattr = ceph_match_vxattr(inode, name);
 	if (vxattr) {
-		if (vxattr->readonly)
+		if (vxattr->flags & VXATTR_FLAG_READONLY)
 			return -EOPNOTSUPP;
 		if (value && !strncmp(vxattr->name, "ceph.quota", 10))
 			check_realm = true;