summary refs log tree commit diff
path: root/fs/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/addr.c324
-rw-r--r--fs/ceph/caps.c11
-rw-r--r--fs/ceph/dir.c69
-rw-r--r--fs/ceph/export.c13
-rw-r--r--fs/ceph/file.c15
-rw-r--r--fs/ceph/inode.c34
-rw-r--r--fs/ceph/mds_client.c7
-rw-r--r--fs/ceph/snap.c16
-rw-r--r--fs/ceph/super.c47
-rw-r--r--fs/ceph/super.h23
-rw-r--r--fs/ceph/xattr.c78
11 files changed, 419 insertions, 218 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 19adeb0ef82a..fc5cae2a0db2 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -175,8 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
 
 static int ceph_releasepage(struct page *page, gfp_t g)
 {
-	struct inode *inode = page->mapping ? page->mapping->host : NULL;
-	dout("%p releasepage %p idx %lu\n", inode, page, page->index);
+	dout("%p releasepage %p idx %lu\n", page->mapping->host,
+	     page, page->index);
 	WARN_ON(PageDirty(page));
 
 	/* Can we release the page from the cache? */
@@ -276,7 +276,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 	for (i = 0; i < num_pages; i++) {
 		struct page *page = osd_data->pages[i];
 
-		if (rc < 0 && rc != ENOENT)
+		if (rc < 0 && rc != -ENOENT)
 			goto unlock;
 		if (bytes < (int)PAGE_CACHE_SIZE) {
 			/* zero (remainder of) page */
@@ -606,71 +606,71 @@ static void writepages_finish(struct ceph_osd_request *req,
 	struct inode *inode = req->r_inode;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_osd_data *osd_data;
-	unsigned wrote;
 	struct page *page;
-	int num_pages;
-	int i;
+	int num_pages, total_pages = 0;
+	int i, j;
+	int rc = req->r_result;
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
-	int rc = req->r_result;
-	u64 bytes = req->r_ops[0].extent.length;
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	long writeback_stat;
-	unsigned issued = ceph_caps_issued(ci);
+	bool remove_page;
 
-	osd_data = osd_req_op_extent_osd_data(req, 0);
-	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
-	num_pages = calc_pages_for((u64)osd_data->alignment,
-					(u64)osd_data->length);
-	if (rc >= 0) {
-		/*
-		 * Assume we wrote the pages we originally sent.  The
-		 * osd might reply with fewer pages if our writeback
-		 * raced with a truncation and was adjusted at the osd,
-		 * so don't believe the reply.
-		 */
-		wrote = num_pages;
-	} else {
-		wrote = 0;
+
+	dout("writepages_finish %p rc %d\n", inode, rc);
+	if (rc < 0)
 		mapping_set_error(mapping, rc);
-	}
-	dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
-	     inode, rc, bytes, wrote);
 
-	/* clean all pages */
-	for (i = 0; i < num_pages; i++) {
-		page = osd_data->pages[i];
-		BUG_ON(!page);
-		WARN_ON(!PageUptodate(page));
+	/*
+	 * We lost the cache cap, need to truncate the page before
+	 * it is unlocked, otherwise we'd truncate it later in the
+	 * page truncation thread, possibly losing some data that
+	 * raced its way in
+	 */
+	remove_page = !(ceph_caps_issued(ci) &
+			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
 
-		writeback_stat =
-			atomic_long_dec_return(&fsc->writeback_count);
-		if (writeback_stat <
-		    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
-			clear_bdi_congested(&fsc->backing_dev_info,
-					    BLK_RW_ASYNC);
+	/* clean all pages */
+	for (i = 0; i < req->r_num_ops; i++) {
+		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
+			break;
 
-		ceph_put_snap_context(page_snap_context(page));
-		page->private = 0;
-		ClearPagePrivate(page);
-		dout("unlocking %d %p\n", i, page);
-		end_page_writeback(page);
+		osd_data = osd_req_op_extent_osd_data(req, i);
+		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
+		num_pages = calc_pages_for((u64)osd_data->alignment,
+					   (u64)osd_data->length);
+		total_pages += num_pages;
+		for (j = 0; j < num_pages; j++) {
+			page = osd_data->pages[j];
+			BUG_ON(!page);
+			WARN_ON(!PageUptodate(page));
+
+			if (atomic_long_dec_return(&fsc->writeback_count) <
+			     CONGESTION_OFF_THRESH(
+					fsc->mount_options->congestion_kb))
+				clear_bdi_congested(&fsc->backing_dev_info,
+						    BLK_RW_ASYNC);
+
+			ceph_put_snap_context(page_snap_context(page));
+			page->private = 0;
+			ClearPagePrivate(page);
+			dout("unlocking %p\n", page);
+			end_page_writeback(page);
+
+			if (remove_page)
+				generic_error_remove_page(inode->i_mapping,
+							  page);
 
-		/*
-		 * We lost the cache cap, need to truncate the page before
-		 * it is unlocked, otherwise we'd truncate it later in the
-		 * page truncation thread, possibly losing some data that
-		 * raced its way in
-		 */
-		if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
-			generic_error_remove_page(inode->i_mapping, page);
+			unlock_page(page);
+		}
+		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
+		     inode, osd_data->length, rc >= 0 ? num_pages : 0);
 
-		unlock_page(page);
+		ceph_release_pages(osd_data->pages, num_pages);
 	}
-	dout("%p wrote+cleaned %d pages\n", inode, wrote);
-	ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc);
 
-	ceph_release_pages(osd_data->pages, num_pages);
+	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
+
+	osd_data = osd_req_op_extent_osd_data(req, 0);
 	if (osd_data->pages_from_pool)
 		mempool_free(osd_data->pages,
 			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
@@ -778,17 +778,15 @@ retry:
 	while (!done && index <= end) {
 		unsigned i;
 		int first;
-		pgoff_t next;
-		int pvec_pages, locked_pages;
-		struct page **pages = NULL;
+		pgoff_t strip_unit_end = 0;
+		int num_ops = 0, op_idx;
+		int pvec_pages, locked_pages = 0;
+		struct page **pages = NULL, **data_pages;
 		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
 		struct page *page;
 		int want;
-		u64 offset, len;
-		long writeback_stat;
+		u64 offset = 0, len = 0;
 
-		next = 0;
-		locked_pages = 0;
 		max_pages = max_pages_ever;
 
 get_more_pages:
@@ -824,8 +822,8 @@ get_more_pages:
 				unlock_page(page);
 				break;
 			}
-			if (next && (page->index != next)) {
-				dout("not consecutive %p\n", page);
+			if (strip_unit_end && (page->index > strip_unit_end)) {
+				dout("end of strip unit %p\n", page);
 				unlock_page(page);
 				break;
 			}
@@ -867,36 +865,31 @@ get_more_pages:
 			/*
 			 * We have something to write.  If this is
 			 * the first locked page this time through,
-			 * allocate an osd request and a page array
-			 * that it will use.
+			 * calculate max possinle write size and
+			 * allocate a page array
 			 */
 			if (locked_pages == 0) {
-				BUG_ON(pages);
+				u64 objnum;
+				u64 objoff;
+
 				/* prepare async write request */
 				offset = (u64)page_offset(page);
 				len = wsize;
-				req = ceph_osdc_new_request(&fsc->client->osdc,
-							&ci->i_layout, vino,
-							offset, &len, 0,
-							do_sync ? 2 : 1,
-							CEPH_OSD_OP_WRITE,
-							CEPH_OSD_FLAG_WRITE |
-							CEPH_OSD_FLAG_ONDISK,
-							snapc, truncate_seq,
-							truncate_size, true);
-				if (IS_ERR(req)) {
-					rc = PTR_ERR(req);
+
+				rc = ceph_calc_file_object_mapping(&ci->i_layout,
+								offset, len,
+								&objnum, &objoff,
+								&len);
+				if (rc < 0) {
 					unlock_page(page);
 					break;
 				}
 
-				if (do_sync)
-					osd_req_op_init(req, 1,
-							CEPH_OSD_OP_STARTSYNC, 0);
-
-				req->r_callback = writepages_finish;
-				req->r_inode = inode;
+				num_ops = 1 + do_sync;
+				strip_unit_end = page->index +
+					((len - 1) >> PAGE_CACHE_SHIFT);
 
+				BUG_ON(pages);
 				max_pages = calc_pages_for(0, (u64)len);
 				pages = kmalloc(max_pages * sizeof (*pages),
 						GFP_NOFS);
@@ -905,6 +898,20 @@ get_more_pages:
 					pages = mempool_alloc(pool, GFP_NOFS);
 					BUG_ON(!pages);
 				}
+
+				len = 0;
+			} else if (page->index !=
+				   (offset + len) >> PAGE_CACHE_SHIFT) {
+				if (num_ops >= (pool ?  CEPH_OSD_SLAB_OPS :
+							CEPH_OSD_MAX_OPS)) {
+					redirty_page_for_writepage(wbc, page);
+					unlock_page(page);
+					break;
+				}
+
+				num_ops++;
+				offset = (u64)page_offset(page);
+				len = 0;
 			}
 
 			/* note position of first page in pvec */
@@ -913,18 +920,16 @@ get_more_pages:
 			dout("%p will write page %p idx %lu\n",
 			     inode, page, page->index);
 
-			writeback_stat =
-			       atomic_long_inc_return(&fsc->writeback_count);
-			if (writeback_stat > CONGESTION_ON_THRESH(
+			if (atomic_long_inc_return(&fsc->writeback_count) >
+			    CONGESTION_ON_THRESH(
 				    fsc->mount_options->congestion_kb)) {
 				set_bdi_congested(&fsc->backing_dev_info,
 						  BLK_RW_ASYNC);
 			}
 
-			set_page_writeback(page);
 			pages[locked_pages] = page;
 			locked_pages++;
-			next = page->index + 1;
+			len += PAGE_CACHE_SIZE;
 		}
 
 		/* did we get anything? */
@@ -944,38 +949,119 @@ get_more_pages:
 			/* shift unused pages over in the pvec...  we
 			 * will need to release them below. */
 			for (j = i; j < pvec_pages; j++) {
-				dout(" pvec leftover page %p\n",
-				     pvec.pages[j]);
+				dout(" pvec leftover page %p\n", pvec.pages[j]);
 				pvec.pages[j-i+first] = pvec.pages[j];
 			}
 			pvec.nr -= i-first;
 		}
 
-		/* Format the osd request message and submit the write */
+new_request:
 		offset = page_offset(pages[0]);
-		len = (u64)locked_pages << PAGE_CACHE_SHIFT;
-		if (snap_size == -1) {
-			len = min(len, (u64)i_size_read(inode) - offset);
-			 /* writepages_finish() clears writeback pages
-			  * according to the data length, so make sure
-			  * data length covers all locked pages */
-			len = max(len, 1 +
-				((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT));
-		} else {
-			len = min(len, snap_size - offset);
+		len = wsize;
+
+		req = ceph_osdc_new_request(&fsc->client->osdc,
+					&ci->i_layout, vino,
+					offset, &len, 0, num_ops,
+					CEPH_OSD_OP_WRITE,
+					CEPH_OSD_FLAG_WRITE |
+					CEPH_OSD_FLAG_ONDISK,
+					snapc, truncate_seq,
+					truncate_size, false);
+		if (IS_ERR(req)) {
+			req = ceph_osdc_new_request(&fsc->client->osdc,
+						&ci->i_layout, vino,
+						offset, &len, 0,
+						min(num_ops,
+						    CEPH_OSD_SLAB_OPS),
+						CEPH_OSD_OP_WRITE,
+						CEPH_OSD_FLAG_WRITE |
+						CEPH_OSD_FLAG_ONDISK,
+						snapc, truncate_seq,
+						truncate_size, true);
+			BUG_ON(IS_ERR(req));
 		}
-		dout("writepages got %d pages at %llu~%llu\n",
-		     locked_pages, offset, len);
+		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
+			     PAGE_CACHE_SIZE - offset);
+
+		req->r_callback = writepages_finish;
+		req->r_inode = inode;
 
-		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
+		/* Format the osd request message and submit the write */
+		len = 0;
+		data_pages = pages;
+		op_idx = 0;
+		for (i = 0; i < locked_pages; i++) {
+			u64 cur_offset = page_offset(pages[i]);
+			if (offset + len != cur_offset) {
+				if (op_idx + do_sync + 1 == req->r_num_ops)
+					break;
+				osd_req_op_extent_dup_last(req, op_idx,
+							   cur_offset - offset);
+				dout("writepages got pages at %llu~%llu\n",
+				     offset, len);
+				osd_req_op_extent_osd_data_pages(req, op_idx,
+							data_pages, len, 0,
 							!!pool, false);
+				osd_req_op_extent_update(req, op_idx, len);
 
-		pages = NULL;	/* request message now owns the pages array */
-		pool = NULL;
+				len = 0;
+				offset = cur_offset; 
+				data_pages = pages + i;
+				op_idx++;
+			}
 
-		/* Update the write op length in case we changed it */
+			set_page_writeback(pages[i]);
+			len += PAGE_CACHE_SIZE;
+		}
+
+		if (snap_size != -1) {
+			len = min(len, snap_size - offset);
+		} else if (i == locked_pages) {
+			/* writepages_finish() clears writeback pages
+			 * according to the data length, so make sure
+			 * data length covers all locked pages */
+			u64 min_len = len + 1 - PAGE_CACHE_SIZE;
+			len = min(len, (u64)i_size_read(inode) - offset);
+			len = max(len, min_len);
+		}
+		dout("writepages got pages at %llu~%llu\n", offset, len);
+
+		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
+						 0, !!pool, false);
+		osd_req_op_extent_update(req, op_idx, len);
 
-		osd_req_op_extent_update(req, 0, len);
+		if (do_sync) {
+			op_idx++;
+			osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
+		}
+		BUG_ON(op_idx + 1 != req->r_num_ops);
+
+		pool = NULL;
+		if (i < locked_pages) {
+			BUG_ON(num_ops <= req->r_num_ops);
+			num_ops -= req->r_num_ops;
+			num_ops += do_sync;
+			locked_pages -= i;
+
+			/* allocate new pages array for next request */
+			data_pages = pages;
+			pages = kmalloc(locked_pages * sizeof (*pages),
+					GFP_NOFS);
+			if (!pages) {
+				pool = fsc->wb_pagevec_pool;
+				pages = mempool_alloc(pool, GFP_NOFS);
+				BUG_ON(!pages);
+			}
+			memcpy(pages, data_pages + i,
+			       locked_pages * sizeof(*pages));
+			memset(data_pages + i, 0,
+			       locked_pages * sizeof(*pages));
+		} else {
+			BUG_ON(num_ops != req->r_num_ops);
+			index = pages[i - 1]->index + 1;
+			/* request message now owns the pages array */
+			pages = NULL;
+		}
 
 		vino = ceph_vino(inode);
 		ceph_osdc_build_request(req, offset, snapc, vino.snap,
@@ -985,9 +1071,10 @@ get_more_pages:
 		BUG_ON(rc);
 		req = NULL;
 
-		/* continue? */
-		index = next;
-		wbc->nr_to_write -= locked_pages;
+		wbc->nr_to_write -= i;
+		if (pages)
+			goto new_request;
+
 		if (wbc->nr_to_write <= 0)
 			done = 1;
 
@@ -1522,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 				    ceph_vino(inode), 0, &len, 0, 1,
 				    CEPH_OSD_OP_CREATE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-				    ceph_empty_snapc, 0, 0, false);
+				    NULL, 0, 0, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
 		goto out;
@@ -1540,9 +1627,8 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 				    ceph_vino(inode), 0, &len, 1, 3,
 				    CEPH_OSD_OP_WRITE,
 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
-				    ceph_empty_snapc,
-				    ci->i_truncate_seq, ci->i_truncate_size,
-				    false);
+				    NULL, ci->i_truncate_seq,
+				    ci->i_truncate_size, false);
 	if (IS_ERR(req)) {
 		err = PTR_ERR(req);
 		goto out;
@@ -1663,8 +1749,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
 		goto out;
 	}
 
-	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc,
-					 ceph_empty_snapc,
+	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
 					 1, false, GFP_NOFS);
 	if (!rd_req) {
 		err = -ENOMEM;
@@ -1678,8 +1763,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
 		 "%llx.00000000", ci->i_vino.ino);
 	rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name);
 
-	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc,
-					 ceph_empty_snapc,
+	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
 					 1, false, GFP_NOFS);
 	if (!wr_req) {
 		err = -ENOMEM;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6fe0ad26a7df..de17bb232ff8 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -991,7 +991,7 @@ static int send_cap_msg(struct ceph_mds_session *session,
 			u32 seq, u64 flush_tid, u64 oldest_flush_tid,
 			u32 issue_seq, u32 mseq, u64 size, u64 max_size,
 			struct timespec *mtime, struct timespec *atime,
-			u64 time_warp_seq,
+			struct timespec *ctime, u64 time_warp_seq,
 			kuid_t uid, kgid_t gid, umode_t mode,
 			u64 xattr_version,
 			struct ceph_buffer *xattrs_buf,
@@ -1042,6 +1042,8 @@ static int send_cap_msg(struct ceph_mds_session *session,
 		ceph_encode_timespec(&fc->mtime, mtime);
 	if (atime)
 		ceph_encode_timespec(&fc->atime, atime);
+	if (ctime)
+		ceph_encode_timespec(&fc->ctime, ctime);
 	fc->time_warp_seq = cpu_to_le32(time_warp_seq);
 
 	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, uid));
@@ -1116,7 +1118,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	int held, revoking, dropping, keep;
 	u64 seq, issue_seq, mseq, time_warp_seq, follows;
 	u64 size, max_size;
-	struct timespec mtime, atime;
+	struct timespec mtime, atime, ctime;
 	int wake = 0;
 	umode_t mode;
 	kuid_t uid;
@@ -1180,6 +1182,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	ci->i_requested_max_size = max_size;
 	mtime = inode->i_mtime;
 	atime = inode->i_atime;
+	ctime = inode->i_ctime;
 	time_warp_seq = ci->i_time_warp_seq;
 	uid = inode->i_uid;
 	gid = inode->i_gid;
@@ -1198,7 +1201,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
 		op, keep, want, flushing, seq,
 		flush_tid, oldest_flush_tid, issue_seq, mseq,
-		size, max_size, &mtime, &atime, time_warp_seq,
+		size, max_size, &mtime, &atime, &ctime, time_warp_seq,
 		uid, gid, mode, xattr_version, xattr_blob,
 		follows, inline_data);
 	if (ret < 0) {
@@ -1320,7 +1323,7 @@ retry:
 			     capsnap->dirty, 0, capsnap->flush_tid, 0,
 			     0, mseq, capsnap->size, 0,
 			     &capsnap->mtime, &capsnap->atime,
-			     capsnap->time_warp_seq,
+			     &capsnap->ctime, capsnap->time_warp_seq,
 			     capsnap->uid, capsnap->gid, capsnap->mode,
 			     capsnap->xattr_version, capsnap->xattr_blob,
 			     capsnap->follows, capsnap->inline_data);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index fd11fb231a2e..fadc243dfb28 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry)
 	if (dentry->d_fsdata)
 		return 0;
 
