summary refs log tree commit diff
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2017-09-12 20:03:53 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2017-09-12 20:03:53 -0700
commitcdb897e3279ad1677138d6bdf1cfaf1393718a08 (patch)
tree05c5162aadcee8e56a384ef059adcb7e85c48d43 /fs
parentb31ff3cdf540110da4572e3e29bd172087af65cc (diff)
parent15b51bd6badbb373c723aa019cf530c8263efd7e (diff)
downloadlinux-cdb897e3279ad1677138d6bdf1cfaf1393718a08.tar.gz
Merge tag 'ceph-for-4.14-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
 "The highlights include:

   - a large series of fixes and improvements to the snapshot-handling
     code (Zheng Yan)

   - individual read/write OSD requests passed down to libceph are now
     limited to 16M in size to avoid hitting OSD-side limits (Zheng Yan)

   - encode MStatfs v2 message to allow for more accurate space usage
     reporting (Douglas Fuller)

   - switch to the new writeback error tracking infrastructure (Jeff
     Layton)"

* tag 'ceph-for-4.14-rc1' of git://github.com/ceph/ceph-client: (35 commits)
  ceph: stop on-going cached readdir if mds revokes FILE_SHARED cap
  ceph: wait on writeback after writing snapshot data
  ceph: fix capsnap dirty pages accounting
  ceph: ignore wbc->range_{start,end} when write back snapshot data
  ceph: fix "range cyclic" mode writepages
  ceph: cleanup local variables in ceph_writepages_start()
  ceph: optimize pagevec iterating in ceph_writepages_start()
  ceph: make writepage_nounlock() invalidate page that beyonds EOF
  ceph: properly get capsnap's size in get_oldest_context()
  ceph: remove stale check in ceph_invalidatepage()
  ceph: queue cap snap only when snap realm's context changes
  ceph: handle race between vmtruncate and queuing cap snap
  ceph: fix message order check in handle_cap_export()
  ceph: fix NULL pointer dereference in ceph_flush_snaps()
  ceph: adjust 36 checks for NULL pointers
  ceph: delete an unnecessary return statement in update_dentry_lease()
  ceph: ENOMEM pr_err in __get_or_create_frag() is redundant
  ceph: check negative offsets in ceph_llseek()
  ceph: more accurate statfs
  ceph: properly set snap follows for cap reconnect
  ...
Diffstat (limited to 'fs')
-rw-r--r--fs/ceph/addr.c403
-rw-r--r--fs/ceph/cache.c2
-rw-r--r--fs/ceph/caps.c40
-rw-r--r--fs/ceph/debugfs.c2
-rw-r--r--fs/ceph/dir.c6
-rw-r--r--fs/ceph/file.c50
-rw-r--r--fs/ceph/inode.c53
-rw-r--r--fs/ceph/mds_client.c37
-rw-r--r--fs/ceph/mdsmap.c6
-rw-r--r--fs/ceph/snap.c37
-rw-r--r--fs/ceph/super.c78
-rw-r--r--fs/ceph/super.h16
-rw-r--r--fs/ceph/xattr.c8
13 files changed, 408 insertions, 330 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 1bc709fe330a..b3e3edc09d80 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -152,17 +152,10 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset,
 
 	ceph_invalidate_fscache_page(inode, page);
 
+	WARN_ON(!PageLocked(page));
 	if (!PagePrivate(page))
 		return;
 
-	/*
-	 * We can get non-dirty pages here due to races between
-	 * set_page_dirty and truncate_complete_page; just spit out a
-	 * warning, in case we end up with accounting problems later.
-	 */
-	if (!PageDirty(page))
-		pr_err("%p invalidatepage %p page not dirty\n", inode, page);
-
 	ClearPageChecked(page);
 
 	dout("%p invalidatepage %p idx %lu full dirty page\n",
@@ -455,13 +448,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 	if (rc == 0)
 		goto out;
 
-	if (fsc->mount_options->rsize >= PAGE_SIZE)
-		max = (fsc->mount_options->rsize + PAGE_SIZE - 1)
-			>> PAGE_SHIFT;
-
-	dout("readpages %p file %p nr_pages %d max %d\n", inode,
-		file, nr_pages,
-	     max);
+	max = fsc->mount_options->rsize >> PAGE_SHIFT;
+	dout("readpages %p file %p nr_pages %d max %d\n",
+	     inode, file, nr_pages, max);
 	while (!list_empty(page_list)) {
 		rc = start_read(inode, page_list, max);
 		if (rc < 0)
@@ -474,14 +463,22 @@ out:
 	return rc;
 }
 
+struct ceph_writeback_ctl
+{
+	loff_t i_size;
+	u64 truncate_size;
+	u32 truncate_seq;
+	bool size_stable;
+	bool head_snapc;
+};
+
 /*
  * Get ref for the oldest snapc for an inode with dirty data... that is, the
  * only snap context we are allowed to write back.
  */
-static struct ceph_snap_context *get_oldest_context(struct inode *inode,
-						    loff_t *snap_size,
-						    u64 *truncate_size,
-						    u32 *truncate_seq)
+static struct ceph_snap_context *
+get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
+		   struct ceph_snap_context *page_snapc)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_snap_context *snapc = NULL;
@@ -491,30 +488,78 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
 	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
 		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
 		     capsnap->context, capsnap->dirty_pages);
-		if (capsnap->dirty_pages) {
-			snapc = ceph_get_snap_context(capsnap->context);
-			if (snap_size)
-				*snap_size = capsnap->size;
-			if (truncate_size)
-				*truncate_size = capsnap->truncate_size;
-			if (truncate_seq)
-				*truncate_seq = capsnap->truncate_seq;
-			break;
+		if (!capsnap->dirty_pages)
+			continue;
+
+		/* get i_size, truncate_{seq,size} for page_snapc? */
+		if (snapc && capsnap->context != page_snapc)
+			continue;
+
+		if (ctl) {
+			if (capsnap->writing) {
+				ctl->i_size = i_size_read(inode);
+				ctl->size_stable = false;
+			} else {
+				ctl->i_size = capsnap->size;
+				ctl->size_stable = true;
+			}
+			ctl->truncate_size = capsnap->truncate_size;
+			ctl->truncate_seq = capsnap->truncate_seq;
+			ctl->head_snapc = false;
 		}
+
+		if (snapc)
+			break;
+
+		snapc = ceph_get_snap_context(capsnap->context);
+		if (!page_snapc ||
+		    page_snapc == snapc ||
+		    page_snapc->seq > snapc->seq)
+			break;
 	}
 	if (!snapc && ci->i_wrbuffer_ref_head) {
 		snapc = ceph_get_snap_context(ci->i_head_snapc);
 		dout(" head snapc %p has %d dirty pages\n",
 		     snapc, ci->i_wrbuffer_ref_head);
-		if (truncate_size)
-			*truncate_size = ci->i_truncate_size;
-		if (truncate_seq)
-			*truncate_seq = ci->i_truncate_seq;
+		if (ctl) {
+			ctl->i_size = i_size_read(inode);
+			ctl->truncate_size = ci->i_truncate_size;
+			ctl->truncate_seq = ci->i_truncate_seq;
+			ctl->size_stable = false;
+			ctl->head_snapc = true;
+		}
 	}
 	spin_unlock(&ci->i_ceph_lock);
 	return snapc;
 }
 
