summary refs log tree commit diff
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2016-01-24 12:34:13 -0800
committerLinus Torvalds <torvalds@linux-foundation.org>2016-01-24 12:34:13 -0800
commit00e3f5cc305c8a056a22cecedab3a71d59dae1fc (patch)
treeb2f3cff7d986768aab7ebc84b9efefa8ea3ecc00
parent772950ed21c36f4157ff34e7d10fb61975f64558 (diff)
parent7e01726a6853e032536ed7e75c1e1232872ff318 (diff)
downloadlinux-00e3f5cc305c8a056a22cecedab3a71d59dae1fc.tar.gz
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "The two main changes are aio support in CephFS, and a series that
  fixes several issues in the authentication key timeout/renewal code.

  On top of that are a variety of cleanups and minor bug fixes"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  libceph: remove outdated comment
  libceph: kill off ceph_x_ticket_handler::validity
  libceph: invalidate AUTH in addition to a service ticket
  libceph: fix authorizer invalidation, take 2
  libceph: clear messenger auth_retry flag if we fault
  libceph: fix ceph_msg_revoke()
  libceph: use list_for_each_entry_safe
  ceph: use i_size_{read,write} to get/set i_size
  ceph: re-send AIO write request when getting -EOLDSNAP error
  ceph: Asynchronous IO support
  ceph: Avoid to propagate the invalid page point
  ceph: fix double page_unlock() in page_mkwrite()
  rbd: delete an unnecessary check before rbd_dev_destroy()
  libceph: use list_next_entry instead of list_entry_next
  ceph: ceph_frag_contains_value can be boolean
  ceph: remove unused functions in ceph_frag.h
-rw-r--r--drivers/block/rbd.c3
-rw-r--r--fs/ceph/addr.c14
-rw-r--r--fs/ceph/cache.c8
-rw-r--r--fs/ceph/file.c509
-rw-r--r--fs/ceph/inode.c8
-rw-r--r--include/linux/ceph/ceph_frag.h37
-rw-r--r--include/linux/ceph/messenger.h2
-rw-r--r--net/ceph/auth_x.c49
-rw-r--r--net/ceph/auth_x.h2
-rw-r--r--net/ceph/messenger.c105
-rw-r--r--net/ceph/mon_client.c4
11 files changed, 501 insertions, 240 deletions
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 81ea69fee7ca..4a876785b68c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -5185,8 +5185,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
 
 out_err:
 	rbd_dev_unparent(rbd_dev);
-	if (parent)
-		rbd_dev_destroy(parent);
+	rbd_dev_destroy(parent);
 	return ret;
 }
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index b7d218a168fb..c22213789090 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1108,7 +1108,7 @@ retry_locked:
 		return 0;
 
 	/* past end of file? */
-	i_size = inode->i_size;   /* caller holds i_mutex */
+	i_size = i_size_read(inode);
 
 	if (page_off >= i_size ||
 	    (pos_in_page == 0 && (pos+len) >= i_size &&
@@ -1149,7 +1149,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
 		page = grab_cache_page_write_begin(mapping, index, 0);
 		if (!page)
 			return -ENOMEM;
-		*pagep = page;
 
 		dout("write_begin file %p inode %p page %p %d~%d\n", file,
 		     inode, page, (int)pos, (int)len);
@@ -1184,8 +1183,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
 		zero_user_segment(page, from+copied, len);
 
 	/* did file size increase? */
-	/* (no need for i_size_read(); we caller holds i_mutex */
-	if (pos+copied > inode->i_size)
+	if (pos+copied > i_size_read(inode))
 		check_cap = ceph_inode_set_size(inode, pos+copied);
 
 	if (!PageUptodate(page))
@@ -1378,11 +1376,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 
 	ret = VM_FAULT_NOPAGE;
 	if ((off > size) ||
-	    (page->mapping != inode->i_mapping))
+	    (page->mapping != inode->i_mapping)) {
+		unlock_page(page);
 		goto out;
+	}
 
 	ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
-	if (ret == 0) {
+	if (ret >= 0) {
 		/* success.  we'll keep the page locked. */
 		set_page_dirty(page);
 		ret = VM_FAULT_LOCKED;
@@ -1393,8 +1393,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
 			ret = VM_FAULT_SIGBUS;
 	}
 out:
-	if (ret != VM_FAULT_LOCKED)
-		unlock_page(page);
 	if (ret == VM_FAULT_LOCKED ||
 	    ci->i_inline_version != CEPH_INLINE_NONE) {
 		int dirty;
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 7680e2626815..a351480dbabc 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -106,7 +106,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
 
 	memset(&aux, 0, sizeof(aux));
 	aux.mtime = inode->i_mtime;
-	aux.size = inode->i_size;
+	aux.size = i_size_read(inode);
 
 	memcpy(buffer, &aux, sizeof(aux));
 
@@ -117,9 +117,7 @@ static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data,
 					uint64_t *size)
 {
 	const struct ceph_inode_info* ci = cookie_netfs_data;
-	const struct inode* inode = &ci->vfs_inode;
-
-	*size = inode->i_size;
+	*size = i_size_read(&ci->vfs_inode);
 }
 
 static enum fscache_checkaux ceph_fscache_inode_check_aux(
@@ -134,7 +132,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
 
 	memset(&aux, 0, sizeof(aux));
 	aux.mtime = inode->i_mtime;
-	aux.size = inode->i_size;
+	aux.size = i_size_read(inode);
 
 	if (memcmp(data, &aux, sizeof(aux)) != 0)
 		return FSCACHE_CHECKAUX_OBSOLETE;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 10c5ae79696e..86a9c383955e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file)
 }
 
 enum {
-	CHECK_EOF = 1,
-	READ_INLINE = 2,
+	HAVE_RETRIED = 1,
+	CHECK_EOF =    2,
+	READ_INLINE =  3,
 };
 
 /*
@@ -411,17 +412,15 @@ enum {
 static int striped_read(struct inode *inode,
 			u64 off, u64 len,
 			struct page **pages, int num_pages,
-			int *checkeof, bool o_direct,
-			unsigned long buf_align)
+			int *checkeof)
 {
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	u64 pos, this_len, left;
-	int io_align, page_align;
-	int pages_left;
-	int read;
+	loff_t i_size;
+	int page_align, pages_left;
+	int read, ret;
 	struct page **page_pos;
-	int ret;
 	bool hit_stripe, was_short;
 
 	/*
@@ -432,13 +431,9 @@ static int striped_read(struct inode *inode,
 	page_pos = pages;
 	pages_left = num_pages;
 	read = 0;
-	io_align = off & ~PAGE_MASK;
 
 more:
-	if (o_direct)
-		page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
-	else
-		page_align = pos & ~PAGE_MASK;
+	page_align = pos & ~PAGE_MASK;
 	this_len = left;
 	ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
 				  &ci->i_layout, pos, &this_len,
@@ -452,13 +447,12 @@ more:
 	dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
 	     ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
 
+	i_size = i_size_read(inode);
 	if (ret >= 0) {
 		int didpages;
-		if (was_short && (pos + ret < inode->i_size)) {
-			int zlen = min(this_len - ret,
-				       inode->i_size - pos - ret);
-			int zoff = (o_direct ? buf_align : io_align) +
-				    read + ret;
+		if (was_short && (pos + ret < i_size)) {
+			int zlen = min(this_len - ret, i_size - pos - ret);
+			int zoff = (off & ~PAGE_MASK) + read + ret;
 			dout(" zero gap %llu to %llu\n",
 				pos + ret, pos + ret + zlen);
 			ceph_zero_page_vector_range(zoff, zlen, pages);
@@ -473,14 +467,14 @@ more:
 		pages_left -= didpages;
 
 		/* hit stripe and need continue*/
-		if (left && hit_stripe && pos < inode->i_size)
+		if (left && hit_stripe && pos < i_size)
 			goto more;
 	}
 
 	if (read > 0) {
 		ret = read;
 		/* did we bounce off eof? */
-		if (pos + left > inode->i_size)
+		if (pos + left > i_size)
 			*checkeof = CHECK_EOF;
 	}
 