-	di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO);
+	di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
 	if (!di)
 		return -ENOMEM;          /* oh well */
 
@@ -68,23 +68,6 @@ out_unlock:
 	return 0;
 }
 
-struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry)
-{
-	struct inode *inode = NULL;
-
-	if (!dentry)
-		return NULL;
-
-	spin_lock(&dentry->d_lock);
-	if (!IS_ROOT(dentry)) {
-		inode = d_inode(dentry->d_parent);
-		ihold(inode);
-	}
-	spin_unlock(&dentry->d_lock);
-	return inode;
-}
-
-
 /*
  * for readdir, we encode the directory frag and offset within that
  * frag into f_pos.
@@ -624,6 +607,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	struct ceph_mds_request *req;
 	int op;
+	int mask;
 	int err;
 
 	dout("lookup %p dentry %p '%pd'\n",
@@ -666,8 +650,12 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 		return ERR_CAST(req);
 	req->r_dentry = dget(dentry);
 	req->r_num_caps = 2;
-	/* we only need inode linkage */
-	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
+
+	mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+	if (ceph_security_xattr_wanted(dir))
+		mask |= CEPH_CAP_XATTR_SHARED;
+	req->r_args.getattr.mask = cpu_to_le32(mask);
+
 	req->r_locked_dir = dir;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	err = ceph_handle_snapdir(req, dentry, err);