+static u64 get_writepages_data_length(struct inode *inode,
+				      struct page *page, u64 start)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_snap_context *snapc = page_snap_context(page);
+	struct ceph_cap_snap *capsnap = NULL;
+	u64 end = i_size_read(inode);
+
+	if (snapc != ci->i_head_snapc) {
+		bool found = false;
+		spin_lock(&ci->i_ceph_lock);
+		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
+			if (capsnap->context == snapc) {
+				if (!capsnap->writing)
+					end = capsnap->size;
+				found = true;
+				break;
+			}
+		}
+		spin_unlock(&ci->i_ceph_lock);
+		WARN_ON(!found);
+	}
+	if (end > page_offset(page) + PAGE_SIZE)
+		end = page_offset(page) + PAGE_SIZE;
+	return end > start ? end - start : 0;
+}
+
 /*
  * Write a single page, but leave the page locked.
  *
@@ -526,30 +571,25 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	struct inode *inode;
 	struct ceph_inode_info *ci;
 	struct ceph_fs_client *fsc;
-	struct ceph_osd_client *osdc;
 	struct ceph_snap_context *snapc, *oldest;
 	loff_t page_off = page_offset(page);
-	loff_t snap_size = -1;
 	long writeback_stat;
-	u64 truncate_size;
-	u32 truncate_seq;
 	int err, len = PAGE_SIZE;
+	struct ceph_writeback_ctl ceph_wbc;
 
 	dout("writepage %p idx %lu\n", page, page->index);
 
 	inode = page->mapping->host;
 	ci = ceph_inode(inode);
 	fsc = ceph_inode_to_client(inode);
-	osdc = &fsc->client->osdc;
 
 	/* verify this is a writeable snap context */
 	snapc = page_snap_context(page);