@@ -521,54 +515,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
 	if (ret < 0)
 		return ret;
 
-	if (iocb->ki_flags & IOCB_DIRECT) {
-		while (iov_iter_count(i)) {
-			size_t start;
-			ssize_t n;
-
-			n = dio_get_pagev_size(i);
-			pages = dio_get_pages_alloc(i, n, &start, &num_pages);
-			if (IS_ERR(pages))
-				return PTR_ERR(pages);
-
-			ret = striped_read(inode, off, n,
-					   pages, num_pages, checkeof,
-					   1, start);
-
-			ceph_put_page_vector(pages, num_pages, true);
-
-			if (ret <= 0)
-				break;
-			off += ret;
-			iov_iter_advance(i, ret);
-			if (ret < n)
+	num_pages = calc_pages_for(off, len);
+	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+	if (IS_ERR(pages))
+		return PTR_ERR(pages);
+	ret = striped_read(inode, off, len, pages,
+				num_pages, checkeof);
+	if (ret > 0) {
+		int l, k = 0;
+		size_t left = ret;
+
+		while (left) {
+			size_t page_off = off & ~PAGE_MASK;
+			size_t copy = min_t(size_t, left,
+					    PAGE_SIZE - page_off);
+			l = copy_page_to_iter(pages[k++], page_off, copy, i);
+			off += l;
+			left -= l;
+			if (l < copy)
 				break;
 		}
-	} else {
-		num_pages = calc_pages_for(off, len);
-		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-		if (IS_ERR(pages))
-			return PTR_ERR(pages);
-		ret = striped_read(inode, off, len, pages,
-					num_pages, checkeof, 0, 0);
-		if (ret > 0) {
-			int l, k = 0;
-			size_t left = ret;
-
-			while (left) {
-				size_t page_off = off & ~PAGE_MASK;
-				size_t copy = min_t(size_t,
-						    PAGE_SIZE - page_off, left);
-				l = copy_page_to_iter(pages[k++], page_off,
-						      copy, i);
-				off += l;
-				left -= l;
-				if (l < copy)
-					break;
-			}
-		}
-		ceph_release_page_vector(pages, num_pages);
 	}
+	ceph_release_page_vector(pages, num_pages);
 
 	if (off > iocb->ki_pos) {
 		ret = off - iocb->ki_pos;
@@ -579,6 +547,193 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
 	return ret;
 }
 
+struct ceph_aio_request {
+	struct kiocb *iocb;
+	size_t total_len;
+	int write;
+	int error;
+	struct list_head osd_reqs;
+	unsigned num_reqs;
+	atomic_t pending_reqs;
+	struct timespec mtime;
+	struct ceph_cap_flush *prealloc_cf;
+};
+
+struct ceph_aio_work {
+	struct work_struct work;
+	struct ceph_osd_request *req;
+};
+
+static void ceph_aio_retry_work(struct work_struct *work);
+
+static void ceph_aio_complete(struct inode *inode,
+			      struct ceph_aio_request *aio_req)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int ret;
+
+	if (!atomic_dec_and_test(&aio_req->pending_reqs))
+		return;
+
+	ret = aio_req->error;
+	if (!ret)
+		ret = aio_req->total_len;
+
+	dout("ceph_aio_complete %p rc %d\n", inode, ret);
+
+	if (ret >= 0 && aio_req->write) {
+		int dirty;
+
+		loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
+		if (endoff > i_size_read(inode)) {
+			if (ceph_inode_set_size(inode, endoff))
+				ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
+		}
+
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_inline_version = CEPH_INLINE_NONE;
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
+					       &aio_req->prealloc_cf);
+		spin_unlock(&ci->i_ceph_lock);
+		if (dirty)
+			__mark_inode_dirty(inode, dirty);
+
+	}
+
+	ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
+						CEPH_CAP_FILE_RD));
+
+	aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);
+
+	ceph_free_cap_flush(aio_req->prealloc_cf);
+	kfree(aio_req);
+}
+
+static void ceph_aio_complete_req(struct ceph_osd_request *req,
+				  struct ceph_msg *msg)
+{
+	int rc = req->r_result;
+	struct inode *inode = req->r_inode;
+	struct ceph_aio_request *aio_req = req->r_priv;
+	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+	int num_pages = calc_pages_for((u64)osd_data->alignment,
+				       osd_data->length);
+
+	dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
+	     inode, rc, osd_data->length);
+
+	if (rc == -EOLDSNAPC) {
+		struct ceph_aio_work *aio_work;
+		BUG_ON(!aio_req->write);
+
+		aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
+		if (aio_work) {
+			INIT_WORK(&aio_work->work, ceph_aio_retry_work);
+			aio_work->req = req;
+			queue_work(ceph_inode_to_client(inode)->wb_wq,
+				   &aio_work->work);
+			return;
+		}
+		rc = -ENOMEM;
+	} else if (!aio_req->write) {
+		if (rc == -ENOENT)
+			rc = 0;
+		if (rc >= 0 && osd_data->length > rc) {
+			int zoff = osd_data->alignment + rc;
+			int zlen = osd_data->length - rc;
+			/*
+			 * If read is satisfied by single OSD request,
+			 * it can pass EOF. Otherwise read is within
+			 * i_size.
+			 */
+			if (aio_req->num_reqs == 1) {
+				loff_t i_size = i_size_read(inode);
+				loff_t endoff = aio_req->iocb->ki_pos + rc;
+				if (endoff < i_size)
+					zlen = min_t(size_t, zlen,
+						     i_size - endoff);
+				aio_req->total_len = rc + zlen;
+			}
+
+			if (zlen > 0)
+				ceph_zero_page_vector_range(zoff, zlen,
+							    osd_data->pages);
+		}
+	}
+
+	ceph_put_page_vector(osd_data->pages, num_pages, false);
+	ceph_osdc_put_request(req);
+
+	if (rc < 0)
+		cmpxchg(&aio_req->error, 0, rc);
+
+	ceph_aio_complete(inode, aio_req);
+	return;
+}
+
+static void ceph_aio_retry_work(struct work_struct *work)
+{
+	struct ceph_aio_work *aio_work =
+		container_of(work, struct ceph_aio_work, work);
+	struct ceph_osd_request *orig_req = aio_work->req;
+	struct ceph_aio_request *aio_req = orig_req->r_priv;
+	struct inode *inode = orig_req->r_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_snap_context *snapc;
+	struct ceph_osd_request *req;
+	int ret;
+
+	spin_lock(&ci->i_ceph_lock);
+	if (__ceph_have_pending_cap_snap(ci)) {
+		struct ceph_cap_snap *capsnap =
+			list_last_entry(&ci->i_cap_snaps,
+					struct ceph_cap_snap,
+					ci_item);
+		snapc = ceph_get_snap_context(capsnap->context);
+	} else {
+		BUG_ON(!ci->i_head_snapc);
+		snapc = ceph_get_snap_context(ci->i_head_snapc);
+	}
+	spin_unlock(&ci->i_ceph_lock);
+
+	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
+			false, GFP_NOFS);
+	if (IS_ERR(req)) {
+		ret = PTR_ERR(req);
+		req = orig_req;
+		goto out;
+	}
+
+	req->r_flags =	CEPH_OSD_FLAG_ORDERSNAP |
+			CEPH_OSD_FLAG_ONDISK |
+			CEPH_OSD_FLAG_WRITE;
+	req->r_base_oloc = orig_req->r_base_oloc;
+	req->r_base_oid = orig_req->r_base_oid;
+
+	req->r_ops[0] = orig_req->r_ops[0];
+	osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
+
+	ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
+				snapc, CEPH_NOSNAP, &aio_req->mtime);
+
+	ceph_put_snap_context(snapc);
+	ceph_osdc_put_request(orig_req);
+
+	req->r_callback = ceph_aio_complete_req;
+	req->r_inode = inode;
+	req->r_priv = aio_req;
+
+	ret = ceph_osdc_start_request(req->r_osdc, req, false);
+out:
+	if (ret < 0) {
+		BUG_ON(ret == -EOLDSNAPC);
+		req->r_result = ret;
+		ceph_aio_complete_req(req, NULL);
+	}
+
+	kfree(aio_work);
+}
+
 /*
  * Write commit request unsafe callback, called to tell us when a
  * request is unsafe (that is, in flight--has been handed to the
@@ -612,16 +767,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
 }
 
 
-/*
- * Synchronous write, straight from __user pointer or user pages.
- *
- * If write spans object boundary, just do multiple writes.  (For a
- * correct atomic write, we should e.g. take write locks on all
- * objects, rollback on failure, etc.)
- */
 static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