@@ -1095,6 +1083,7 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
 static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 {
 	int valid = 0;
+	struct dentry *parent;
 	struct inode *dir;
 
 	if (flags & LOOKUP_RCU)
@@ -1103,7 +1092,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 	dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
 	     dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
 
-	dir = ceph_get_dentry_parent_inode(dentry);
+	parent = dget_parent(dentry);
+	dir = d_inode(parent);
 
 	/* always trust cached snapped dentries, snapdir dentry */
 	if (ceph_snap(dir) != CEPH_NOSNAP) {
@@ -1121,13 +1111,48 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 			valid = 1;
 	}
 
+	if (!valid) {
+		struct ceph_mds_client *mdsc =
+			ceph_sb_to_client(dir->i_sb)->mdsc;
+		struct ceph_mds_request *req;
+		int op, mask, err;
+
+		op = ceph_snap(dir) == CEPH_SNAPDIR ?
+			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
+		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
+		if (!IS_ERR(req)) {
+			req->r_dentry = dget(dentry);
+			req->r_num_caps = 2;
+
+			mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+			if (ceph_security_xattr_wanted(dir))
+				mask |= CEPH_CAP_XATTR_SHARED;
+			req->r_args.getattr.mask = mask;
+
+			req->r_locked_dir = dir;
+			err = ceph_mdsc_do_request(mdsc, NULL, req);
+			if (err == 0 || err == -ENOENT) {
+				if (dentry == req->r_dentry) {
+					valid = !d_unhashed(dentry);
+				} else {
+					d_invalidate(req->r_dentry);
+					err = -EAGAIN;
+				}
+			}
+			ceph_mdsc_put_request(req);
+			dout("d_revalidate %p lookup result=%d\n",
+			     dentry, err);
+		}
+	}
+
 	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
 	if (valid) {
 		ceph_dentry_lru_touch(dentry);
 	} else {
 		ceph_dir_clear_complete(dir);
 	}
-	iput(dir);
+
+	dput(parent);
 	return valid;
 }
 
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 3b3172357326..6e72c98162d5 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -71,12 +71,18 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
 	inode = ceph_find_inode(sb, vino);
 	if (!inode) {
 		struct ceph_mds_request *req;
+		int mask;
 
 		req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
 					       USE_ANY_MDS);
 		if (IS_ERR(req))
 			return ERR_CAST(req);
 