-	if (snapc == NULL) {
+	if (!snapc) {
 		dout("writepage %p page %p not dirty?\n", inode, page);
 		return 0;
 	}
-	oldest = get_oldest_context(inode, &snap_size,
-				    &truncate_size, &truncate_seq);
+	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
 	if (snapc->seq > oldest->seq) {
 		dout("writepage %p page %p snapc %p not writeable - noop\n",
 		     inode, page, snapc);
@@ -561,20 +601,18 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 	}
 	ceph_put_snap_context(oldest);
 
-	if (snap_size == -1)
-		snap_size = i_size_read(inode);
-
 	/* is this a partial page at end of file? */
-	if (page_off >= snap_size) {
-		dout("%p page eof %llu\n", page, snap_size);
+	if (page_off >= ceph_wbc.i_size) {
+		dout("%p page eof %llu\n", page, ceph_wbc.i_size);
+		page->mapping->a_ops->invalidatepage(page, 0, PAGE_SIZE);
 		return 0;
 	}
 
-	if (snap_size < page_off + len)
-		len = snap_size - page_off;
+	if (ceph_wbc.i_size < page_off + len)
+		len = ceph_wbc.i_size - page_off;
 
-	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
-	     inode, page, page->index, page_off, len, snapc);
+	dout("writepage %p page %p index %lu on %llu~%u snapc %p seq %lld\n",
+	     inode, page, page->index, page_off, len, snapc, snapc->seq);
 
 	writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
 	if (writeback_stat >
@@ -582,10 +620,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
 		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);
 
 	set_page_writeback(page);
-	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
-				   &ci->i_layout, snapc,
-				   page_off, len,
-				   truncate_seq, truncate_size,
+	err = ceph_osdc_writepages(&fsc->client->osdc, ceph_vino(inode),
+				   &ci->i_layout, snapc, page_off, len,
+				   ceph_wbc.truncate_seq,
+				   ceph_wbc.truncate_size,
 				   &inode->i_mtime, &page, 1);
 	if (err < 0) {
 		struct writeback_control tmp_wbc;
@@ -746,31 +784,17 @@ static int ceph_writepages_start(struct address_space *mapping,
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_vino vino = ceph_vino(inode);
-	pgoff_t index, start, end;
-	int range_whole = 0;
-	int should_loop = 1;
-	pgoff_t max_pages = 0, max_pages_ever = 0;
+	pgoff_t index, start_index, end = -1;
 	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
 	struct pagevec pvec;
-	int done = 0;
 	int rc = 0;
 	unsigned int wsize = i_blocksize(inode);
 	struct ceph_osd_request *req = NULL;
-	int do_sync = 0;
-	loff_t snap_size, i_size;
-	u64 truncate_size;
-	u32 truncate_seq;
+	struct ceph_writeback_ctl ceph_wbc;
+	bool should_loop, range_whole = false;
+	bool stop, done = false;
 
-	/*
-	 * Include a 'sync' in the OSD request if this is a data
-	 * integrity write (e.g., O_SYNC write or fsync()), or if our
-	 * cap is being revoked.
-	 */
-	if ((wbc->sync_mode == WB_SYNC_ALL) ||
-		ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
-		do_sync = 1;
-	dout("writepages_start %p dosync=%d (mode=%s)\n",
-	     inode, do_sync,
+	dout("writepages_start %p (mode=%s)\n", inode,
 	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
 	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
@@ -783,35 +807,17 @@ static int ceph_writepages_start(struct address_space *mapping,
 		mapping_set_error(mapping, -EIO);
 		return -EIO; /* we're in a forced umount, don't write! */
 	}
-	if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
+	if (fsc->mount_options->wsize < wsize)
 		wsize = fsc->mount_options->wsize;
-	if (wsize < PAGE_SIZE)
-		wsize = PAGE_SIZE;
-	max_pages_ever = wsize >> PAGE_SHIFT;
 
 	pagevec_init(&pvec, 0);
 
-	/* where to start/end? */
-	if (wbc->range_cyclic) {
-		start = mapping->writeback_index; /* Start from prev offset */
-		end = -1;
-		dout(" cyclic, start at %lu\n", start);
-	} else {
-		start = wbc->range_start >> PAGE_SHIFT;
-		end = wbc->range_end >> PAGE_SHIFT;
-		if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
-			range_whole = 1;
-		should_loop = 0;
-		dout(" not cyclic, %lu to %lu\n", start, end);
-	}
-	index = start;
+	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
+	index = start_index;
 
 retry:
 	/* find oldest snap context with dirty data */
-	ceph_put_snap_context(snapc);
-	snap_size = -1;
-	snapc = get_oldest_context(inode, &snap_size,
-				   &truncate_size, &truncate_seq);
+	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
 	if (!snapc) {
 		/* hmm, why does writepages get called when there
 		   is no dirty data? */
@@ -821,40 +827,56 @@ retry:
 	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
 	     snapc, snapc->seq, snapc->num_snaps);
 
-	i_size = i_size_read(inode);
-
-	if (last_snapc && snapc != last_snapc) {
-		/* if we switched to a newer snapc, restart our scan at the
-		 * start of the original file range. */
-		dout("  snapc differs from last pass, restarting at %lu\n",
-		     index);
-		index = start;
+	should_loop = false;
+	if (ceph_wbc.head_snapc && snapc != last_snapc) {
+		/* where to start/end? */
+		if (wbc->range_cyclic) {
+			index = start_index;
+			end = -1;
+			if (index > 0)
+				should_loop = true;
+			dout(" cyclic, start at %lu\n", index);
+		} else {
+			index = wbc->range_start >> PAGE_SHIFT;
+			end = wbc->range_end >> PAGE_SHIFT;
+			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
+				range_whole = true;
+			dout(" not cyclic, %lu to %lu\n", index, end);
+		}
+	} else if (!ceph_wbc.head_snapc) {
+		/* Do not respect wbc->range_{start,end}. Dirty pages
+		 * in that range can be associated with newer snapc.
+		 * They are not writeable until we write all dirty pages
+		 * associated with 'snapc' get written */
+		if (index > 0 || wbc->sync_mode != WB_SYNC_NONE)
+			should_loop = true;
+		dout(" non-head snapc, range whole\n");
 	}
+
+	ceph_put_snap_context(last_snapc);
 	last_snapc = snapc;
 
-	while (!done && index <= end) {
-		unsigned i;
-		int first;
-		pgoff_t strip_unit_end = 0;
+	stop = false;
+	while (!stop && index <= end) {
 		int num_ops = 0, op_idx;
-		int pvec_pages, locked_pages = 0;
+		unsigned i, pvec_pages, max_pages, locked_pages = 0;
 		struct page **pages = NULL, **data_pages;
 		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
 		struct page *page;
-		int want;
+		pgoff_t strip_unit_end = 0;
 		u64 offset = 0, len = 0;
 
-		max_pages = max_pages_ever;
+		max_pages = wsize >> PAGE_SHIFT;
 
 get_more_pages:
-		first = -1;
-		want = min(end - index,
-			   min((pgoff_t)PAGEVEC_SIZE,
-			       max_pages - (pgoff_t)locked_pages) - 1)
-			+ 1;
+		pvec_pages = min_t(unsigned, PAGEVEC_SIZE,
+				   max_pages - locked_pages);
+		if (end - index < (u64)(pvec_pages - 1))
+			pvec_pages = (unsigned)(end - index) + 1;
+
 		pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
 						PAGECACHE_TAG_DIRTY,
-						want);
+						pvec_pages);
 		dout("pagevec_lookup_tag got %d\n", pvec_pages);
 		if (!pvec_pages && !locked_pages)
 			break;
@@ -871,11 +893,15 @@ get_more_pages:
 			    unlikely(page->mapping != mapping)) {
 				dout("!dirty or !mapping %p\n", page);
 				unlock_page(page);
-				break;
+				continue;
 			}
-			if (!wbc->range_cyclic && page->index > end) {
+			if (page->index > end) {
 				dout("end of range %p\n", page);
-				done = 1;
+				/* can't be range_cyclic (1st pass) because
+				 * end == -1 in that case. */
+				stop = true;
+				if (ceph_wbc.head_snapc)
+					done = true;
 				unlock_page(page);
 				break;
 			}
@@ -884,39 +910,37 @@ get_more_pages:
 				unlock_page(page);
 				break;
 			}
-			if (wbc->sync_mode != WB_SYNC_NONE) {
-				dout("waiting on writeback %p\n", page);
-				wait_on_page_writeback(page);
-			}
-			if (page_offset(page) >=
-			    (snap_size == -1 ? i_size : snap_size)) {
-				dout("%p page eof %llu\n", page,
-				     (snap_size == -1 ? i_size : snap_size));
-				done = 1;
+			if (page_offset(page) >= ceph_wbc.i_size) {
+				dout("%p page eof %llu\n",
+				     page, ceph_wbc.i_size);
+				/* not done if range_cyclic */
+				stop = true;
 				unlock_page(page);
 				break;
 			}
 			if (PageWriteback(page)) {
-				dout("%p under writeback\n", page);
-				unlock_page(page);
-				break;
+				if (wbc->sync_mode == WB_SYNC_NONE) {
+					dout("%p under writeback\n", page);
+					unlock_page(page);
+					continue;
+				}
+				dout("waiting on writeback %p\n", page);
+				wait_on_page_writeback(page);
 			}
 
 			/* only if matching snap context */
 			pgsnapc = page_snap_context(page);
-			if (pgsnapc->seq > snapc->seq) {
-				dout("page snapc %p %lld > oldest %p %lld\n",
+			if (pgsnapc != snapc) {
+				dout("page snapc %p %lld != oldest %p %lld\n",
 				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
 				unlock_page(page);
-				if (!locked_pages)
-					continue; /* keep looking for snap */
-				break;
+				continue;
 			}
 
 			if (!clear_page_dirty_for_io(page)) {
 				dout("%p !clear_page_dirty_for_io\n", page);
 				unlock_page(page);
-				break;
+				continue;
 			}
 
 			/*
@@ -942,7 +966,7 @@ get_more_pages:
 					break;
 				}
 
-				num_ops = 1 + do_sync;
+				num_ops = 1;
 				strip_unit_end = page->index +
 					((len - 1) >> PAGE_SHIFT);
 
@@ -972,8 +996,6 @@ get_more_pages:
 			}
 
 			/* note position of first page in pvec */
-			if (first < 0)
-				first = i;
 			dout("%p will write page %p idx %lu\n",
 			     inode, page, page->index);
 
@@ -984,8 +1006,10 @@ get_more_pages:
 						  BLK_RW_ASYNC);
 			}
 
-			pages[locked_pages] = page;
-			locked_pages++;
+
+			pages[locked_pages++] = page;
+			pvec.pages[i] = NULL;
+
 			len += PAGE_SIZE;
 		}
 
@@ -993,23 +1017,23 @@ get_more_pages:
 		if (!locked_pages)
 			goto release_pvec_pages;
 		if (i) {
-			int j;
-			BUG_ON(!locked_pages || first < 0);
+			unsigned j, n = 0;
+			/* shift unused page to beginning of pvec */
+			for (j = 0; j < pvec_pages; j++) {
+				if (!pvec.pages[j])
+					continue;
+				if (n < j)
+					pvec.pages[n] = pvec.pages[j];
+				n++;
+			}
+			pvec.nr = n;
 
 			if (pvec_pages && i == pvec_pages &&
 			    locked_pages < max_pages) {
 				dout("reached end pvec, trying for more\n");
-				pagevec_reinit(&pvec);
+				pagevec_release(&pvec);
 				goto get_more_pages;
 			}
-
-			/* shift unused pages over in the pvec...  we
-			 * will need to release them below. */
-			for (j = i; j < pvec_pages; j++) {
-				dout(" pvec leftover page %p\n", pvec.pages[j]);
-				pvec.pages[j-i+first] = pvec.pages[j];
-			}
-			pvec.nr -= i-first;
 		}
 
 new_request:
@@ -1019,10 +1043,9 @@ new_request:
 		req = ceph_osdc_new_request(&fsc->client->osdc,
 					&ci->i_layout, vino,
 					offset, &len, 0, num_ops,
-					CEPH_OSD_OP_WRITE,
-					CEPH_OSD_FLAG_WRITE,
-					snapc, truncate_seq,
-					truncate_size, false);
+					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
+					snapc, ceph_wbc.truncate_seq,
+					ceph_wbc.truncate_size, false);
 		if (IS_ERR(req)) {
 			req = ceph_osdc_new_request(&fsc->client->osdc,
 						&ci->i_layout, vino,
@@ -1031,8 +1054,8 @@ new_request:
 						    CEPH_OSD_SLAB_OPS),
 						CEPH_OSD_OP_WRITE,
 						CEPH_OSD_FLAG_WRITE,
-						snapc, truncate_seq,
-						truncate_size, true);
+						snapc, ceph_wbc.truncate_seq,
+						ceph_wbc.truncate_size, true);
 			BUG_ON(IS_ERR(req));
 		}
 		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
@@ -1048,7 +1071,7 @@ new_request:
 		for (i = 0; i < locked_pages; i++) {
 			u64 cur_offset = page_offset(pages[i]);
 			if (offset + len != cur_offset) {
-				if (op_idx + do_sync + 1 == req->r_num_ops)
+				if (op_idx + 1 == req->r_num_ops)
 					break;
 				osd_req_op_extent_dup_last(req, op_idx,
 							   cur_offset - offset);
@@ -1069,14 +1092,15 @@ new_request:
 			len += PAGE_SIZE;
 		}
 
-		if (snap_size != -1) {
-			len = min(len, snap_size - offset);
+		if (ceph_wbc.size_stable) {
+			len = min(len, ceph_wbc.i_size - offset);
 		} else if (i == locked_pages) {
 			/* writepages_finish() clears writeback pages
 			 * according to the data length, so make sure
 			 * data length covers all locked pages */
 			u64 min_len = len + 1 - PAGE_SIZE;
-			len = min(len, (u64)i_size_read(inode) - offset);
+			len = get_writepages_data_length(inode, pages[i - 1],
+							 offset);
 			len = max(len, min_len);
 		}
 		dout("writepages got pages at %llu~%llu\n", offset, len);
@@ -1085,17 +1109,12 @@ new_request:
 						 0, !!pool, false);
 		osd_req_op_extent_update(req, op_idx, len);
 
-		if (do_sync) {
-			op_idx++;
-			osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
-		}
 		BUG_ON(op_idx + 1 != req->r_num_ops);
 
 		pool = NULL;
 		if (i < locked_pages) {
 			BUG_ON(num_ops <= req->r_num_ops);
 			num_ops -= req->r_num_ops;
-			num_ops += do_sync;
 			locked_pages -= i;
 
 			/* allocate new pages array for next request */
@@ -1127,22 +1146,50 @@ new_request:
 		if (pages)
 			goto new_request;
 
-		if (wbc->nr_to_write <= 0)
-			done = 1;
+		/*
+		 * We stop writing back only if we are not doing
+		 * integrity sync. In case of integrity sync we have to
+		 * keep going until we have written all the pages
+		 * we tagged for writeback prior to entering this loop.
+		 */
+		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
+			done = stop = true;
 
 release_pvec_pages:
 		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
 		     pvec.nr ? pvec.pages[0] : NULL);
 		pagevec_release(&pvec);
-
-		if (locked_pages && !done)
-			goto retry;
 	}
 
 	if (should_loop && !done) {
 		/* more to do; loop back to beginning of file */
 		dout("writepages looping back to beginning of file\n");
-		should_loop = 0;
+		end = start_index - 1; /* OK even when start_index == 0 */
+
+		/* to write dirty pages associated with next snapc,
+		 * we need to wait until current writes complete */
+		if (wbc->sync_mode != WB_SYNC_NONE &&
+		    start_index == 0 && /* all dirty pages were checked */
+		    !ceph_wbc.head_snapc) {
+			struct page *page;
+			unsigned i, nr;
+			index = 0;
+			while ((index <= end) &&
+			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
+							PAGECACHE_TAG_WRITEBACK,
+							PAGEVEC_SIZE))) {
+				for (i = 0; i < nr; i++) {
+					page = pvec.pages[i];
+					if (page_snap_context(page) != snapc)
+						continue;
+					wait_on_page_writeback(page);
+				}
+				pagevec_release(&pvec);
+				cond_resched();
+			}
+		}
+
+		start_index = 0;
 		index = 0;
 		goto retry;
 	}
@@ -1152,8 +1199,8 @@ release_pvec_pages:
 
 out:
 	ceph_osdc_put_request(req);
-	ceph_put_snap_context(snapc);
-	dout("writepages done, rc = %d\n", rc);
+	ceph_put_snap_context(last_snapc);
+	dout("writepages dend - startone, rc = %d\n", rc);
 	return rc;
 }
 
@@ -1165,8 +1212,7 @@ out:
 static int context_is_writeable_or_written(struct inode *inode,
 					   struct ceph_snap_context *snapc)
 {
-	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL,
-							      NULL, NULL);
+	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
 	int ret = !oldest || snapc->seq <= oldest->seq;
 
 	ceph_put_snap_context(oldest);
@@ -1211,8 +1257,7 @@ retry_locked:
 		 * this page is already dirty in another (older) snap
 		 * context!  is it writeable now?
 		 */
-		oldest = get_oldest_context(inode, NULL, NULL, NULL);
-
+		oldest = get_oldest_context(inode, NULL, NULL);
 		if (snapc->seq > oldest->seq) {
 			ceph_put_snap_context(oldest);
 			dout(" page %p snapc %p not current or oldest\n",
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 174d6e6569a8..a3ab265d3215 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -209,7 +209,7 @@ void ceph_fscache_register_inode_cookie(struct inode *inode)
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 
 	/* No caching for filesystem */
-	if (fsc->fscache == NULL)
+	if (!fsc->fscache)
 		return;
 
 	/* Only cache for regular files that are read only */
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 7007ae2a5ad2..157fe59fbabe 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -490,13 +490,14 @@ static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
 	}
 
 	/*
-	 * if we are newly issued FILE_SHARED, mark dir not complete; we
-	 * don't know what happened to this directory while we didn't
-	 * have the cap.
+	 * If FILE_SHARED is newly issued, mark dir not complete. We don't
+	 * know what happened to this directory while we didn't have the cap.
+	 * If FILE_SHARED is being revoked, also mark dir not complete. It
+	 * stops on-going cached readdir.
 	 */
-	if ((issued & CEPH_CAP_FILE_SHARED) &&
-	    (had & CEPH_CAP_FILE_SHARED) == 0) {
-		ci->i_shared_gen++;
+	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
+		if (issued & CEPH_CAP_FILE_SHARED)
+			ci->i_shared_gen++;
 		if (S_ISDIR(ci->vfs_inode.i_mode)) {
 			dout(" marking %p NOT complete\n", &ci->vfs_inode);
 			__ceph_dir_clear_complete(ci);
@@ -611,7 +612,7 @@ void ceph_add_cap(struct inode *inode,
 	}
 
 	if (flags & CEPH_CAP_FLAG_AUTH) {
-		if (ci->i_auth_cap == NULL ||
+		if (!ci->i_auth_cap ||
 		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
 			ci->i_auth_cap = cap;
 			cap->mds_wanted = wanted;
@@ -728,7 +729,7 @@ static void __touch_cap(struct ceph_cap *cap)
 	struct ceph_mds_session *s = cap->session;
 
 	spin_lock(&s->s_cap_lock);
-	if (s->s_cap_iterator == NULL) {
+	if (!s->s_cap_iterator) {
 		dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
 		     s->s_mds);
 		list_move_tail(&cap->session_caps, &s->s_caps);
@@ -1248,7 +1249,10 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 	arg.mode = inode->i_mode;
 
 	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
-	arg.flags = 0;
+	if (list_empty(&ci->i_cap_snaps))
+		arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
+	else
+		arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
 	if (sync)
 		arg.flags |= CEPH_CLIENT_CAPS_SYNC;
 
@@ -1454,13 +1458,19 @@ retry:
 		goto retry;
 	}
 
+	// make sure flushsnap messages are sent in proper order.
+	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+		__kick_flushing_caps(mdsc, session, ci, 0);
+		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+	}
+
 	__ceph_flush_snaps(ci, session);
 out:
 	spin_unlock(&ci->i_ceph_lock);
 
 	if (psession) {
 		*psession = session;
-	} else {
+	} else if (session) {
 		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
 	}
@@ -1901,11 +1911,7 @@ ack:
 		    (ci->i_ceph_flags &
 		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
 			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
-				spin_lock(&mdsc->cap_dirty_lock);
-				oldest_flush_tid = __get_oldest_flush_tid(mdsc);
-				spin_unlock(&mdsc->cap_dirty_lock);
-				__kick_flushing_caps(mdsc, session, ci,
-						     oldest_flush_tid);
+				__kick_flushing_caps(mdsc, session, ci, 0);
 				ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
 			}
 			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
@@ -2110,7 +2116,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 
 	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
 
-	ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
+	ret = file_write_and_wait_range(file, start, end);
 	if (ret < 0)
 		goto out;
 
@@ -3422,7 +3428,7 @@ retry:
 	tcap = __get_cap_for_mds(ci, target);
 	if (tcap) {
 		/* already have caps from the target */
-		if (tcap->cap_id != t_cap_id ||
+		if (tcap->cap_id == t_cap_id &&
 		    ceph_seq_cmp(tcap->seq, t_seq) < 0) {
 			dout(" updating import cap %p mds%d\n", tcap, target);
 			tcap->cap_id = t_cap_id;
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 4e2d112c982f..d635496ea189 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -24,7 +24,7 @@ static int mdsmap_show(struct seq_file *s, void *p)
 	struct ceph_fs_client *fsc = s->private;
 	struct ceph_mdsmap *mdsmap;
 
-	if (fsc->mdsc == NULL || fsc->mdsc->mdsmap == NULL)
+	if (!fsc->mdsc || !fsc->mdsc->mdsmap)
 		return 0;
 	mdsmap = fsc->mdsc->mdsmap;
 	seq_printf(s, "epoch %d\n", mdsmap->m_epoch);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index ef7240ace576..019c2036d36f 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -377,8 +377,10 @@ more:
 		}
 		/* hints to request -> mds selection code */
 		req->r_direct_mode = USE_AUTH_MDS;
-		req->r_direct_hash = ceph_frag_value(frag);
-		__set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
+		if (op == CEPH_MDS_OP_READDIR) {
+			req->r_direct_hash = ceph_frag_value(frag);
+			__set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
+		}
 		if (fi->last_name) {
 			req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL);
 			if (!req->r_path2) {
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3d48c415f3cb..65a6fa12c857 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -175,7 +175,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
 		dout("init_file %p %p 0%o (regular)\n", inode, file,
 		     inode->i_mode);
 		cf = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL);
-		if (cf == NULL) {
+		if (!cf) {
 			ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
 			return -ENOMEM;
 		}
@@ -562,8 +562,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 	ssize_t ret;
 	size_t len = iov_iter_count(to);
 
-	dout("sync_read on file %p %llu~%u %s\n", file, off,
-	     (unsigned)len,
+	dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len,
 	     (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
 
 	if (!len)
@@ -788,7 +787,7 @@ static void ceph_aio_retry_work(struct work_struct *work)
 		goto out;
 	}
 
-	req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;
+	req->r_flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
 	ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
 	ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
 
@@ -800,7 +799,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
 	}
 
 	req->r_ops[0] = orig_req->r_ops[0];
-	osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
 
 	req->r_mtime = aio_req->mtime;
 	req->r_data_offset = req->r_ops[0].extent.offset;
@@ -847,8 +845,9 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
 		return -EROFS;
 
-	dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
-	     (write ? "write" : "read"), file, pos, (unsigned)count);
+	dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n",
+	     (write ? "write" : "read"), file, pos, (unsigned)count,
+	     snapc, snapc->seq);
 
 	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
 	if (ret < 0)
@@ -861,7 +860,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 		if (ret2 < 0)
 			dout("invalidate_inode_pages2_range returned %d\n", ret2);
 
-		flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;
+		flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
 	} else {
 		flags = CEPH_OSD_FLAG_READ;
 	}
@@ -874,8 +873,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 		vino = ceph_vino(inode);
 		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 					    vino, pos, &size, 0,
-					    /*include a 'startsync' command*/
-					    write ? 2 : 1,
+					    1,
 					    write ? CEPH_OSD_OP_WRITE :
 						    CEPH_OSD_OP_READ,
 					    flags, snapc,
@@ -887,6 +885,11 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 			break;
 		}
 
+		if (write)
+			size = min_t(u64, size, fsc->mount_options->wsize);
+		else
+			size = min_t(u64, size, fsc->mount_options->rsize);
+
 		len = size;
 		pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
 		if (IS_ERR(pages)) {
@@ -922,7 +925,6 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 			truncate_inode_pages_range(inode->i_mapping, pos,
 					(pos+len) | (PAGE_SIZE - 1));
 
-			osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
 			req->r_mtime = mtime;
 		}
 
@@ -1048,7 +1050,8 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
 		return -EROFS;
 
-	dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
+	dout("sync_write on file %p %lld~%u snapc %p seq %lld\n",
+	     file, pos, (unsigned)count, snapc, snapc->seq);
 
 	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
 	if (ret < 0)
@@ -1060,7 +1063,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 	if (ret < 0)
 		dout("invalidate_inode_pages2_range returned %d\n", ret);
 
-	flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE;
+	flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
 
 	while ((len = iov_iter_count(from)) > 0) {
 		size_t left;
@@ -1307,6 +1310,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 	if (!prealloc_cf)
 		return -ENOMEM;
 
+retry_snap:
 	inode_lock(inode);
 
 	/* We can write back this queue in page reclaim */
@@ -1338,7 +1342,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
 			goto out;
 	}
 
-retry_snap:
 	/* FIXME: not complete since it doesn't account for being at quota */
 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
 		err = -ENOSPC;
@@ -1387,14 +1390,6 @@ retry_snap:
 							 &prealloc_cf);
 		else
 			written = ceph_sync_write(iocb, &data, pos, snapc);
-		if (written == -EOLDSNAPC) {
-			dout("aio_write %p %llx.%llx %llu~%u"
-				"got EOLDSNAPC, retrying\n",
-				inode, ceph_vinop(inode),
-				pos, (unsigned)count);
-			inode_lock(inode);
-			goto retry_snap;
-		}
 		if (written > 0)
 			iov_iter_advance(from, written);
 		ceph_put_snap_context(snapc);
@@ -1428,10 +1423,15 @@ retry_snap:
 	     ceph_cap_string(got));
 	ceph_put_cap_refs(ci, got);
 
+	if (written == -EOLDSNAPC) {
+		dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n",
+		     inode, ceph_vinop(inode), pos, (unsigned)count);
+		goto retry_snap;
+	}
+
 	if (written >= 0) {
 		if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_NEARFULL))
 			iocb->ki_flags |= IOCB_DSYNC;
-
 		written = generic_write_sync(iocb, written);
 	}
 