-		       struct ceph_snap_context *snapc)
+ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
+		       struct ceph_snap_context *snapc,
+		       struct ceph_cap_flush **pcf)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
@@ -630,44 +779,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	struct page **pages;
-	int num_pages;
-	int written = 0;
+	struct ceph_aio_request *aio_req = NULL;
+	int num_pages = 0;
 	int flags;
-	int check_caps = 0;
 	int ret;
 	struct timespec mtime = CURRENT_TIME;
-	size_t count = iov_iter_count(from);
+	size_t count = iov_iter_count(iter);
+	loff_t pos = iocb->ki_pos;
+	bool write = iov_iter_rw(iter) == WRITE;
 
-	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
+	if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
 		return -EROFS;
 
-	dout("sync_direct_write on file %p %lld~%u\n", file, pos,
-	     (unsigned)count);
+	dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
+	     (write ? "write" : "read"), file, pos, (unsigned)count);
 
 	ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
 	if (ret < 0)
 		return ret;
 
-	ret = invalidate_inode_pages2_range(inode->i_mapping,
-					    pos >> PAGE_CACHE_SHIFT,
-					    (pos + count) >> PAGE_CACHE_SHIFT);
-	if (ret < 0)
-		dout("invalidate_inode_pages2_range returned %d\n", ret);
+	if (write) {
+		ret = invalidate_inode_pages2_range(inode->i_mapping,
+					pos >> PAGE_CACHE_SHIFT,
+					(pos + count) >> PAGE_CACHE_SHIFT);
+		if (ret < 0)
+			dout("invalidate_inode_pages2_range returned %d\n", ret);
 
-	flags = CEPH_OSD_FLAG_ORDERSNAP |
-		CEPH_OSD_FLAG_ONDISK |
-		CEPH_OSD_FLAG_WRITE;
+		flags = CEPH_OSD_FLAG_ORDERSNAP |
+			CEPH_OSD_FLAG_ONDISK |
+			CEPH_OSD_FLAG_WRITE;
+	} else {
+		flags = CEPH_OSD_FLAG_READ;
+	}
 