+		mask = CEPH_STAT_CAP_INODE;
+		if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+			mask |= CEPH_CAP_XATTR_SHARED;
+		req->r_args.getattr.mask = cpu_to_le32(mask);
+
 		req->r_ino1 = vino;
 		req->r_num_caps = 1;
 		err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -128,6 +134,7 @@ static struct dentry *__get_parent(struct super_block *sb,
 	struct ceph_mds_request *req;
 	struct inode *inode;
 	struct dentry *dentry;
+	int mask;
 	int err;
 
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT,
@@ -144,6 +151,12 @@ static struct dentry *__get_parent(struct super_block *sb,
 			.snap = CEPH_NOSNAP,
 		};
 	}
+
+	mask = CEPH_STAT_CAP_INODE;
+	if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+		mask |= CEPH_CAP_XATTR_SHARED;
+	req->r_args.getattr.mask = cpu_to_le32(mask);
+
 	req->r_num_caps = 1;
 	err = ceph_mdsc_do_request(mdsc, NULL, req);
 	inode = req->r_target_inode;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index eb9028e8cfc5..ef38f01c1795 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -157,7 +157,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
 	case S_IFDIR:
 		dout("init_file %p %p 0%o (regular)\n", inode, file,
 		     inode->i_mode);
-		cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO);
+		cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
 		if (cf == NULL) {
 			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
 			return -ENOMEM;
@@ -300,6 +300,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 	struct ceph_mds_request *req;
 	struct dentry *dn;
 	struct ceph_acls_info acls = {};
+       int mask;
 	int err;
 
 	dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
@@ -335,6 +336,12 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 			acls.pagelist = NULL;
 		}
 	}