@@ -1481,13 +1481,13 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
 		offset += file->f_pos;
 		break;
 	case SEEK_DATA:
-		if (offset >= i_size) {
+		if (offset < 0 || offset >= i_size) {
 			ret = -ENXIO;
 			goto out;
 		}
 		break;
 	case SEEK_HOLE:
-		if (offset >= i_size) {
+		if (offset < 0 || offset >= i_size) {
 			ret = -ENXIO;
 			goto out;
 		}
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 220dfd87cbfa..373dab5173ca 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -52,7 +52,7 @@ struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
 	ino_t t = ceph_vino_to_ino(vino);
 
 	inode = iget5_locked(sb, t, ceph_ino_compare, ceph_set_ino_cb, &vino);
-	if (inode == NULL)
+	if (!inode)
 		return ERR_PTR(-ENOMEM);
 	if (inode->i_state & I_NEW) {
 		dout("get_inode created new inode %p %llx.%llx ino %llx\n",
@@ -133,12 +133,9 @@ static struct ceph_inode_frag *__get_or_create_frag(struct ceph_inode_info *ci,
 	}
 
 	frag = kmalloc(sizeof(*frag), GFP_NOFS);
-	if (!frag) {
-		pr_err("__get_or_create_frag ENOMEM on %p %llx.%llx "
-		       "frag %x\n", &ci->vfs_inode,
-		       ceph_vinop(&ci->vfs_inode), f);
+	if (!frag)
 		return ERR_PTR(-ENOMEM);
-	}
+
 	frag->frag = f;
 	frag->split_by = 0;
 	frag->mds = -1;
@@ -1070,7 +1067,6 @@ out_unlock:
 	spin_unlock(&dentry->d_lock);
 	if (old_lease_session)
 		ceph_put_mds_session(old_lease_session);
-	return;
 }
 
 /*
@@ -1177,7 +1173,7 @@ retry_lookup:
 				dn = d_alloc(parent, &dname);
 				dout("d_alloc %p '%.*s' = %p\n", parent,
 				     dname.len, dname.name, dn);
-				if (dn == NULL) {
+				if (!dn) {
 					dput(parent);
 					err = -ENOMEM;
 					goto done;
@@ -1477,7 +1473,6 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 	struct dentry *dn;
 	struct inode *in;
 	int err = 0, skipped = 0, ret, i;
-	struct inode *snapdir = NULL;
 	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
 	u32 frag = le32_to_cpu(rhead->args.readdir.frag);
 	u32 last_hash = 0;
@@ -1510,8 +1505,6 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 	}
 
 	if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
-		snapdir = ceph_get_snapdir(d_inode(parent));
-		parent = d_find_alias(snapdir);
 		dout("readdir_prepopulate %d items under SNAPDIR dn %p\n",
 		     rinfo->dir_nr, parent);
 	} else {
@@ -1519,15 +1512,18 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 		     rinfo->dir_nr, parent);
 		if (rinfo->dir_dir)
 			ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir);
-	}
 
-	if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2 &&
-	    !(rinfo->hash_order && last_hash)) {
-		/* note dir version at start of readdir so we can tell
-		 * if any dentries get dropped */
-		req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
-		req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
-		req->r_readdir_cache_idx = 0;
+		if (ceph_frag_is_leftmost(frag) &&
+		    req->r_readdir_offset == 2 &&
+		    !(rinfo->hash_order && last_hash)) {
+			/* note dir version at start of readdir so we can
+			 * tell if any dentries get dropped */
+			req->r_dir_release_cnt =
+				atomic64_read(&ci->i_release_count);
+			req->r_dir_ordered_cnt =
+				atomic64_read(&ci->i_ordered_count);
+			req->r_readdir_cache_idx = 0;
+		}
 	}
 
 	cache_ctl.index = req->r_readdir_cache_idx;
@@ -1566,7 +1562,7 @@ retry_lookup:
 			dn = d_alloc(parent, &dname);
 			dout("d_alloc %p '%.*s' = %p\n", parent,
 			     dname.len, dname.name, dn);
-			if (dn == NULL) {
+			if (!dn) {
 				dout("d_alloc badness\n");
 				err = -ENOMEM;
 				goto out;
@@ -1650,10 +1646,6 @@ out:
 		req->r_readdir_cache_idx = cache_ctl.index;
 	}
 	ceph_readdir_cache_release(&cache_ctl);
-	if (snapdir) {
-		iput(snapdir);
-		dput(parent);
-	}
 	dout("readdir_prepopulate done\n");
 	return err;
 }
@@ -1841,9 +1833,20 @@ retry:
 	 * possibly truncate them.. so write AND block!
 	 */
 	if (ci->i_wrbuffer_ref_head < ci->i_wrbuffer_ref) {
+		struct ceph_cap_snap *capsnap;
+		to = ci->i_truncate_size;
+		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
+			// MDS should have revoked Frw caps
+			WARN_ON_ONCE(capsnap->writing);
+			if (capsnap->dirty_pages && capsnap->size > to)
+				to = capsnap->size;
+		}
+		spin_unlock(&ci->i_ceph_lock);
 		dout("__do_pending_vmtruncate %p flushing snaps first\n",
 		     inode);
-		spin_unlock(&ci->i_ceph_lock);
+
+		truncate_pagecache(inode, to);
+
 		filemap_write_and_wait_range(&inode->i_data, 0,
 					     inode->i_sb->s_maxbytes);
 		goto retry;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 666a9f274832..9dd6b836ac9e 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -408,7 +408,7 @@ struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
 {
 	struct ceph_mds_session *session;
 
-	if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
+	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
 		return NULL;
 	session = mdsc->sessions[mds];
 	dout("lookup_mds_session %p %d\n", session,
@@ -483,7 +483,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 
 		dout("register_session realloc to %d\n", newmax);
 		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
-		if (sa == NULL)
+		if (!sa)
 			goto fail_realloc;
 		if (mdsc->sessions) {
 			memcpy(sa, mdsc->sessions,
@@ -731,9 +731,16 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 
 	inode = NULL;
 	if (req->r_inode) {
-		inode = req->r_inode;
-		ihold(inode);
-	} else if (req->r_dentry) {
+		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
+			inode = req->r_inode;
+			ihold(inode);
+		} else {
+			/* req->r_dentry is non-null for LSSNAP request.
+			 * fall-thru */
+			WARN_ON_ONCE(!req->r_dentry);
+		}
+	}
+	if (!inode && req->r_dentry) {
 		/* ignore race with rename; old or new d_parent is okay */
 		struct dentry *parent;
 		struct inode *dir;
@@ -886,7 +893,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
 
 	/* Calculate serialized length of metadata */
 	metadata_bytes = 4;  /* map length */
-	for (i = 0; metadata[i][0] != NULL; ++i) {
+	for (i = 0; metadata[i][0]; ++i) {
 		metadata_bytes += 8 + strlen(metadata[i][0]) +
 			strlen(metadata[i][1]);
 		metadata_key_count++;
@@ -919,7 +926,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
 	ceph_encode_32(&p, metadata_key_count);
 
 	/* Two length-prefixed strings for each entry in the map */
-	for (i = 0; metadata[i][0] != NULL; ++i) {
+	for (i = 0; metadata[i][0]; ++i) {
 		size_t const key_len = strlen(metadata[i][0]);
 		size_t const val_len = strlen(metadata[i][1]);
 
@@ -1122,7 +1129,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
 
 		spin_lock(&session->s_cap_lock);
 		p = p->next;
-		if (cap->ci == NULL) {
+		if (!cap->ci) {
 			dout("iterate_session_caps  finishing cap %p removal\n",
 			     cap);
 			BUG_ON(cap->session != session);
@@ -1748,7 +1755,7 @@ char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
 	int len, pos;
 	unsigned seq;
 
-	if (dentry == NULL)
+	if (!dentry)
 		return ERR_PTR(-EINVAL);
 
 retry:
@@ -1771,7 +1778,7 @@ retry:
 		len--;  /* no leading '/' */
 
 	path = kmalloc(len+1, GFP_NOFS);
-	if (path == NULL)
+	if (!path)
 		return ERR_PTR(-ENOMEM);
 	pos = len;
 	path[pos] = 0;	/* trailing null */
@@ -2875,7 +2882,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
 	}
 
 	if (list_empty(&ci->i_cap_snaps)) {
-		snap_follows = 0;
+		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
 	} else {
 		struct ceph_cap_snap *capsnap =
 			list_first_entry(&ci->i_cap_snaps,
@@ -3133,7 +3140,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 	     newmap->m_epoch, oldmap->m_epoch);
 
 	for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) {
-		if (mdsc->sessions[i] == NULL)
+		if (!mdsc->sessions[i])
 			continue;
 		s = mdsc->sessions[i];
 		oldstate = ceph_mdsmap_get_state(oldmap, i);
@@ -3280,7 +3287,7 @@ static void handle_lease(struct ceph_mds_client *mdsc,
 	mutex_lock(&session->s_mutex);
 	session->s_seq++;
 
-	if (inode == NULL) {
+	if (!inode) {
 		dout("handle_lease no inode %llx\n", vino.ino);
 		goto release;
 	}
@@ -3438,7 +3445,7 @@ static void delayed_work(struct work_struct *work)
 
 	for (i = 0; i < mdsc->max_sessions; i++) {
 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
-		if (s == NULL)
+		if (!s)
 			continue;
 		if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
 			dout("resending session close request for mds%d\n",
@@ -3490,7 +3497,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	fsc->mdsc = mdsc;
 	mutex_init(&mdsc->mutex);
 	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
-	if (mdsc->mdsmap == NULL) {
+	if (!mdsc->mdsmap) {
 		kfree(mdsc);
 		return -ENOMEM;
 	}
diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c
index 1a748cf88535..33ced4c22732 100644
--- a/fs/ceph/mdsmap.c
+++ b/fs/ceph/mdsmap.c
@@ -112,7 +112,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 	u16 mdsmap_ev;
 
 	m = kzalloc(sizeof(*m), GFP_NOFS);
-	if (m == NULL)
+	if (!m)
 		return ERR_PTR(-ENOMEM);
 
 	ceph_decode_need(p, end, 1 + 1, bad);
@@ -138,7 +138,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 	m->m_num_mds = m->m_max_mds;
 
 	m->m_info = kcalloc(m->m_num_mds, sizeof(*m->m_info), GFP_NOFS);
-	if (m->m_info == NULL)
+	if (!m->m_info)
 		goto nomem;
 
 	/* pick out active nodes from mds_info (state > 0) */
@@ -232,7 +232,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
 		if (num_export_targets) {
 			info->export_targets = kcalloc(num_export_targets,
 						       sizeof(u32), GFP_NOFS);
-			if (info->export_targets == NULL)
+			if (!info->export_targets)
 				goto nomem;
 			for (j = 0; j < num_export_targets; j++)
 				info->export_targets[j] =
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index dab5d6732345..1ffc8b426c1c 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -299,7 +299,8 @@ static int cmpu64_rev(const void *a, const void *b)
 /*
  * build the snap context for a given realm.
  */
-static int build_snap_context(struct ceph_snap_realm *realm)
+static int build_snap_context(struct ceph_snap_realm *realm,
+			      struct list_head* dirty_realms)
 {
 	struct ceph_snap_realm *parent = realm->parent;
 	struct ceph_snap_context *snapc;
@@ -313,7 +314,7 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 	 */
 	if (parent) {
 		if (!parent->cached_context) {
-			err = build_snap_context(parent);
+			err = build_snap_context(parent, dirty_realms);
 			if (err)
 				goto fail;
 		}
@@ -332,7 +333,7 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 		     " (unchanged)\n",
 		     realm->ino, realm, realm->cached_context,
 		     realm->cached_context->seq,
-		     (unsigned int) realm->cached_context->num_snaps);
+		     (unsigned int)realm->cached_context->num_snaps);
 		return 0;
 	}
 
@@ -373,7 +374,11 @@ static int build_snap_context(struct ceph_snap_realm *realm)
 	     realm->ino, realm, snapc, snapc->seq,
 	     (unsigned int) snapc->num_snaps);
 
-	ceph_put_snap_context(realm->cached_context);
+	if (realm->cached_context) {
+		ceph_put_snap_context(realm->cached_context);
+		/* queue realm for cap_snap creation */
+		list_add_tail(&realm->dirty_item, dirty_realms);
+	}
 	realm->cached_context = snapc;
 	return 0;
 
@@ -394,15 +399,16 @@ fail:
 /*
  * rebuild snap context for the given realm and all of its children.
  */
-static void rebuild_snap_realms(struct ceph_snap_realm *realm)
+static void rebuild_snap_realms(struct ceph_snap_realm *realm,
+				struct list_head *dirty_realms)
 {
 	struct ceph_snap_realm *child;
 
 	dout("rebuild_snap_realms %llx %p\n", realm->ino, realm);
-	build_snap_context(realm);
+	build_snap_context(realm, dirty_realms);
 
 	list_for_each_entry(child, &realm->children, child_item)
-		rebuild_snap_realms(child);
+		rebuild_snap_realms(child, dirty_realms);
 }
 
 
@@ -624,13 +630,11 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
 {
 	struct ceph_inode_info *ci;
 	struct inode *lastinode = NULL;
-	struct ceph_snap_realm *child;
 
 	dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino);
 
 	spin_lock(&realm->inodes_with_caps_lock);
-	list_for_each_entry(ci, &realm->inodes_with_caps,
-			    i_snap_realm_item) {
+	list_for_each_entry(ci, &realm->inodes_with_caps, i_snap_realm_item) {
 		struct inode *inode = igrab(&ci->vfs_inode);
 		if (!inode)
 			continue;
@@ -643,14 +647,6 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
 	spin_unlock(&realm->inodes_with_caps_lock);
 	iput(lastinode);
 
-	list_for_each_entry(child, &realm->children, child_item) {
-		dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n",
-		     realm, realm->ino, child, child->ino);
-		list_del_init(&child->dirty_item);
-		list_add(&child->dirty_item, &realm->dirty_item);
-	}
-
-	list_del_init(&realm->dirty_item);
 	dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
 }
 
@@ -721,8 +717,6 @@ more:
 		if (err < 0)
 			goto fail;
 
-		/* queue realm for cap_snap creation */
-		list_add(&realm->dirty_item, &dirty_realms);
 		if (realm->seq > mdsc->last_snap_seq)
 			mdsc->last_snap_seq = realm->seq;
 
@@ -741,7 +735,7 @@ more:
 
 	/* invalidate when we reach the _end_ (root) of the trace */
 	if (invalidate && p >= e)
-		rebuild_snap_realms(realm);
+		rebuild_snap_realms(realm, &dirty_realms);
 
 	if (!first_realm)
 		first_realm = realm;
@@ -758,6 +752,7 @@ more:
 	while (!list_empty(&dirty_realms)) {
 		realm = list_first_entry(&dirty_realms, struct ceph_snap_realm,
 					 dirty_item);
+		list_del_init(&realm->dirty_item);
 		queue_realm_cap_snaps(realm);
 	}
 
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index aa06a8c24792..e4082afedcb1 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -49,9 +49,16 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
 	struct ceph_statfs st;
 	u64 fsid;
 	int err;
+	u64 data_pool;
+
+	if (fsc->mdsc->mdsmap->m_num_data_pg_pools == 1) {
+		data_pool = fsc->mdsc->mdsmap->m_data_pg_pools[0];
+	} else {
+		data_pool = CEPH_NOPOOL;
+	}
 
 	dout("statfs\n");
-	err = ceph_monc_do_statfs(&fsc->client->monc, &st);
+	err = ceph_monc_do_statfs(&fsc->client->monc, data_pool, &st);
 	if (err < 0)
 		return err;
 
@@ -113,7 +120,6 @@ enum {
 	Opt_rasize,
 	Opt_caps_wanted_delay_min,
 	Opt_caps_wanted_delay_max,
-	Opt_cap_release_safety,
 	Opt_readdir_max_entries,
 	Opt_readdir_max_bytes,
 	Opt_congestion_kb,
@@ -152,7 +158,6 @@ static match_table_t fsopt_tokens = {
 	{Opt_rasize, "rasize=%d"},
 	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
 	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
-	{Opt_cap_release_safety, "cap_release_safety=%d"},
 	{Opt_readdir_max_entries, "readdir_max_entries=%d"},
 	{Opt_readdir_max_bytes, "readdir_max_bytes=%d"},
 	{Opt_congestion_kb, "write_congestion_kb=%d"},
@@ -235,27 +240,43 @@ static int parse_fsopt_token(char *c, void *private)
 		break;
 		/* misc */
 	case Opt_wsize:
-		fsopt->wsize = intval;
+		if (intval < PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE)
+			return -EINVAL;
+		fsopt->wsize = ALIGN(intval, PAGE_SIZE);
 		break;
 	case Opt_rsize:
-		fsopt->rsize = intval;
+		if (intval < PAGE_SIZE || intval > CEPH_MAX_READ_SIZE)
+			return -EINVAL;
+		fsopt->rsize = ALIGN(intval, PAGE_SIZE);
 		break;
 	case Opt_rasize:
-		fsopt->rasize = intval;
+		if (intval < 0)
+			return -EINVAL;
+		fsopt->rasize = ALIGN(intval + PAGE_SIZE - 1, PAGE_SIZE);
 		break;
 	case Opt_caps_wanted_delay_min:
+		if (intval < 1)
+			return -EINVAL;
 		fsopt->caps_wanted_delay_min = intval;
 		break;
 	case Opt_caps_wanted_delay_max:
+		if (intval < 1)
+			return -EINVAL;
 		fsopt->caps_wanted_delay_max = intval;
 		break;
 	case Opt_readdir_max_entries:
+		if (intval < 1)
+			return -EINVAL;
 		fsopt->max_readdir = intval;
 		break;
 	case Opt_readdir_max_bytes:
+		if (intval < PAGE_SIZE && intval != 0)
+			return -EINVAL;
 		fsopt->max_readdir_bytes = intval;
 		break;
 	case Opt_congestion_kb:
+		if (intval < 1024) /* at least 1M */
+			return -EINVAL;
 		fsopt->congestion_kb = intval;
 		break;
 	case Opt_dirstat:
@@ -392,7 +413,8 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
 	fsopt->sb_flags = flags;
 	fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
 
-	fsopt->rsize = CEPH_RSIZE_DEFAULT;
+	fsopt->wsize = CEPH_MAX_WRITE_SIZE;
+	fsopt->rsize = CEPH_MAX_READ_SIZE;
 	fsopt->rasize = CEPH_RASIZE_DEFAULT;
 	fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
 	if (!fsopt->snapdir_name) {
@@ -402,7 +424,6 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
 
 	fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
 	fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
-	fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
 	fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
 	fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
 	fsopt->congestion_kb = default_congestion_kb();
@@ -508,7 +529,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 		seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
 	if (fsopt->wsize)
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
-	if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
+	if (fsopt->rsize != CEPH_MAX_READ_SIZE)
 		seq_printf(m, ",rsize=%d", fsopt->rsize);
 	if (fsopt->rasize != CEPH_RASIZE_DEFAULT)
 		seq_printf(m, ",rasize=%d", fsopt->rasize);
@@ -520,9 +541,6 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
 	if (fsopt->caps_wanted_delay_max != CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT)
 		seq_printf(m, ",caps_wanted_delay_max=%d",
 			   fsopt->caps_wanted_delay_max);
-	if (fsopt->cap_release_safety != CEPH_CAP_RELEASE_SAFETY_DEFAULT)
-		seq_printf(m, ",cap_release_safety=%d",
-			   fsopt->cap_release_safety);
 	if (fsopt->max_readdir != CEPH_MAX_READDIR_DEFAULT)
 		seq_printf(m, ",readdir_max_entries=%d", fsopt->max_readdir);
 	if (fsopt->max_readdir_bytes != CEPH_MAX_READDIR_BYTES_DEFAULT)
@@ -576,7 +594,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	}
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
 
-	if (fsopt->mds_namespace == NULL) {
+	if (!fsopt->mds_namespace) {
 		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
 				   0, true);
 	} else {
@@ -597,13 +615,13 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 	 * to be processed in parallel, limit concurrency.
 	 */
 	fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1);
-	if (fsc->wb_wq == NULL)
+	if (!fsc->wb_wq)
 		goto fail_client;
 	fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1);
-	if (fsc->pg_inv_wq == NULL)
+	if (!fsc->pg_inv_wq)
 		goto fail_wb_wq;
 	fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
-	if (fsc->trunc_wq == NULL)
+	if (!fsc->trunc_wq)
 		goto fail_pg_inv_wq;
 
 	/* set up mempools */
@@ -674,26 +692,26 @@ static int __init init_caches(void)
 				      __alignof__(struct ceph_inode_info),
 				      SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD|
 				      SLAB_ACCOUNT, ceph_inode_init_once);
-	if (ceph_inode_cachep == NULL)
+	if (!ceph_inode_cachep)
 		return -ENOMEM;
 
 	ceph_cap_cachep = KMEM_CACHE(ceph_cap,
 				     SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
-	if (ceph_cap_cachep == NULL)
+	if (!ceph_cap_cachep)
 		goto bad_cap;
 	ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush,
 					   SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
-	if (ceph_cap_flush_cachep == NULL)
+	if (!ceph_cap_flush_cachep)
 		goto bad_cap_flush;
 
 	ceph_dentry_cachep = KMEM_CACHE(ceph_dentry_info,
 					SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
-	if (ceph_dentry_cachep == NULL)
+	if (!ceph_dentry_cachep)
 		goto bad_dentry;
 
 	ceph_file_cachep = KMEM_CACHE(ceph_file_info, SLAB_MEM_SPREAD);
 
-	if (ceph_file_cachep == NULL)
+	if (!ceph_file_cachep)
 		goto bad_file;
 
 	if ((error = ceph_fscache_register()))
@@ -947,20 +965,10 @@ static int ceph_setup_bdi(struct super_block *sb, struct ceph_fs_client *fsc)
 		return err;
 
 	/* set ra_pages based on rasize mount option? */
-	if (fsc->mount_options->rasize >= PAGE_SIZE)
-		sb->s_bdi->ra_pages =
-			(fsc->mount_options->rasize + PAGE_SIZE - 1)
-			>> PAGE_SHIFT;
-	else
-		sb->s_bdi->ra_pages = VM_MAX_READAHEAD * 1024 / PAGE_SIZE;
-
-	if (fsc->mount_options->rsize > fsc->mount_options->rasize &&
-	    fsc->mount_options->rsize >= PAGE_SIZE)
-		sb->s_bdi->io_pages =
-			(fsc->mount_options->rsize + PAGE_SIZE - 1)
-			>> PAGE_SHIFT;
-	else if (fsc->mount_options->rsize == 0)
-		sb->s_bdi->io_pages = ULONG_MAX;
+	sb->s_bdi->ra_pages = fsc->mount_options->rasize >> PAGE_SHIFT;
+
+	/* set io_pages based on max osd read size */
+	sb->s_bdi->io_pages = fsc->mount_options->rsize >> PAGE_SHIFT;
 
 	return 0;
 }
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index f02a2225fe42..279a2f401cf5 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -46,12 +46,25 @@
 #define ceph_test_mount_opt(fsc, opt) \
 	(!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
 
-#define CEPH_RSIZE_DEFAULT              (64*1024*1024) /* max read size */
+/* max size of osd read request, limited by libceph */
+#define CEPH_MAX_READ_SIZE              CEPH_MSG_MAX_DATA_LEN
+/* osd has a configurable limitaion of max write size.
+ * CEPH_MSG_MAX_DATA_LEN should be small enough. */
+#define CEPH_MAX_WRITE_SIZE		CEPH_MSG_MAX_DATA_LEN
 #define CEPH_RASIZE_DEFAULT             (8192*1024)    /* max readahead */
 #define CEPH_MAX_READDIR_DEFAULT        1024
 #define CEPH_MAX_READDIR_BYTES_DEFAULT  (512*1024)
 #define CEPH_SNAPDIRNAME_DEFAULT        ".snap"
 
+/*
+ * Delay telling the MDS we no longer want caps, in case we reopen
+ * the file.  Delay a minimum amount of time, even if we send a cap
+ * message for some other reason.  Otherwise, take the oppotunity to
+ * update the mds to avoid sending another message later.
+ */
+#define CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT      5  /* cap release delay */
+#define CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT     60  /* cap release delay */
+
 struct ceph_mount_options {
 	int flags;
 	int sb_flags;
@@ -61,7 +74,6 @@ struct ceph_mount_options {
 	int rasize;           /* max readahead */
 	int congestion_kb;    /* max writeback in flight */
 	int caps_wanted_delay_min, caps_wanted_delay_max;
-	int cap_release_safety;
 	int max_readdir;       /* max readdir result (entires) */
 	int max_readdir_bytes; /* max readdir result (bytes) */
 
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 11263f102e4c..3542b2c364cf 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -777,7 +777,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 		spin_unlock(&ci->i_ceph_lock);
 
 		/* security module gets xattr while filling trace */
-		if (current->journal_info != NULL) {
+		if (current->journal_info) {
 			pr_warn_ratelimited("sync getxattr %p "
 					    "during filling trace\n", inode);
 			return -EBUSY;
@@ -809,7 +809,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 
 	memcpy(value, xattr->val, xattr->val_len);
 
-	if (current->journal_info != NULL &&
+	if (current->journal_info &&
 	    !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
 		ci->i_ceph_flags |= CEPH_I_SEC_INITED;
 out:
@@ -1058,7 +1058,7 @@ do_sync_unlocked:
 		up_read(&mdsc->snap_rwsem);
 
 	/* security module set xattr while filling trace */
-	if (current->journal_info != NULL) {
+	if (current->journal_info) {
 		pr_warn_ratelimited("sync setxattr %p "
 				    "during filling trace\n", inode);
 		err = -EBUSY;
@@ -1108,7 +1108,7 @@ bool ceph_security_xattr_deadlock(struct inode *in)
 {
 	struct ceph_inode_info *ci;
 	bool ret;
-	if (in->i_security == NULL)
+	if (!in->i_security)
 		return false;
 	ci = ceph_inode(in);
 	spin_lock(&ci->i_ceph_lock);