-	while (iov_iter_count(from) > 0) {
-		u64 len = dio_get_pagev_size(from);
-		size_t start;
-		ssize_t n;
+	while (iov_iter_count(iter) > 0) {
+		u64 size = dio_get_pagev_size(iter);
+		size_t start = 0;
+		ssize_t len;
 
 		vino = ceph_vino(inode);
 		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-					    vino, pos, &len, 0,
-					    2,/*include a 'startsync' command*/
-					    CEPH_OSD_OP_WRITE, flags, snapc,
+					    vino, pos, &size, 0,
+					    /*include a 'startsync' command*/
+					    write ? 2 : 1,
+					    write ? CEPH_OSD_OP_WRITE :
+						    CEPH_OSD_OP_READ,
+					    flags, snapc,
 					    ci->i_truncate_seq,
 					    ci->i_truncate_size,
 					    false);
@@ -676,10 +833,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 			break;
 		}
 
-		osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
-
-		n = len;
-		pages = dio_get_pages_alloc(from, len, &start, &num_pages);
+		len = size;
+		pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
 		if (IS_ERR(pages)) {
 			ceph_osdc_put_request(req);
 			ret = PTR_ERR(pages);
@@ -687,47 +842,128 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 		}
 
 		/*
-		 * throw out any page cache pages in this range. this
-		 * may block.
+		 * To simplify error handling, allow AIO when IO within i_size
+		 * or IO can be satisfied by single OSD request.
 		 */
-		truncate_inode_pages_range(inode->i_mapping, pos,
-				   (pos+n) | (PAGE_CACHE_SIZE-1));
-		osd_req_op_extent_osd_data_pages(req, 0, pages, n, start,
-						false, false);
+		if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
+		    (len == count || pos + count <= i_size_read(inode))) {
+			aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
+			if (aio_req) {
+				aio_req->iocb = iocb;
+				aio_req->write = write;
+				INIT_LIST_HEAD(&aio_req->osd_reqs);
+				if (write) {
+					aio_req->mtime = mtime;
+					swap(aio_req->prealloc_cf, *pcf);
+				}
+			}
+			/* ignore error */
+		}
+
+		if (write) {
+			/*
+			 * throw out any page cache pages in this range. this
+			 * may block.
+			 */
+			truncate_inode_pages_range(inode->i_mapping, pos,
+					(pos+len) | (PAGE_CACHE_SIZE - 1));
+
+			osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
+		}
+
+
+		osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
+						 false, false);
 
-		/* BUG_ON(vino.snap != CEPH_NOSNAP); */
 		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 
-		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+		if (aio_req) {
+			aio_req->total_len += len;
+			aio_req->num_reqs++;
+			atomic_inc(&aio_req->pending_reqs);
+
+			req->r_callback = ceph_aio_complete_req;
+			req->r_inode = inode;
+			req->r_priv = aio_req;
+			list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
+
+			pos += len;
+			iov_iter_advance(iter, len);
+			continue;
+		}
+
+		ret = ceph_osdc_start_request(req->r_osdc, req, false);
 		if (!ret)
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
+		size = i_size_read(inode);
+		if (!write) {
+			if (ret == -ENOENT)
+				ret = 0;
+			if (ret >= 0 && ret < len && pos + ret < size) {
+				int zlen = min_t(size_t, len - ret,
+						 size - pos - ret);
+				ceph_zero_page_vector_range(start + ret, zlen,
+							    pages);
+				ret += zlen;
+			}
+			if (ret >= 0)
+				len = ret;
+		}
+
 		ceph_put_page_vector(pages, num_pages, false);
 
 		ceph_osdc_put_request(req);
-		if (ret)
+		if (ret < 0)
 			break;
-		pos += n;
-		written += n;
-		iov_iter_advance(from, n);
 
-		if (pos > i_size_read(inode)) {
-			check_caps = ceph_inode_set_size(inode, pos);
-			if (check_caps)
+		pos += len;
+		iov_iter_advance(iter, len);
+
+		if (!write && pos >= size)
+			break;
+
+		if (write && pos > size) {
+			if (ceph_inode_set_size(inode, pos))
 				ceph_check_caps(ceph_inode(inode),
 						CHECK_CAPS_AUTHONLY,
 						NULL);
 		}
 	}
 