+
+       mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+       if (ceph_security_xattr_wanted(dir))
+               mask |= CEPH_CAP_XATTR_SHARED;
+       req->r_args.open.mask = cpu_to_le32(mask);
+
 	req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
 	err = ceph_mdsc_do_request(mdsc,
 				   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
@@ -725,7 +732,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	ret = ceph_osdc_start_request(req->r_osdc, req, false);
 out:
 	if (ret < 0) {
-		BUG_ON(ret == -EOLDSNAPC);
 		req->r_result = ret;
 		ceph_aio_complete_req(req, NULL);
 	}
@@ -783,7 +789,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	int num_pages = 0;
 	int flags;
 	int ret;
-	struct timespec mtime = CURRENT_TIME;
+	struct timespec mtime = current_fs_time(inode->i_sb);
 	size_t count = iov_iter_count(iter);
 	loff_t pos = iocb->ki_pos;
 	bool write = iov_iter_rw(iter) == WRITE;
@@ -949,7 +955,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 				ret = ceph_osdc_start_request(req->r_osdc,
 							      req, false);
 			if (ret < 0) {
-				BUG_ON(ret == -EOLDSNAPC);
 				req->r_result = ret;
 				ceph_aio_complete_req(req, NULL);
 			}
@@ -988,7 +993,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 	int flags;
 	int check_caps = 0;
 	int ret;
-	struct timespec mtime = CURRENT_TIME;
+	struct timespec mtime = current_fs_time(inode->i_sb);
 	size_t count = iov_iter_count(from);
 
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e48fd8b23257..ed58b168904a 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -549,6 +549,10 @@ int ceph_fill_file_size(struct inode *inode, int issued,
 	if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
 	    (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
 		dout("size %lld -> %llu\n", inode->i_size, size);
+		if (size > 0 && S_ISDIR(inode->i_mode)) {
+			pr_err("fill_file_size non-zero size for directory\n");
+			size = 0;
+		}
 		i_size_write(inode, size);
 		inode->i_blocks = (size + (1<<9) - 1) >> 9;
 		ci->i_reported_size = size;
@@ -1261,6 +1265,7 @@ retry_lookup:
 			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
 			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
 			     ceph_vinop(in));
+			d_invalidate(dn);
 			have_lease = false;
 		}
 
@@ -1349,15 +1354,20 @@ static int fill_readdir_cache(struct inode *dir, struct dentry *dn,
 
 	if (!ctl->page || pgoff != page_index(ctl->page)) {
 		ceph_readdir_cache_release(ctl);
-		ctl->page  = grab_cache_page(&dir->i_data, pgoff);
+		if (idx == 0)
+			ctl->page = grab_cache_page(&dir->i_data, pgoff);
+		else
+			ctl->page = find_lock_page(&dir->i_data, pgoff);
 		if (!ctl->page) {
 			ctl->index = -1;
-			return -ENOMEM;
+			return idx == 0 ? -ENOMEM : 0;
 		}
 		/* reading/filling the cache are serialized by
 		 * i_mutex, no need to use page lock */
 		unlock_page(ctl->page);
 		ctl->dentries = kmap(ctl->page);
+		if (idx == 0)
+			memset(ctl->dentries, 0, PAGE_CACHE_SIZE);
 	}
 
 	if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) &&
@@ -1380,7 +1390,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 	struct qstr dname;
 	struct dentry *dn;
 	struct inode *in;
-	int err = 0, ret, i;
+	int err = 0, skipped = 0, ret, i;
 	struct inode *snapdir = NULL;
 	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
 	struct ceph_dentry_info *di;
@@ -1492,7 +1502,17 @@ retry_lookup:
 		}
 
 		if (d_really_is_negative(dn)) {
-			struct dentry *realdn = splice_dentry(dn, in);
+			struct dentry *realdn;
+
+			if (ceph_security_xattr_deadlock(in)) {
+				dout(" skip splicing dn %p to inode %p"
+				     " (security xattr deadlock)\n", dn, in);
+				iput(in);
+				skipped++;
+				goto next_item;
+			}
+
+			realdn = splice_dentry(dn, in);
 			if (IS_ERR(realdn)) {
 				err = PTR_ERR(realdn);
 				d_drop(dn);
@@ -1509,7 +1529,7 @@ retry_lookup:
 				    req->r_session,
 				    req->r_request_started);
 
-		if (err == 0 && cache_ctl.index >= 0) {
+		if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
 			ret = fill_readdir_cache(d_inode(parent), dn,
 						 &cache_ctl, req);
 			if (ret < 0)
@@ -1520,7 +1540,7 @@ next_item:
 			dput(dn);
 	}
 out:
-	if (err == 0) {
+	if (err == 0 && skipped == 0) {
 		req->r_did_prepopulate = true;
 		req->r_readdir_cache_idx = cache_ctl.index;
 	}
@@ -1950,7 +1970,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 	if (dirtied) {
 		inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied,
 							   &prealloc_cf);
-		inode->i_ctime = CURRENT_TIME;
+		inode->i_ctime = current_fs_time(inode->i_sb);
 	}
 
 	release &= issued;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 911d64d865f1..44852c3ae531 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1729,7 +1729,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 	init_completion(&req->r_safe_completion);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 
-	req->r_stamp = CURRENT_TIME;
+	req->r_stamp = current_fs_time(mdsc->fsc->sb);
 
 	req->r_op = op;
 	req->r_direct_mode = mode;
@@ -2540,6 +2540,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
 	/* insert trace into our cache */
 	mutex_lock(&req->r_fill_mutex);
+	current->journal_info = req;
 	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
 	if (err == 0) {
 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
@@ -2547,6 +2548,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 			ceph_readdir_prepopulate(req, req->r_session);
 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
+	current->journal_info = NULL;
 	mutex_unlock(&req->r_fill_mutex);
 
 	up_read(&mdsc->snap_rwsem);
@@ -3764,7 +3766,6 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
 
 	/* do we need it? */
-	ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
 	mutex_lock(&mdsc->mutex);
 	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
 		dout("handle_map epoch %u <= our %u\n",
@@ -3791,6 +3792,8 @@ void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 	mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
 
 	__wake_requests(mdsc, &mdsc->waiting_for_map);
+	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
+			  mdsc->mdsmap->m_epoch);
 
 	mutex_unlock(&mdsc->mutex);
 	schedule_delayed(mdsc);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 4aa7122a8d38..9caaa7ffc93f 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,8 +296,6 @@ static int cmpu64_rev(const void *a, const void *b)
 }
 
 
-struct ceph_snap_context *ceph_empty_snapc;
-
 /*
  * build the snap context for a given realm.
  */
@@ -987,17 +985,3 @@ out:
 		up_write(&mdsc->snap_rwsem);
 	return;
 }
-
-int __init ceph_snap_init(void)
-{
-	ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS);
-	if (!ceph_empty_snapc)
-		return -ENOMEM;
-	ceph_empty_snapc->seq = 1;
-	return 0;
-}
-
-void ceph_snap_exit(void)
-{
-	ceph_put_snap_context(ceph_empty_snapc);
-}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index ca4d5e8457f1..c973043deb0e 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -439,8 +439,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 
 	if (fsopt->flags & CEPH_MOUNT_OPT_DIRSTAT)
 		seq_puts(m, ",dirstat");
-	if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES) == 0)
-		seq_puts(m, ",norbytes");
+	if ((fsopt->flags & CEPH_MOUNT_OPT_RBYTES))
+		seq_puts(m, ",rbytes");
 	if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
 		seq_puts(m, ",noasyncreaddir");
 	if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0)
@@ -530,7 +530,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 		goto fail;
 	}
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-	fsc->client->monc.want_mdsmap = 1;
+	ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
 
 	fsc->mount_options = fsopt;
 
@@ -793,22 +793,20 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
 	struct dentry *root;
 	int first = 0;   /* first vfsmount for this super_block */
 
-	dout("mount start\n");
+	dout("mount start %p\n", fsc);
 	mutex_lock(&fsc->client->mount_mutex);
 