-	if (ret != -EOLDSNAPC && written > 0) {
+	if (aio_req) {
+		if (aio_req->num_reqs == 0) {
+			kfree(aio_req);
+			return ret;
+		}
+
+		ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
+					      CEPH_CAP_FILE_RD);
+
+		while (!list_empty(&aio_req->osd_reqs)) {
+			req = list_first_entry(&aio_req->osd_reqs,
+					       struct ceph_osd_request,
+					       r_unsafe_item);
+			list_del_init(&req->r_unsafe_item);
+			if (ret >= 0)
+				ret = ceph_osdc_start_request(req->r_osdc,
+							      req, false);
+			if (ret < 0) {
+				BUG_ON(ret == -EOLDSNAPC);
+				req->r_result = ret;
+				ceph_aio_complete_req(req, NULL);
+			}
+		}
+		return -EIOCBQUEUED;
+	}
+
+	if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
+		ret = pos - iocb->ki_pos;
 		iocb->ki_pos = pos;
-		ret = written;
 	}
 	return ret;
 }
 
-
 /*
  * Synchronous write, straight from __user pointer or user pages.
  *
@@ -897,8 +1133,14 @@ again:
 		     ceph_cap_string(got));
 
 		if (ci->i_inline_version == CEPH_INLINE_NONE) {
-			/* hmm, this isn't really async... */
-			ret = ceph_sync_read(iocb, to, &retry_op);
+			if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
+				ret = ceph_direct_read_write(iocb, to,
+							     NULL, NULL);
+				if (ret >= 0 && ret < len)
+					retry_op = CHECK_EOF;
+			} else {
+				ret = ceph_sync_read(iocb, to, &retry_op);
+			}
 		} else {
 			retry_op = READ_INLINE;
 		}
@@ -916,7 +1158,7 @@ again:
 		pinned_page = NULL;
 	}
 	ceph_put_cap_refs(ci, got);
-	if (retry_op && ret >= 0) {
+	if (retry_op > HAVE_RETRIED && ret >= 0) {
 		int statret;
 		struct page *page = NULL;
 		loff_t i_size;
@@ -968,12 +1210,11 @@ again:
 		if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
 		    ret < len) {
 			dout("sync_read hit hole, ppos %lld < size %lld"
-			     ", reading more\n", iocb->ki_pos,
-			     inode->i_size);
+			     ", reading more\n", iocb->ki_pos, i_size);
 
 			read += ret;
 			len -= ret;
-			retry_op = 0;
+			retry_op = HAVE_RETRIED;
 			goto again;
 		}
 	}
@@ -1052,7 +1293,7 @@ retry_snap:
 	}
 
 	dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
-	     inode, ceph_vinop(inode), pos, count, inode->i_size);
+	     inode, ceph_vinop(inode), pos, count, i_size_read(inode));
 	if (fi->fmode & CEPH_FILE_MODE_LAZY)
 		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
 	else
@@ -1088,8 +1329,8 @@ retry_snap:
 		/* we might need to revert back to that point */
 		data = *from;
 		if (iocb->ki_flags & IOCB_DIRECT)
-			written = ceph_sync_direct_write(iocb, &data, pos,
-							 snapc);
+			written = ceph_direct_read_write(iocb, &data, snapc,
+							 &prealloc_cf);
 		else
 			written = ceph_sync_write(iocb, &data, pos, snapc);
 		if (written == -EOLDSNAPC) {
@@ -1104,7 +1345,7 @@ retry_snap:
 			iov_iter_advance(from, written);
 		ceph_put_snap_context(snapc);
 	} else {
-		loff_t old_size = inode->i_size;
+		loff_t old_size = i_size_read(inode);
 		/*
 		 * No need to acquire the i_truncate_mutex. Because
 		 * the MDS revokes Fwb caps before sending truncate
@@ -1115,7 +1356,7 @@ retry_snap:
 		written = generic_perform_write(file, from, pos);
 		if (likely(written >= 0))
 			iocb->ki_pos = pos + written;
-		if (inode->i_size > old_size)
+		if (i_size_read(inode) > old_size)
 			ceph_fscache_update_objectsize(inode);
 		inode_unlock(inode);
 	}
@@ -1160,6 +1401,7 @@ out_unlocked:
 static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
 {
 	struct inode *inode = file->f_mapping->host;
+	loff_t i_size;
 	int ret;
 
 	inode_lock(inode);
@@ -1172,9 +1414,10 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
 		}
 	}
 
+	i_size = i_size_read(inode);
 	switch (whence) {
 	case SEEK_END:
-		offset += inode->i_size;
+		offset += i_size;
 		break;
 	case SEEK_CUR:
 		/*
@@ -1190,17 +1433,17 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
 		offset += file->f_pos;
 		break;
 	case SEEK_DATA:
-		if (offset >= inode->i_size) {
+		if (offset >= i_size) {
 			ret = -ENXIO;
 			goto out;
 		}
 		break;
 	case SEEK_HOLE:
-		if (offset >= inode->i_size) {
+		if (offset >= i_size) {
 			ret = -ENXIO;
 			goto out;
 		}
-		offset = inode->i_size;
+		offset = i_size;
 		break;
 	}
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index da55eb8bcffa..fb4ba2e4e2a5 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -548,7 +548,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
 	if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
 	    (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
 		dout("size %lld -> %llu\n", inode->i_size, size);
-		inode->i_size = size;
+		i_size_write(inode, size);
 		inode->i_blocks = (size + (1<<9) - 1) >> 9;
 		ci->i_reported_size = size;
 		if (truncate_seq != ci->i_truncate_seq) {
@@ -808,7 +808,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
 			spin_unlock(&ci->i_ceph_lock);
 
 			err = -EINVAL;
-			if (WARN_ON(symlen != inode->i_size))
+			if (WARN_ON(symlen != i_size_read(inode)))
 				goto out;
 
 			err = -ENOMEM;
@@ -1549,7 +1549,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
 
 	spin_lock(&ci->i_ceph_lock);
 	dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
-	inode->i_size = size;
+	i_size_write(inode, size);
 	inode->i_blocks = (size + (1 << 9) - 1) >> 9;
 
 	/* tell the MDS if we are approaching max_size */
@@ -1911,7 +1911,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 		     inode->i_size, attr->ia_size);
 		if ((issued & CEPH_CAP_FILE_EXCL) &&
 		    attr->ia_size > inode->i_size) {
-			inode->i_size = attr->ia_size;
+			i_size_write(inode, attr->ia_size);
 			inode->i_blocks =
 				(attr->ia_size + (1 << 9) - 1) >> 9;
 			inode->i_ctime = attr->ia_ctime;
diff --git a/include/linux/ceph/ceph_frag.h b/include/linux/ceph/ceph_frag.h
index 5babb8e95352..b827e066e55a 100644
--- a/include/linux/ceph/ceph_frag.h
+++ b/include/linux/ceph/ceph_frag.h
@@ -40,46 +40,11 @@ static inline __u32 ceph_frag_mask_shift(__u32 f)
 	return 24 - ceph_frag_bits(f);
 }
 