-	err = __ceph_open_session(fsc->client, started);
-	if (err < 0)
-		goto out;
+	if (!fsc->sb->s_root) {
+		err = __ceph_open_session(fsc->client, started);
+		if (err < 0)
+			goto out;
 
-	dout("mount opening root\n");
-	root = open_root_dentry(fsc, "", started);
-	if (IS_ERR(root)) {
-		err = PTR_ERR(root);
-		goto out;
-	}
-	if (fsc->sb->s_root) {
-		dput(root);
-	} else {
+		dout("mount opening root\n");
+		root = open_root_dentry(fsc, "", started);
+		if (IS_ERR(root)) {
+			err = PTR_ERR(root);
+			goto out;
+		}
 		fsc->sb->s_root = root;
 		first = 1;
 
@@ -818,6 +816,7 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
 	}
 
 	if (path[0] == 0) {
+		root = fsc->sb->s_root;
 		dget(root);
 	} else {
 		dout("mount opening base mountpoint\n");
@@ -833,16 +832,14 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,
 	mutex_unlock(&fsc->client->mount_mutex);
 	return root;
 
-out:
-	mutex_unlock(&fsc->client->mount_mutex);
-	return ERR_PTR(err);
-
 fail:
 	if (first) {
 		dput(fsc->sb->s_root);
 		fsc->sb->s_root = NULL;
 	}
-	goto out;
+out:
+	mutex_unlock(&fsc->client->mount_mutex);
+	return ERR_PTR(err);
 }
 
 static int ceph_set_super(struct super_block *s, void *data)
@@ -1042,19 +1039,14 @@ static int __init init_ceph(void)
 
 	ceph_flock_init();
 	ceph_xattr_init();
-	ret = ceph_snap_init();
-	if (ret)
-		goto out_xattr;
 	ret = register_filesystem(&ceph_fs_type);
 	if (ret)
-		goto out_snap;
+		goto out_xattr;
 
 	pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL);
 
 	return 0;
 
-out_snap:
-	ceph_snap_exit();
 out_xattr:
 	ceph_xattr_exit();
 	destroy_caches();
@@ -1066,7 +1058,6 @@ static void __exit exit_ceph(void)
 {
 	dout("exit_ceph\n");
 	unregister_filesystem(&ceph_fs_type);
-	ceph_snap_exit();
 	ceph_xattr_exit();
 	destroy_caches();
 }
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 9c458eb52245..e705c4d612d7 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -37,8 +37,7 @@
 #define CEPH_MOUNT_OPT_FSCACHE         (1<<10) /* use fscache */
 #define CEPH_MOUNT_OPT_NOPOOLPERM      (1<<11) /* no pool permission check */
 
-#define CEPH_MOUNT_OPT_DEFAULT    (CEPH_MOUNT_OPT_RBYTES | \
-				   CEPH_MOUNT_OPT_DCACHE)
+#define CEPH_MOUNT_OPT_DEFAULT    CEPH_MOUNT_OPT_DCACHE
 
 #define ceph_set_mount_opt(fsc, opt) \
 	(fsc)->mount_options->flags |= CEPH_MOUNT_OPT_##opt;
@@ -469,7 +468,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_POOL_PERM	(1 << 4)  /* pool rd/wr bits are valid */
 #define CEPH_I_POOL_RD		(1 << 5)  /* can read from pool */
 #define CEPH_I_POOL_WR		(1 << 6)  /* can write to pool */
-
+#define CEPH_I_SEC_INITED	(1 << 7)  /* security initialized */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
 					   long long release_count,
@@ -721,7 +720,6 @@ static inline int default_congestion_kb(void)
 
 
 /* snap.c */
-extern struct ceph_snap_context *ceph_empty_snapc;
 struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc,
 					       u64 ino);
 extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc,
@@ -738,8 +736,6 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci);
 extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
 				  struct ceph_cap_snap *capsnap);
 extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
-extern int ceph_snap_init(void);
-extern void ceph_snap_exit(void);
 
 /*
  * a cap_snap is "pending" if it is still awaiting an in-progress
@@ -808,6 +804,20 @@ extern void __init ceph_xattr_init(void);
 extern void ceph_xattr_exit(void);
 extern const struct xattr_handler *ceph_xattr_handlers[];
 
+#ifdef CONFIG_SECURITY
+extern bool ceph_security_xattr_deadlock(struct inode *in);
+extern bool ceph_security_xattr_wanted(struct inode *in);
+#else
+static inline bool ceph_security_xattr_deadlock(struct inode *in)
+{
+	return false;
+}
+static inline bool ceph_security_xattr_wanted(struct inode *in)
+{
+	return false;
+}
+#endif
+
 /* acl.c */
 struct ceph_acls_info {
 	void *default_acl;
@@ -947,7 +957,6 @@ extern void ceph_dentry_lru_touch(struct dentry *dn);
 extern void ceph_dentry_lru_del(struct dentry *dn);
 extern void ceph_invalidate_dentry_lease(struct dentry *dentry);
 extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn);
-extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry);
 extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl);
 
 /*
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 819163d8313b..9410abdef3ce 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -714,31 +714,62 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
 	}
 }
 
+static inline int __get_request_mask(struct inode *in) {
+	struct ceph_mds_request *req = current->journal_info;
+	int mask = 0;
+	if (req && req->r_target_inode == in) {
+		if (req->r_op == CEPH_MDS_OP_LOOKUP ||
+		    req->r_op == CEPH_MDS_OP_LOOKUPINO ||
+		    req->r_op == CEPH_MDS_OP_LOOKUPPARENT ||
+		    req->r_op == CEPH_MDS_OP_GETATTR) {
+			mask = le32_to_cpu(req->r_args.getattr.mask);
+		} else if (req->r_op == CEPH_MDS_OP_OPEN ||
+			   req->r_op == CEPH_MDS_OP_CREATE) {
+			mask = le32_to_cpu(req->r_args.open.mask);
+		}
+	}
+	return mask;
+}
+
 ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 		      size_t size)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int err;
 	struct ceph_inode_xattr *xattr;
 	struct ceph_vxattr *vxattr = NULL;
+	int req_mask;
+	int err;
 
 	if (!ceph_is_valid_xattr(name))
 		return -ENODATA;
 
 	/* let's see if a virtual xattr was requested */
 	vxattr = ceph_match_vxattr(inode, name);
-	if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
-		err = vxattr->getxattr_cb(ci, value, size);
+	if (vxattr) {
+		err = -ENODATA;
+		if (!(vxattr->exists_cb && !vxattr->exists_cb(ci)))
+			err = vxattr->getxattr_cb(ci, value, size);
 		return err;
 	}
 
+	req_mask = __get_request_mask(inode);
+
 	spin_lock(&ci->i_ceph_lock);
 	dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);
 
 	if (ci->i_xattrs.version == 0 ||
-	    !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+	    !((req_mask & CEPH_CAP_XATTR_SHARED) ||
+	      __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
 		spin_unlock(&ci->i_ceph_lock);
+
+		/* security module gets xattr while filling trace */
+		if (current->journal_info != NULL) {
+			pr_warn_ratelimited("sync getxattr %p "
+					    "during filling trace\n", inode);
+			return -EBUSY;
+		}
+
 		/* get xattrs from mds (if we don't already have them) */
 		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
 		if (err)
@@ -765,6 +796,9 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 
 	memcpy(value, xattr->val, xattr->val_len);
 
+	if (current->journal_info != NULL &&
+	    !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
+		ci->i_ceph_flags |= CEPH_I_SEC_INITED;
 out:
 	spin_unlock(&ci->i_ceph_lock);
 	return err;
@@ -999,7 +1033,7 @@ retry:
 		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
 					       &prealloc_cf);
 		ci->i_xattrs.dirty = true;
-		inode->i_ctime = CURRENT_TIME;
+		inode->i_ctime = current_fs_time(inode->i_sb);
 	}
 
 	spin_unlock(&ci->i_ceph_lock);
@@ -1015,7 +1049,15 @@ do_sync:
 do_sync_unlocked:
 	if (lock_snap_rwsem)
 		up_read(&mdsc->snap_rwsem);
-	err = ceph_sync_setxattr(dentry, name, value, size, flags);
+
+	/* security module set xattr while filling trace */
+	if (current->journal_info != NULL) {
+		pr_warn_ratelimited("sync setxattr %p "
+				    "during filling trace\n", inode);
+		err = -EBUSY;
+	} else {
+		err = ceph_sync_setxattr(dentry, name, value, size, flags);
+	}
 out:
 	ceph_free_cap_flush(prealloc_cf);
 	kfree(newname);
@@ -1136,7 +1178,7 @@ retry:
 	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL,
 				       &prealloc_cf);
 	ci->i_xattrs.dirty = true;
-	inode->i_ctime = CURRENT_TIME;
+	inode->i_ctime = current_fs_time(inode->i_sb);
 	spin_unlock(&ci->i_ceph_lock);
 	if (lock_snap_rwsem)
 		up_read(&mdsc->snap_rwsem);
@@ -1164,3 +1206,25 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
 
 	return __ceph_removexattr(dentry, name);
 }
+
+#ifdef CONFIG_SECURITY
+bool ceph_security_xattr_wanted(struct inode *in)
+{
+	return in->i_security != NULL;
+}
+
+bool ceph_security_xattr_deadlock(struct inode *in)
+{
+	struct ceph_inode_info *ci;
+	bool ret;
+	if (in->i_security == NULL)
+		return false;
+	ci = ceph_inode(in);
+	spin_lock(&ci->i_ceph_lock);
+	ret = !(ci->i_ceph_flags & CEPH_I_SEC_INITED) &&
+	      !(ci->i_xattrs.version > 0 &&
+		__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0));
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
+}
+#endif