-static inline int ceph_frag_contains_value(__u32 f, __u32 v)
+static inline bool ceph_frag_contains_value(__u32 f, __u32 v)
 {
 	return (v & ceph_frag_mask(f)) == ceph_frag_value(f);
 }
-static inline int ceph_frag_contains_frag(__u32 f, __u32 sub)
-{
-	/* is sub as specific as us, and contained by us? */
-	return ceph_frag_bits(sub) >= ceph_frag_bits(f) &&
-	       (ceph_frag_value(sub) & ceph_frag_mask(f)) == ceph_frag_value(f);
-}
 
-static inline __u32 ceph_frag_parent(__u32 f)
-{
-	return ceph_frag_make(ceph_frag_bits(f) - 1,
-			 ceph_frag_value(f) & (ceph_frag_mask(f) << 1));
-}
-static inline int ceph_frag_is_left_child(__u32 f)
-{
-	return ceph_frag_bits(f) > 0 &&
-		(ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 0;
-}
-static inline int ceph_frag_is_right_child(__u32 f)
-{
-	return ceph_frag_bits(f) > 0 &&
-		(ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 1;
-}
-static inline __u32 ceph_frag_sibling(__u32 f)
-{
-	return ceph_frag_make(ceph_frag_bits(f),
-		      ceph_frag_value(f) ^ (0x1000000 >> ceph_frag_bits(f)));
-}
-static inline __u32 ceph_frag_left_child(__u32 f)
-{
-	return ceph_frag_make(ceph_frag_bits(f)+1, ceph_frag_value(f));
-}
-static inline __u32 ceph_frag_right_child(__u32 f)
-{
-	return ceph_frag_make(ceph_frag_bits(f)+1,
-	      ceph_frag_value(f) | (0x1000000 >> (1+ceph_frag_bits(f))));
-}
 static inline __u32 ceph_frag_make_child(__u32 f, int by, int i)
 {
 	int newbits = ceph_frag_bits(f) + by;
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 71b1d6cdcb5d..8dbd7879fdc6 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -220,6 +220,7 @@ struct ceph_connection {
 	struct ceph_entity_addr actual_peer_addr;
 
 	/* message out temps */
+	struct ceph_msg_header out_hdr;
 	struct ceph_msg *out_msg;        /* sending message (== tail of
 					    out_sent) */
 	bool out_msg_done;
@@ -229,7 +230,6 @@ struct ceph_connection {
 	int out_kvec_left;   /* kvec's left in out_kvec */
 	int out_skip;        /* skip this many bytes */
 	int out_kvec_bytes;  /* total bytes left */
-	bool out_kvec_is_msg; /* kvec refers to out_msg */
 	int out_more;        /* there is more data after the kvecs */
 	__le64 out_temp_ack; /* for writing an ack */
 	struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index 10d87753ed87..9e43a315e662 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -152,7 +152,6 @@ static int process_one_ticket(struct ceph_auth_client *ac,
 	void *ticket_buf = NULL;
 	void *tp, *tpend;
 	void **ptp;
-	struct ceph_timespec new_validity;
 	struct ceph_crypto_key new_session_key;
 	struct ceph_buffer *new_ticket_blob;
 	unsigned long new_expires, new_renew_after;
@@ -193,8 +192,8 @@ static int process_one_ticket(struct ceph_auth_client *ac,
 	if (ret)
 		goto out;
 
-	ceph_decode_copy(&dp, &new_validity, sizeof(new_validity));
-	ceph_decode_timespec(&validity, &new_validity);
+	ceph_decode_timespec(&validity, dp);
+	dp += sizeof(struct ceph_timespec);
 	new_expires = get_seconds() + validity.tv_sec;
 	new_renew_after = new_expires - (validity.tv_sec / 4);
 	dout(" expires=%lu renew_after=%lu\n", new_expires,
@@ -233,10 +232,10 @@ static int process_one_ticket(struct ceph_auth_client *ac,
 		ceph_buffer_put(th->ticket_blob);
 	th->session_key = new_session_key;
 	th->ticket_blob = new_ticket_blob;
-	th->validity = new_validity;
 	th->secret_id = new_secret_id;
 	th->expires = new_expires;
 	th->renew_after = new_renew_after;
+	th->have_key = true;
 	dout(" got ticket service %d (%s) secret_id %lld len %d\n",
 	     type, ceph_entity_type_name(type), th->secret_id,
 	     (int)th->ticket_blob->vec.iov_len);
@@ -384,6 +383,24 @@ bad:
 	return -ERANGE;
 }
 
+static bool need_key(struct ceph_x_ticket_handler *th)
+{
+	if (!th->have_key)
+		return true;
+
+	return get_seconds() >= th->renew_after;
+}
+
+static bool have_key(struct ceph_x_ticket_handler *th)
+{
+	if (th->have_key) {
+		if (get_seconds() >= th->expires)
+			th->have_key = false;
+	}
+
+	return th->have_key;
+}
+
 static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
 {
 	int want = ac->want_keys;
@@ -402,20 +419,18 @@ static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
 			continue;
 
 		th = get_ticket_handler(ac, service);
-
 		if (IS_ERR(th)) {
 			*pneed |= service;
 			continue;
 		}
 
-		if (get_seconds() >= th->renew_after)
+		if (need_key(th))
 			*pneed |= service;
-		if (get_seconds() >= th->expires)
+		if (!have_key(th))
 			xi->have_keys &= ~service;
 	}
 }
 
-
 static int ceph_x_build_request(struct ceph_auth_client *ac,
 				void *buf, void *end)
 {
@@ -667,14 +682,26 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
 	ac->private = NULL;
 }
 
-static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
-				   int peer_type)
+static void invalidate_ticket(struct ceph_auth_client *ac, int peer_type)
 {
 	struct ceph_x_ticket_handler *th;
 
 	th = get_ticket_handler(ac, peer_type);
 	if (!IS_ERR(th))
-		memset(&th->validity, 0, sizeof(th->validity));
+		th->have_key = false;
+}
+
+static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
+					 int peer_type)
+{
+	/*
+	 * We are to invalidate a service ticket in the hopes of
+	 * getting a new, hopefully more valid, one.  But, we won't get
+	 * it unless our AUTH ticket is good, so invalidate AUTH ticket
+	 * as well, just in case.
+	 */
+	invalidate_ticket(ac, peer_type);
+	invalidate_ticket(ac, CEPH_ENTITY_TYPE_AUTH);
 }
 
 static int calcu_signature(struct ceph_x_authorizer *au,
diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h
index e8b7c6917d47..40b1a3cf7397 100644
--- a/net/ceph/auth_x.h
+++ b/net/ceph/auth_x.h
@@ -16,7 +16,7 @@ struct ceph_x_ticket_handler {
 	unsigned int service;
 
 	struct ceph_crypto_key session_key;
-	struct ceph_timespec validity;
+	bool have_key;
 
 	u64 secret_id;
 	struct ceph_buffer *ticket_blob;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9981039ef4ff..9cfedf565f5b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -23,9 +23,6 @@
 #include <linux/ceph/pagelist.h>
 #include <linux/export.h>
 
-#define list_entry_next(pos, member)					\
-	list_entry(pos->member.next, typeof(*pos), member)
-
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
  * hosts in the system.  The messenger provides ordered and reliable
@@ -672,6 +669,8 @@ static void reset_connection(struct ceph_connection *con)
 	}
 	con->in_seq = 0;
 	con->in_seq_acked = 0;
+
+	con->out_skip = 0;
 }
 
 /*
@@ -771,6 +770,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
 
 static void con_out_kvec_reset(struct ceph_connection *con)
 {
+	BUG_ON(con->out_skip);
+
 	con->out_kvec_left = 0;
 	con->out_kvec_bytes = 0;
 	con->out_kvec_cur = &con->out_kvec[0];
@@ -779,9 +780,9 @@ static void con_out_kvec_reset(struct ceph_connection *con)
 static void con_out_kvec_add(struct ceph_connection *con,
 				size_t size, void *data)
 {
-	int index;
+	int index = con->out_kvec_left;
 
-	index = con->out_kvec_left;
+	BUG_ON(con->out_skip);
 	BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
 
 	con->out_kvec[index].iov_len = size;
@@ -790,6 +791,27 @@ static void con_out_kvec_add(struct ceph_connection *con,
 	con->out_kvec_bytes += size;
 }
 
+/*
+ * Chop off a kvec from the end.  Return residual number of bytes for
+ * that kvec, i.e. how many bytes would have been written if the kvec
+ * hadn't been nuked.
+ */
+static int con_out_kvec_skip(struct ceph_connection *con)
+{
+	int off = con->out_kvec_cur - con->out_kvec;
+	int skip = 0;
+
+	if (con->out_kvec_bytes > 0) {
+		skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
+		BUG_ON(con->out_kvec_bytes < skip);
+		BUG_ON(!con->out_kvec_left);
+		con->out_kvec_bytes -= skip;
+		con->out_kvec_left--;
+	}
+
+	return skip;
+}
+
 #ifdef CONFIG_BLOCK
 
 /*
@@ -1042,7 +1064,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
 	/* Move on to the next page */
 
 	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
-	cursor->page = list_entry_next(cursor->page, lru);
+	cursor->page = list_next_entry(cursor->page, lru);
 	cursor->last_piece = cursor->resid <= PAGE_SIZE;
 
 	return true;
@@ -1166,7 +1188,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
 	if (!cursor->resid && cursor->total_resid) {
 		WARN_ON(!cursor->last_piece);
 		BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
-		cursor->data = list_entry_next(cursor->data, links);
+		cursor->data = list_next_entry(cursor->data, links);
 		__ceph_msg_data_cursor_init(cursor);
 		new_piece = true;
 	}
@@ -1197,7 +1219,6 @@ static void prepare_write_message_footer(struct ceph_connection *con)
 	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
 
 	dout("prepare_write_message_footer %p\n", con);
-	con->out_kvec_is_msg = true;
 	con->out_kvec[v].iov_base = &m->footer;
 	if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
 		if (con->ops->sign_message)
@@ -1225,7 +1246,6 @@ static void prepare_write_message(struct ceph_connection *con)
 	u32 crc;
 
 	con_out_kvec_reset(con);
-	con->out_kvec_is_msg = true;
 	con->out_msg_done = false;
 
 	/* Sneak an ack in there first?  If we can get it into the same
@@ -1265,18 +1285,19 @@ static void prepare_write_message(struct ceph_connection *con)
 
 	/* tag + hdr + front + middle */
 	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
-	con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+	con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
 	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 
 	if (m->middle)
 		con_out_kvec_add(con, m->middle->vec.iov_len,
 			m->middle->vec.iov_base);
 
-	/* fill in crc (except data pages), footer */
+	/* fill in hdr crc and finalize hdr */
 	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
 	con->out_msg->hdr.crc = cpu_to_le32(crc);
-	con->out_msg->footer.flags = 0;
+	memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
 
+	/* fill in front and middle crc, footer */
 	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
 	con->out_msg->footer.front_crc = cpu_to_le32(crc);
 	if (m->middle) {
@@ -1288,6 +1309,7 @@ static void prepare_write_message(struct ceph_connection *con)
 	dout("%s front_crc %u middle_crc %u\n", __func__,
 	     le32_to_cpu(con->out_msg->footer.front_crc),
 	     le32_to_cpu(con->out_msg->footer.middle_crc));
+	con->out_msg->footer.flags = 0;
 
 	/* is there a data payload? */
 	con->out_msg->footer.data_crc = 0;
@@ -1492,7 +1514,6 @@ static int write_partial_kvec(struct ceph_connection *con)
 		}
 	}
 	con->out_kvec_left = 0;
-	con->out_kvec_is_msg = false;
 	ret = 1;
 out:
 	dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
@@ -1584,6 +1605,7 @@ static int write_partial_skip(struct ceph_connection *con)
 {
 	int ret;
 
+	dout("%s %p %d left\n", __func__, con, con->out_skip);
 	while (con->out_skip > 0) {
 		size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
 
@@ -2506,13 +2528,13 @@ more:
 
 more_kvec:
 	/* kvec data queued? */
-	if (con->out_skip) {
-		ret = write_partial_skip(con);
+	if (con->out_kvec_left) {
+		ret = write_partial_kvec(con);
 		if (ret <= 0)
 			goto out;
 	}
-	if (con->out_kvec_left) {
-		ret = write_partial_kvec(con);
+	if (con->out_skip) {
+		ret = write_partial_skip(con);
 		if (ret <= 0)
 			goto out;
 	}
@@ -2805,13 +2827,17 @@ static bool con_backoff(struct ceph_connection *con)
 
 static void con_fault_finish(struct ceph_connection *con)
 {
+	dout("%s %p\n", __func__, con);
+
 	/*
 	 * in case we faulted due to authentication, invalidate our
 	 * current tickets so that we can get new ones.
 	 */
-	if (con->auth_retry && con->ops->invalidate_authorizer) {
-		dout("calling invalidate_authorizer()\n");
-		con->ops->invalidate_authorizer(con);
+	if (con->auth_retry) {
+		dout("auth_retry %d, invalidating\n", con->auth_retry);
+		if (con->ops->invalidate_authorizer)
+			con->ops->invalidate_authorizer(con);
+		con->auth_retry = 0;
 	}
 
 	if (con->ops->fault)
@@ -3050,16 +3076,31 @@ void ceph_msg_revoke(struct ceph_msg *msg)
 		ceph_msg_put(msg);
 	}
 	if (con->out_msg == msg) {
-		dout("%s %p msg %p - was sending\n", __func__, con, msg);
-		con->out_msg = NULL;
-		if (con->out_kvec_is_msg) {
-			con->out_skip = con->out_kvec_bytes;
-			con->out_kvec_is_msg = false;
+		BUG_ON(con->out_skip);
+		/* footer */
+		if (con->out_msg_done) {
+			con->out_skip += con_out_kvec_skip(con);
+		} else {
+			BUG_ON(!msg->data_length);
+			if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
+				con->out_skip += sizeof(msg->footer);
+			else
+				con->out_skip += sizeof(msg->old_footer);
 		}
+		/* data, middle, front */
+		if (msg->data_length)
+			con->out_skip += msg->cursor.total_resid;
+		if (msg->middle)
+			con->out_skip += con_out_kvec_skip(con);
+		con->out_skip += con_out_kvec_skip(con);
+
+		dout("%s %p msg %p - was sending, will write %d skip %d\n",
+		     __func__, con, msg, con->out_kvec_bytes, con->out_skip);
 		msg->hdr.seq = 0;
-
+		con->out_msg = NULL;
 		ceph_msg_put(msg);
 	}
+
 	mutex_unlock(&con->mutex);
 }
 
@@ -3361,9 +3402,7 @@ static void ceph_msg_free(struct ceph_msg *m)
 static void ceph_msg_release(struct kref *kref)
 {
 	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
-	LIST_HEAD(data);
-	struct list_head *links;
-	struct list_head *next;
+	struct ceph_msg_data *data, *next;
 
 	dout("%s %p\n", __func__, m);
 	WARN_ON(!list_empty(&m->list_head));
@@ -3376,12 +3415,8 @@ static void ceph_msg_release(struct kref *kref)
 		m->middle = NULL;
 	}
 
-	list_splice_init(&m->data, &data);
-	list_for_each_safe(links, next, &data) {
-		struct ceph_msg_data *data;
-
-		data = list_entry(links, struct ceph_msg_data, links);
-		list_del_init(links);
+	list_for_each_entry_safe(data, next, &m->data, links) {
+		list_del_init(&data->links);
 		ceph_msg_data_destroy(data);
 	}
 	m->data_length = 0;
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index edda01626a45..de85dddc3dc0 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -364,10 +364,6 @@ static bool have_debugfs_info(struct ceph_mon_client *monc)
 	return monc->client->have_fsid && monc->auth->global_id > 0;
 }
 
-/*
- * The monitor responds with mount ack indicate mount success.  The
- * included client ticket allows the client to talk to MDSs and OSDs.
- */
 static void ceph_monc_handle_map(struct ceph_mon_client *monc,
 				 struct ceph_msg *msg)
 {