summary refs log tree commit diff
path: root/fs/ocfs2
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2')
-rw-r--r--fs/ocfs2/alloc.c101
-rw-r--r--fs/ocfs2/alloc.h2
-rw-r--r--fs/ocfs2/aops.c50
-rw-r--r--fs/ocfs2/aops.h2
-rw-r--r--fs/ocfs2/cluster/heartbeat.c31
-rw-r--r--fs/ocfs2/cluster/nodemanager.c198
-rw-r--r--fs/ocfs2/cluster/nodemanager.h17
-rw-r--r--fs/ocfs2/cluster/quorum.c4
-rw-r--r--fs/ocfs2/cluster/tcp.c240
-rw-r--r--fs/ocfs2/cluster/tcp.h8
-rw-r--r--fs/ocfs2/cluster/tcp_internal.h23
-rw-r--r--fs/ocfs2/dir.c35
-rw-r--r--fs/ocfs2/dir.h2
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h2
-rw-r--r--fs/ocfs2/dlm/dlmdomain.c9
-rw-r--r--fs/ocfs2/dlm/dlmfs.c10
-rw-r--r--fs/ocfs2/dlm/dlmlock.c4
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c4
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c11
-rw-r--r--fs/ocfs2/dlm/userdlm.c10
-rw-r--r--fs/ocfs2/dlmglue.c149
-rw-r--r--fs/ocfs2/dlmglue.h9
-rw-r--r--fs/ocfs2/export.c2
-rw-r--r--fs/ocfs2/extent_map.c2
-rw-r--r--fs/ocfs2/file.c372
-rw-r--r--fs/ocfs2/file.h11
-rw-r--r--fs/ocfs2/heartbeat.c9
-rw-r--r--fs/ocfs2/inode.c36
-rw-r--r--fs/ocfs2/inode.h11
-rw-r--r--fs/ocfs2/ioctl.c10
-rw-r--r--fs/ocfs2/journal.c318
-rw-r--r--fs/ocfs2/journal.h85
-rw-r--r--fs/ocfs2/localalloc.c128
-rw-r--r--fs/ocfs2/localalloc.h3
-rw-r--r--fs/ocfs2/mmap.c17
-rw-r--r--fs/ocfs2/namei.c304
-rw-r--r--fs/ocfs2/namei.h2
-rw-r--r--fs/ocfs2/ocfs2.h11
-rw-r--r--fs/ocfs2/ocfs2_fs.h14
-rw-r--r--fs/ocfs2/slot_map.c2
-rw-r--r--fs/ocfs2/suballoc.c180
-rw-r--r--fs/ocfs2/suballoc.h16
-rw-r--r--fs/ocfs2/super.c140
-rw-r--r--fs/ocfs2/symlink.c4
-rw-r--r--fs/ocfs2/uptodate.c2
-rw-r--r--fs/ocfs2/vote.c7
46 files changed, 1488 insertions, 1119 deletions
diff --git a/fs/ocfs2/alloc.c b/fs/ocfs2/alloc.c
index f43bc5f18a35..f27e5378caf2 100644
--- a/fs/ocfs2/alloc.c
+++ b/fs/ocfs2/alloc.c
@@ -52,14 +52,14 @@ static int ocfs2_extent_contig(struct inode *inode,
 			       u64 blkno);
 
 static int ocfs2_create_new_meta_bhs(struct ocfs2_super *osb,
-				     struct ocfs2_journal_handle *handle,
+				     handle_t *handle,
 				     struct inode *inode,
 				     int wanted,
 				     struct ocfs2_alloc_context *meta_ac,
 				     struct buffer_head *bhs[]);
 
 static int ocfs2_add_branch(struct ocfs2_super *osb,
-			    struct ocfs2_journal_handle *handle,
+			    handle_t *handle,
 			    struct inode *inode,
 			    struct buffer_head *fe_bh,
 			    struct buffer_head *eb_bh,
@@ -67,14 +67,14 @@ static int ocfs2_add_branch(struct ocfs2_super *osb,
 			    struct ocfs2_alloc_context *meta_ac);
 
 static int ocfs2_shift_tree_depth(struct ocfs2_super *osb,
-				  struct ocfs2_journal_handle *handle,
+				  handle_t *handle,
 				  struct inode *inode,
 				  struct buffer_head *fe_bh,
 				  struct ocfs2_alloc_context *meta_ac,
 				  struct buffer_head **ret_new_eb_bh);
 
 static int ocfs2_do_insert_extent(struct ocfs2_super *osb,
-				  struct ocfs2_journal_handle *handle,
+				  handle_t *handle,
 				  struct inode *inode,
 				  struct buffer_head *fe_bh,
 				  u64 blkno,
@@ -152,7 +152,7 @@ bail:
  * l_count for you
  */
 static int ocfs2_create_new_meta_bhs(struct ocfs2_super *osb,
-				     struct ocfs2_journal_handle *handle,
+				     handle_t *handle,
 				     struct inode *inode,
 				     int wanted,
 				     struct ocfs2_alloc_context *meta_ac,
@@ -253,7 +253,7 @@ bail:
  * contain a single record with e_clusters == 0.
  */
 static int ocfs2_add_branch(struct ocfs2_super *osb,
-			    struct ocfs2_journal_handle *handle,
+			    handle_t *handle,
 			    struct inode *inode,
 			    struct buffer_head *fe_bh,
 			    struct buffer_head *eb_bh,
@@ -418,7 +418,7 @@ bail:
  * after this call.
  */
 static int ocfs2_shift_tree_depth(struct ocfs2_super *osb,
-				  struct ocfs2_journal_handle *handle,
+				  handle_t *handle,
 				  struct inode *inode,
 				  struct buffer_head *fe_bh,
 				  struct ocfs2_alloc_context *meta_ac,
@@ -520,7 +520,7 @@ bail:
  * down.
  */
 static int ocfs2_do_insert_extent(struct ocfs2_super *osb,
-				  struct ocfs2_journal_handle *handle,
+				  handle_t *handle,
 				  struct inode *inode,
 				  struct buffer_head *fe_bh,
 				  u64 start_blk,
@@ -809,7 +809,7 @@ bail:
 
 /* the caller needs to update fe->i_clusters */
 int ocfs2_insert_extent(struct ocfs2_super *osb,
-			struct ocfs2_journal_handle *handle,
+			handle_t *handle,
 			struct inode *inode,
 			struct buffer_head *fe_bh,
 			u64 start_blk,
@@ -951,7 +951,7 @@ static int ocfs2_truncate_log_can_coalesce(struct ocfs2_truncate_log *tl,
 }
 
 static int ocfs2_truncate_log_append(struct ocfs2_super *osb,
-				     struct ocfs2_journal_handle *handle,
+				     handle_t *handle,
 				     u64 start_blk,
 				     unsigned int num_clusters)
 {
@@ -1034,7 +1034,7 @@ bail:
 }
 
 static int ocfs2_replay_truncate_records(struct ocfs2_super *osb,
-					 struct ocfs2_journal_handle *handle,
+					 handle_t *handle,
 					 struct inode *data_alloc_inode,
 					 struct buffer_head *data_alloc_bh)
 {
@@ -1113,7 +1113,7 @@ static int __ocfs2_flush_truncate_log(struct ocfs2_super *osb)
 {
 	int status;
 	unsigned int num_to_flush;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle;
 	struct inode *tl_inode = osb->osb_tl_inode;
 	struct inode *data_alloc_inode = NULL;
 	struct buffer_head *tl_bh = osb->osb_tl_bh;
@@ -1130,7 +1130,7 @@ static int __ocfs2_flush_truncate_log(struct ocfs2_super *osb)
 	if (!OCFS2_IS_VALID_DINODE(di)) {
 		OCFS2_RO_ON_INVALID_DINODE(osb->sb, di);
 		status = -EIO;
-		goto bail;
+		goto out;
 	}
 
 	num_to_flush = le16_to_cpu(tl->tl_used);
@@ -1138,14 +1138,7 @@ static int __ocfs2_flush_truncate_log(struct ocfs2_super *osb)
 	     num_to_flush, (unsigned long long)OCFS2_I(tl_inode)->ip_blkno);
 	if (!num_to_flush) {
 		status = 0;
-		goto bail;
-	}
-
-	handle = ocfs2_alloc_handle(osb);
-	if (!handle) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
+		goto out;
 	}
 
 	data_alloc_inode = ocfs2_get_system_file_inode(osb,
@@ -1154,41 +1147,40 @@ static int __ocfs2_flush_truncate_log(struct ocfs2_super *osb)
 	if (!data_alloc_inode) {
 		status = -EINVAL;
 		mlog(ML_ERROR, "Could not get bitmap inode!\n");
-		goto bail;
+		goto out;
 	}
 
-	ocfs2_handle_add_inode(handle, data_alloc_inode);
-	status = ocfs2_meta_lock(data_alloc_inode, handle, &data_alloc_bh, 1);
+	mutex_lock(&data_alloc_inode->i_mutex);
+
+	status = ocfs2_meta_lock(data_alloc_inode, &data_alloc_bh, 1);
 	if (status < 0) {
 		mlog_errno(status);
-		goto bail;
+		goto out_mutex;
 	}
 
-	handle = ocfs2_start_trans(osb, handle, OCFS2_TRUNCATE_LOG_UPDATE);
+	handle = ocfs2_start_trans(osb, OCFS2_TRUNCATE_LOG_UPDATE);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
-		handle = NULL;
 		mlog_errno(status);
-		goto bail;
+		goto out_unlock;
 	}
 
 	status = ocfs2_replay_truncate_records(osb, handle, data_alloc_inode,
 					       data_alloc_bh);
-	if (status < 0) {
+	if (status < 0)
 		mlog_errno(status);
-		goto bail;
-	}
 
-bail:
-	if (handle)
-		ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 
-	if (data_alloc_inode)
-		iput(data_alloc_inode);
+out_unlock:
+	brelse(data_alloc_bh);
+	ocfs2_meta_unlock(data_alloc_inode, 1);
 
-	if (data_alloc_bh)
-		brelse(data_alloc_bh);
+out_mutex:
+	mutex_unlock(&data_alloc_inode->i_mutex);
+	iput(data_alloc_inode);
 
+out:
 	mlog_exit(status);
 	return status;
 }
@@ -1205,10 +1197,12 @@ int ocfs2_flush_truncate_log(struct ocfs2_super *osb)
 	return status;
 }
 
-static void ocfs2_truncate_log_worker(void *data)
+static void ocfs2_truncate_log_worker(struct work_struct *work)
 {
 	int status;
-	struct ocfs2_super *osb = data;
+	struct ocfs2_super *osb =
+		container_of(work, struct ocfs2_super,
+			     osb_truncate_log_wq.work);
 
 	mlog_entry_void();
 
@@ -1347,7 +1341,7 @@ int ocfs2_complete_truncate_log_recovery(struct ocfs2_super *osb,
 	int i;
 	unsigned int clusters, num_recs, start_cluster;
 	u64 start_blk;
-	struct ocfs2_journal_handle *handle;
+	handle_t *handle;
 	struct inode *tl_inode = osb->osb_tl_inode;
 	struct ocfs2_truncate_log *tl;
 
@@ -1373,8 +1367,7 @@ int ocfs2_complete_truncate_log_recovery(struct ocfs2_super *osb,
 			}
 		}
 
-		handle = ocfs2_start_trans(osb, NULL,
-					   OCFS2_TRUNCATE_LOG_UPDATE);
+		handle = ocfs2_start_trans(osb, OCFS2_TRUNCATE_LOG_UPDATE);
 		if (IS_ERR(handle)) {
 			status = PTR_ERR(handle);
 			mlog_errno(status);
@@ -1387,7 +1380,7 @@ int ocfs2_complete_truncate_log_recovery(struct ocfs2_super *osb,
 
 		status = ocfs2_truncate_log_append(osb, handle,
 						   start_blk, clusters);
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
 		if (status < 0) {
 			mlog_errno(status);
 			goto bail_up;
@@ -1441,7 +1434,8 @@ int ocfs2_truncate_log_init(struct ocfs2_super *osb)
 	/* ocfs2_truncate_log_shutdown keys on the existence of
 	 * osb->osb_tl_inode so we don't set any of the osb variables
 	 * until we're sure all is well. */
-	INIT_WORK(&osb->osb_truncate_log_wq, ocfs2_truncate_log_worker, osb);
+	INIT_DELAYED_WORK(&osb->osb_truncate_log_wq,
+			  ocfs2_truncate_log_worker);
 	osb->osb_tl_bh    = tl_bh;
 	osb->osb_tl_inode = tl_inode;
 
@@ -1543,7 +1537,7 @@ static int ocfs2_do_truncate(struct ocfs2_super *osb,
 			     struct inode *inode,
 			     struct buffer_head *fe_bh,
 			     struct buffer_head *old_last_eb_bh,
-			     struct ocfs2_journal_handle *handle,
+			     handle_t *handle,
 			     struct ocfs2_truncate_context *tc)
 {
 	int status, i, depth;
@@ -1782,7 +1776,7 @@ int ocfs2_commit_truncate(struct ocfs2_super *osb,
 	struct ocfs2_extent_block *eb;
 	struct ocfs2_extent_list *el;
 	struct buffer_head *last_eb_bh;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct inode *tl_inode = osb->osb_tl_inode;
 
 	mlog_entry_void();
@@ -1868,7 +1862,7 @@ start:
 
 	credits = ocfs2_calc_tree_trunc_credits(osb->sb, clusters_to_del,
 						fe, el);
-	handle = ocfs2_start_trans(osb, NULL, credits);
+	handle = ocfs2_start_trans(osb, credits);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -1891,7 +1885,7 @@ start:
 	mutex_unlock(&tl_inode->i_mutex);
 	tl_sem = 0;
 
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 	handle = NULL;
 
 	BUG_ON(le32_to_cpu(fe->i_clusters) < target_i_clusters);
@@ -1906,7 +1900,7 @@ bail:
 		mutex_unlock(&tl_inode->i_mutex);
 
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
 
 	if (last_eb_bh)
 		brelse(last_eb_bh);
@@ -1965,7 +1959,7 @@ int ocfs2_prepare_truncate(struct ocfs2_super *osb,
 		goto bail;
 	}
 
-	*tc = kcalloc(1, sizeof(struct ocfs2_truncate_context), GFP_KERNEL);
+	*tc = kzalloc(sizeof(struct ocfs2_truncate_context), GFP_KERNEL);
 	if (!(*tc)) {
 		status = -ENOMEM;
 		mlog_errno(status);
@@ -2011,10 +2005,7 @@ int ocfs2_prepare_truncate(struct ocfs2_super *osb,
 		mutex_lock(&ext_alloc_inode->i_mutex);
 		(*tc)->tc_ext_alloc_inode = ext_alloc_inode;
 
-		status = ocfs2_meta_lock(ext_alloc_inode,
-					 NULL,
-					 &ext_alloc_bh,
-					 1);
+		status = ocfs2_meta_lock(ext_alloc_inode, &ext_alloc_bh, 1);
 		if (status < 0) {
 			mlog_errno(status);
 			goto bail;
diff --git a/fs/ocfs2/alloc.h b/fs/ocfs2/alloc.h
index 12ba897743f4..0b82e8044325 100644
--- a/fs/ocfs2/alloc.h
+++ b/fs/ocfs2/alloc.h
@@ -28,7 +28,7 @@
 
 struct ocfs2_alloc_context;
 int ocfs2_insert_extent(struct ocfs2_super *osb,
-			struct ocfs2_journal_handle *handle,
+			handle_t *handle,
 			struct inode *inode,
 			struct buffer_head *fe_bh,
 			u64 blkno,
diff --git a/fs/ocfs2/aops.c b/fs/ocfs2/aops.c
index 3d7c082a8f58..93628b02ef5d 100644
--- a/fs/ocfs2/aops.c
+++ b/fs/ocfs2/aops.c
@@ -200,7 +200,7 @@ static int ocfs2_readpage(struct file *file, struct page *page)
 
 	mlog_entry("(0x%p, %lu)\n", file, (page ? page->index : 0));
 
-	ret = ocfs2_meta_lock_with_page(inode, NULL, NULL, 0, page);
+	ret = ocfs2_meta_lock_with_page(inode, NULL, 0, page);
 	if (ret != 0) {
 		if (ret == AOP_TRUNCATED_PAGE)
 			unlock = 0;
@@ -305,7 +305,7 @@ static int ocfs2_prepare_write(struct file *file, struct page *page,
 
 	mlog_entry("(0x%p, 0x%p, %u, %u)\n", file, page, from, to);
 
-	ret = ocfs2_meta_lock_with_page(inode, NULL, NULL, 0, page);
+	ret = ocfs2_meta_lock_with_page(inode, NULL, 0, page);
 	if (ret != 0) {
 		mlog_errno(ret);
 		goto out;
@@ -355,16 +355,16 @@ static int walk_page_buffers(	handle_t *handle,
 	return ret;
 }
 
-struct ocfs2_journal_handle *ocfs2_start_walk_page_trans(struct inode *inode,
+handle_t *ocfs2_start_walk_page_trans(struct inode *inode,
 							 struct page *page,
 							 unsigned from,
 							 unsigned to)
 {
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	int ret = 0;
 
-	handle = ocfs2_start_trans(osb, NULL, OCFS2_INODE_UPDATE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
 	if (!handle) {
 		ret = -ENOMEM;
 		mlog_errno(ret);
@@ -372,7 +372,7 @@ struct ocfs2_journal_handle *ocfs2_start_walk_page_trans(struct inode *inode,
 	}
 
 	if (ocfs2_should_order_data(inode)) {
-		ret = walk_page_buffers(handle->k_handle,
+		ret = walk_page_buffers(handle,
 					page_buffers(page),
 					from, to, NULL,
 					ocfs2_journal_dirty_data);
@@ -382,7 +382,7 @@ struct ocfs2_journal_handle *ocfs2_start_walk_page_trans(struct inode *inode,
 out:
 	if (ret) {
 		if (handle)
-			ocfs2_commit_trans(handle);
+			ocfs2_commit_trans(osb, handle);
 		handle = ERR_PTR(ret);
 	}
 	return handle;
@@ -394,7 +394,7 @@ static int ocfs2_commit_write(struct file *file, struct page *page,
 	int ret;
 	struct buffer_head *di_bh = NULL;
 	struct inode *inode = page->mapping->host;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct ocfs2_dinode *di;
 
 	mlog_entry("(0x%p, 0x%p, %u, %u)\n", file, page, from, to);
@@ -412,7 +412,7 @@ static int ocfs2_commit_write(struct file *file, struct page *page,
 	 *    stale inode allocation image (i_size, i_clusters, etc).
 	 */
 
-	ret = ocfs2_meta_lock_with_page(inode, NULL, &di_bh, 1, page);
+	ret = ocfs2_meta_lock_with_page(inode, &di_bh, 1, page);
 	if (ret != 0) {
 		mlog_errno(ret);
 		goto out;
@@ -464,7 +464,7 @@ static int ocfs2_commit_write(struct file *file, struct page *page,
 	}
 
 out_commit:
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(OCFS2_SB(inode->i_sb), handle);
 out_unlock_data:
 	ocfs2_data_unlock(inode, 1);
 out_unlock_meta:
@@ -490,7 +490,7 @@ static sector_t ocfs2_bmap(struct address_space *mapping, sector_t block)
 	 * accessed concurrently from multiple nodes.
 	 */
 	if (!INODE_JOURNAL(inode)) {
-		err = ocfs2_meta_lock(inode, NULL, NULL, 0);
+		err = ocfs2_meta_lock(inode, NULL, 0);
 		if (err) {
 			if (err != -ENOENT)
 				mlog_errno(err);
@@ -540,8 +540,7 @@ static int ocfs2_direct_IO_get_blocks(struct inode *inode, sector_t iblock,
 				     struct buffer_head *bh_result, int create)
 {
 	int ret;
-	u64 vbo_max; /* file offset, max_blocks from iblock */
-	u64 p_blkno;
+	u64 p_blkno, inode_blocks;
 	int contig_blocks;
 	unsigned char blocksize_bits = inode->i_sb->s_blocksize_bits;
 	unsigned long max_blocks = bh_result->b_size >> inode->i_blkbits;
@@ -550,12 +549,23 @@ static int ocfs2_direct_IO_get_blocks(struct inode *inode, sector_t iblock,
 	 * nicely aligned and of the right size, so there's no need
 	 * for us to check any of that. */
 
-	vbo_max = ((u64)iblock + max_blocks) << blocksize_bits;
-
 	spin_lock(&OCFS2_I(inode)->ip_lock);
-	if ((iblock + max_blocks) >
-	    ocfs2_clusters_to_blocks(inode->i_sb,
-				     OCFS2_I(inode)->ip_clusters)) {
+	inode_blocks = ocfs2_clusters_to_blocks(inode->i_sb,
+						OCFS2_I(inode)->ip_clusters);
+
+	/*
+	 * For a read which begins past the end of file, we return a hole.
+	 */
+	if (!create && (iblock >= inode_blocks)) {
+		spin_unlock(&OCFS2_I(inode)->ip_lock);
+		ret = 0;
+		goto bail;
+	}
+
+	/*
+	 * Any write past EOF is not allowed because we'd be extending.
+	 */
+	if (create && (iblock + max_blocks) > inode_blocks) {
 		spin_unlock(&OCFS2_I(inode)->ip_lock);
 		ret = -EIO;
 		goto bail;
@@ -595,7 +605,7 @@ static void ocfs2_dio_end_io(struct kiocb *iocb,
 			     ssize_t bytes,
 			     void *private)
 {
-	struct inode *inode = iocb->ki_filp->f_dentry->d_inode;
+	struct inode *inode = iocb->ki_filp->f_path.dentry->d_inode;
 
 	/* this io's submitter should not have unlocked this before we could */
 	BUG_ON(!ocfs2_iocb_is_rw_locked(iocb));
@@ -611,7 +621,7 @@ static ssize_t ocfs2_direct_IO(int rw,
 			       unsigned long nr_segs)
 {
 	struct file *file = iocb->ki_filp;
-	struct inode *inode = file->f_dentry->d_inode->i_mapping->host;
+	struct inode *inode = file->f_path.dentry->d_inode->i_mapping->host;
 	int ret;
 
 	mlog_entry_void();
diff --git a/fs/ocfs2/aops.h b/fs/ocfs2/aops.h
index e88c3f0b8fa9..f446a15eab88 100644
--- a/fs/ocfs2/aops.h
+++ b/fs/ocfs2/aops.h
@@ -25,7 +25,7 @@
 int ocfs2_prepare_write_nolock(struct inode *inode, struct page *page,
 			       unsigned from, unsigned to);
 
-struct ocfs2_journal_handle *ocfs2_start_walk_page_trans(struct inode *inode,
+handle_t *ocfs2_start_walk_page_trans(struct inode *inode,
 							 struct page *page,
 							 unsigned from,
 							 unsigned to);
diff --git a/fs/ocfs2/cluster/heartbeat.c b/fs/ocfs2/cluster/heartbeat.c
index 305cba3681fe..277ca67a2ad6 100644
--- a/fs/ocfs2/cluster/heartbeat.c
+++ b/fs/ocfs2/cluster/heartbeat.c
@@ -141,7 +141,7 @@ struct o2hb_region {
 	 * recognizes a node going up and down in one iteration */
 	u64			hr_generation;
 
-	struct work_struct	hr_write_timeout_work;
+	struct delayed_work	hr_write_timeout_work;
 	unsigned long		hr_last_timeout_start;
 
 	/* Used during o2hb_check_slot to hold a copy of the block
@@ -156,9 +156,11 @@ struct o2hb_bio_wait_ctxt {
 	int               wc_error;
 };
 
-static void o2hb_write_timeout(void *arg)
+static void o2hb_write_timeout(struct work_struct *work)
 {
-	struct o2hb_region *reg = arg;
+	struct o2hb_region *reg =
+		container_of(work, struct o2hb_region,
+			     hr_write_timeout_work.work);
 
 	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
 	     "milliseconds\n", reg->hr_dev_name,
@@ -1404,7 +1406,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
 		goto out;
 	}
 
-	INIT_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout, reg);
+	INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
 
 	/*
 	 * A node is considered live after it has beat LIVE_THRESHOLD
@@ -1445,6 +1447,15 @@ out:
 	return ret;
 }
 
+static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
+                                      char *page)
+{
+	if (!reg->hr_task)
+		return 0;
+
+	return sprintf(page, "%u\n", reg->hr_task->pid);
+}
+
 struct o2hb_region_attribute {
 	struct configfs_attribute attr;
 	ssize_t (*show)(struct o2hb_region *, char *);
@@ -1483,11 +1494,19 @@ static struct o2hb_region_attribute o2hb_region_attr_dev = {
 	.store	= o2hb_region_dev_write,
 };
 
+static struct o2hb_region_attribute o2hb_region_attr_pid = {
+       .attr   = { .ca_owner = THIS_MODULE,
+                   .ca_name = "pid",
+                   .ca_mode = S_IRUGO | S_IRUSR },
+       .show   = o2hb_region_pid_read,
+};
+
 static struct configfs_attribute *o2hb_region_attrs[] = {
 	&o2hb_region_attr_block_bytes.attr,
 	&o2hb_region_attr_start_block.attr,
 	&o2hb_region_attr_blocks.attr,
 	&o2hb_region_attr_dev.attr,
+	&o2hb_region_attr_pid.attr,
 	NULL,
 };
 
@@ -1551,7 +1570,7 @@ static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *g
 	struct o2hb_region *reg = NULL;
 	struct config_item *ret = NULL;
 
-	reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL);
+	reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
 	if (reg == NULL)
 		goto out; /* ENOMEM */
 
@@ -1677,7 +1696,7 @@ struct config_group *o2hb_alloc_hb_set(void)
 	struct o2hb_heartbeat_group *hs = NULL;
 	struct config_group *ret = NULL;
 
-	hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
+	hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
 	if (hs == NULL)
 		goto out;
 
diff --git a/fs/ocfs2/cluster/nodemanager.c b/fs/ocfs2/cluster/nodemanager.c
index d11753c50bc1..b17333a0606b 100644
--- a/fs/ocfs2/cluster/nodemanager.c
+++ b/fs/ocfs2/cluster/nodemanager.c
@@ -35,7 +35,7 @@
 /* for now we operate under the assertion that there can be only one
  * cluster active at a time.  Changing this will require trickling
  * cluster references throughout where nodes are looked up */
-static struct o2nm_cluster *o2nm_single_cluster = NULL;
+struct o2nm_cluster *o2nm_single_cluster = NULL;
 
 #define OCFS2_MAX_HB_CTL_PATH 256
 static char ocfs2_hb_ctl_path[OCFS2_MAX_HB_CTL_PATH] = "/sbin/ocfs2_hb_ctl";
@@ -97,17 +97,6 @@ const char *o2nm_get_hb_ctl_path(void)
 }
 EXPORT_SYMBOL_GPL(o2nm_get_hb_ctl_path);
 
-struct o2nm_cluster {
-	struct config_group	cl_group;
-	unsigned		cl_has_local:1;
-	u8			cl_local_node;
-	rwlock_t		cl_nodes_lock;
-	struct o2nm_node  	*cl_nodes[O2NM_MAX_NODES];
-	struct rb_root		cl_node_ip_tree;
-	/* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */
-	unsigned long	cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
-};
-
 struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
 {
 	struct o2nm_node *node = NULL;
@@ -543,6 +532,179 @@ static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
 }
 #endif
 
+struct o2nm_cluster_attribute {
+	struct configfs_attribute attr;
+	ssize_t (*show)(struct o2nm_cluster *, char *);
+	ssize_t (*store)(struct o2nm_cluster *, const char *, size_t);
+};
+
+static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count,
+                                       unsigned int *val)
+{
+	unsigned long tmp;
+	char *p = (char *)page;
+
+	tmp = simple_strtoul(p, &p, 0);
+	if (!p || (*p && (*p != '\n')))
+		return -EINVAL;
+
+	if (tmp == 0)
+		return -EINVAL;
+	if (tmp >= (u32)-1)
+		return -ERANGE;
+
+	*val = tmp;
+
+	return count;
+}
+
+static ssize_t o2nm_cluster_attr_idle_timeout_ms_read(
+	struct o2nm_cluster *cluster, char *page)
+{
+	return sprintf(page, "%u\n", cluster->cl_idle_timeout_ms);
+}
+
+static ssize_t o2nm_cluster_attr_idle_timeout_ms_write(
+	struct o2nm_cluster *cluster, const char *page, size_t count)
+{
+	ssize_t ret;
+	unsigned int val;
+
+	ret =  o2nm_cluster_attr_write(page, count, &val);
+
+	if (ret > 0) {
+		if (cluster->cl_idle_timeout_ms != val
+			&& o2net_num_connected_peers()) {
+			mlog(ML_NOTICE,
+			     "o2net: cannot change idle timeout after "
+			     "the first peer has agreed to it."
+			     "  %d connected peers\n",
+			     o2net_num_connected_peers());
+			ret = -EINVAL;
+		} else if (val <= cluster->cl_keepalive_delay_ms) {
+			mlog(ML_NOTICE, "o2net: idle timeout must be larger "
+			     "than keepalive delay\n");
+			ret = -EINVAL;
+		} else {
+			cluster->cl_idle_timeout_ms = val;
+		}
+	}
+
+	return ret;
+}
+
+static ssize_t o2nm_cluster_attr_keepalive_delay_ms_read(
+	struct o2nm_cluster *cluster, char *page)
+{
+	return sprintf(page, "%u\n", cluster->cl_keepalive_delay_ms);
+}
+
+static ssize_t o2nm_cluster_attr_keepalive_delay_ms_write(
+	struct o2nm_cluster *cluster, const char *page, size_t count)
+{
+	ssize_t ret;
+	unsigned int val;
+
+	ret =  o2nm_cluster_attr_write(page, count, &val);
+
+	if (ret > 0) {
+		if (cluster->cl_keepalive_delay_ms != val
+		    && o2net_num_connected_peers()) {
+			mlog(ML_NOTICE,
+			     "o2net: cannot change keepalive delay after"
+			     " the first peer has agreed to it."
+			     "  %d connected peers\n",
+			     o2net_num_connected_peers());
+			ret = -EINVAL;
+		} else if (val >= cluster->cl_idle_timeout_ms) {
+			mlog(ML_NOTICE, "o2net: keepalive delay must be "
+			     "smaller than idle timeout\n");
+			ret = -EINVAL;
+		} else {
+			cluster->cl_keepalive_delay_ms = val;
+		}
+	}
+
+	return ret;
+}
+
+static ssize_t o2nm_cluster_attr_reconnect_delay_ms_read(
+	struct o2nm_cluster *cluster, char *page)
+{
+	return sprintf(page, "%u\n", cluster->cl_reconnect_delay_ms);
+}
+
+static ssize_t o2nm_cluster_attr_reconnect_delay_ms_write(
+	struct o2nm_cluster *cluster, const char *page, size_t count)
+{
+	return o2nm_cluster_attr_write(page, count,
+	                               &cluster->cl_reconnect_delay_ms);
+}
+static struct o2nm_cluster_attribute o2nm_cluster_attr_idle_timeout_ms = {
+	.attr	= { .ca_owner = THIS_MODULE,
+		    .ca_name = "idle_timeout_ms",
+		    .ca_mode = S_IRUGO | S_IWUSR },
+	.show	= o2nm_cluster_attr_idle_timeout_ms_read,
+	.store	= o2nm_cluster_attr_idle_timeout_ms_write,
+};
+
+static struct o2nm_cluster_attribute o2nm_cluster_attr_keepalive_delay_ms = {
+	.attr	= { .ca_owner = THIS_MODULE,
+		    .ca_name = "keepalive_delay_ms",
+		    .ca_mode = S_IRUGO | S_IWUSR },
+	.show	= o2nm_cluster_attr_keepalive_delay_ms_read,
+	.store	= o2nm_cluster_attr_keepalive_delay_ms_write,
+};
+
+static struct o2nm_cluster_attribute o2nm_cluster_attr_reconnect_delay_ms = {
+	.attr	= { .ca_owner = THIS_MODULE,
+		    .ca_name = "reconnect_delay_ms",
+		    .ca_mode = S_IRUGO | S_IWUSR },
+	.show	= o2nm_cluster_attr_reconnect_delay_ms_read,
+	.store	= o2nm_cluster_attr_reconnect_delay_ms_write,
+};
+
+static struct configfs_attribute *o2nm_cluster_attrs[] = {
+	&o2nm_cluster_attr_idle_timeout_ms.attr,
+	&o2nm_cluster_attr_keepalive_delay_ms.attr,
+	&o2nm_cluster_attr_reconnect_delay_ms.attr,
+	NULL,
+};
+static ssize_t o2nm_cluster_show(struct config_item *item,
+                                 struct configfs_attribute *attr,
+                                 char *page)
+{
+	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
+	struct o2nm_cluster_attribute *o2nm_cluster_attr =
+		container_of(attr, struct o2nm_cluster_attribute, attr);
+	ssize_t ret = 0;
+
+	if (o2nm_cluster_attr->show)
+		ret = o2nm_cluster_attr->show(cluster, page);
+	return ret;
+}
+
+static ssize_t o2nm_cluster_store(struct config_item *item,
+                                  struct configfs_attribute *attr,
+                                  const char *page, size_t count)
+{
+	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
+	struct o2nm_cluster_attribute *o2nm_cluster_attr =
+		container_of(attr, struct o2nm_cluster_attribute, attr);
+	ssize_t ret;
+
+	if (o2nm_cluster_attr->store == NULL) {
+		ret = -EINVAL;
+		goto out;
+	}
+
+	ret = o2nm_cluster_attr->store(cluster, page, count);
+	if (ret < count)
+		goto out;
+out:
+	return ret;
+}
+
 static struct config_item *o2nm_node_group_make_item(struct config_group *group,
 						     const char *name)
 {
@@ -552,7 +714,7 @@ static struct config_item *o2nm_node_group_make_item(struct config_group *group,
 	if (strlen(name) > O2NM_MAX_NAME_LEN)
 		goto out; /* ENAMETOOLONG */
 
-	node = kcalloc(1, sizeof(struct o2nm_node), GFP_KERNEL);
+	node = kzalloc(sizeof(struct o2nm_node), GFP_KERNEL);
 	if (node == NULL)
 		goto out; /* ENOMEM */
 
@@ -624,10 +786,13 @@ static void o2nm_cluster_release(struct config_item *item)
 
 static struct configfs_item_operations o2nm_cluster_item_ops = {
 	.release	= o2nm_cluster_release,
+	.show_attribute		= o2nm_cluster_show,
+	.store_attribute	= o2nm_cluster_store,
 };
 
 static struct config_item_type o2nm_cluster_type = {
 	.ct_item_ops	= &o2nm_cluster_item_ops,
+	.ct_attrs	= o2nm_cluster_attrs,
 	.ct_owner	= THIS_MODULE,
 };
 
@@ -660,8 +825,8 @@ static struct config_group *o2nm_cluster_group_make_group(struct config_group *g
 	if (o2nm_single_cluster)
 		goto out; /* ENOSPC */
 
-	cluster = kcalloc(1, sizeof(struct o2nm_cluster), GFP_KERNEL);
-	ns = kcalloc(1, sizeof(struct o2nm_node_group), GFP_KERNEL);
+	cluster = kzalloc(sizeof(struct o2nm_cluster), GFP_KERNEL);
+	ns = kzalloc(sizeof(struct o2nm_node_group), GFP_KERNEL);
 	defs = kcalloc(3, sizeof(struct config_group *), GFP_KERNEL);
 	o2hb_group = o2hb_alloc_hb_set();
 	if (cluster == NULL || ns == NULL || o2hb_group == NULL || defs == NULL)
@@ -678,6 +843,9 @@ static struct config_group *o2nm_cluster_group_make_group(struct config_group *g
 	cluster->cl_group.default_groups[2] = NULL;
 	rwlock_init(&cluster->cl_nodes_lock);
 	cluster->cl_node_ip_tree = RB_ROOT;
+	cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT;
+	cluster->cl_idle_timeout_ms    = O2NET_IDLE_TIMEOUT_MS_DEFAULT;
+	cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT;
 
 	ret = &cluster->cl_group;
 	o2nm_single_cluster = cluster;
diff --git a/fs/ocfs2/cluster/nodemanager.h b/fs/ocfs2/cluster/nodemanager.h
index fce8033c310f..8fb23cacc2f5 100644
--- a/fs/ocfs2/cluster/nodemanager.h
+++ b/fs/ocfs2/cluster/nodemanager.h
@@ -53,6 +53,23 @@ struct o2nm_node {
 	unsigned long		nd_set_attributes;
 };
 
+struct o2nm_cluster {
+	struct config_group	cl_group;
+	unsigned		cl_has_local:1;
+	u8			cl_local_node;
+	rwlock_t		cl_nodes_lock;
+	struct o2nm_node  	*cl_nodes[O2NM_MAX_NODES];
+	struct rb_root		cl_node_ip_tree;
+	unsigned int		cl_idle_timeout_ms;
+	unsigned int		cl_keepalive_delay_ms;
+	unsigned int		cl_reconnect_delay_ms;
+
+	/* this bitmap is part of a hack for disk bitmap.. will go eventually. - zab */
+	unsigned long	cl_nodes_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
+};
+
+extern struct o2nm_cluster *o2nm_single_cluster;
+
 u8 o2nm_this_node(void);
 
 int o2nm_configured_node_map(unsigned long *map, unsigned bytes);
diff --git a/fs/ocfs2/cluster/quorum.c b/fs/ocfs2/cluster/quorum.c
index 7bba98fbfc15..4705d659fe57 100644
--- a/fs/ocfs2/cluster/quorum.c
+++ b/fs/ocfs2/cluster/quorum.c
@@ -88,7 +88,7 @@ void o2quo_disk_timeout(void)
 	o2quo_fence_self();
 }
 
-static void o2quo_make_decision(void *arg)
+static void o2quo_make_decision(struct work_struct *work)
 {
 	int quorum;
 	int lowest_hb, lowest_reachable = 0, fence = 0;
@@ -306,7 +306,7 @@ void o2quo_init(void)
 	struct o2quo_state *qs = &o2quo_state;
 
 	spin_lock_init(&qs->qs_lock);
-	INIT_WORK(&qs->qs_work, o2quo_make_decision, NULL);
+	INIT_WORK(&qs->qs_work, o2quo_make_decision);
 }
 
 void o2quo_exit(void)
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index b650efa8c8be..ae4ff4a6636b 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -140,13 +140,35 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] =
 		 [O2NET_ERR_DIED]	= -EHOSTDOWN,};
 
 /* can't quite avoid *all* internal declarations :/ */
-static void o2net_sc_connect_completed(void *arg);
-static void o2net_rx_until_empty(void *arg);
-static void o2net_shutdown_sc(void *arg);
+static void o2net_sc_connect_completed(struct work_struct *work);
+static void o2net_rx_until_empty(struct work_struct *work);
+static void o2net_shutdown_sc(struct work_struct *work);
 static void o2net_listen_data_ready(struct sock *sk, int bytes);
-static void o2net_sc_send_keep_req(void *arg);
+static void o2net_sc_send_keep_req(struct work_struct *work);
 static void o2net_idle_timer(unsigned long data);
 static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
+static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc);
+
+/*
+ * FIXME: These should use to_o2nm_cluster_from_node(), but we end up
+ * losing our parent link to the cluster during shutdown. This can be
+ * solved by adding a pre-removal callback to configfs, or passing
+ * around the cluster with the node. -jeffm
+ */
+static inline int o2net_reconnect_delay(struct o2nm_node *node)
+{
+	return o2nm_single_cluster->cl_reconnect_delay_ms;
+}
+
+static inline int o2net_keepalive_delay(struct o2nm_node *node)
+{
+	return o2nm_single_cluster->cl_keepalive_delay_ms;
+}
+
+static inline int o2net_idle_timeout(struct o2nm_node *node)
+{
+	return o2nm_single_cluster->cl_idle_timeout_ms;
+}
 
 static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
 {
@@ -271,6 +293,8 @@ static void sc_kref_release(struct kref *kref)
 {
 	struct o2net_sock_container *sc = container_of(kref,
 					struct o2net_sock_container, sc_kref);
+	BUG_ON(timer_pending(&sc->sc_idle_timeout));
+
 	sclog(sc, "releasing\n");
 
 	if (sc->sc_sock) {
@@ -300,7 +324,7 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
 	struct page *page = NULL;
 
 	page = alloc_page(GFP_NOFS);
-	sc = kcalloc(1, sizeof(*sc), GFP_NOFS);
+	sc = kzalloc(sizeof(*sc), GFP_NOFS);
 	if (sc == NULL || page == NULL)
 		goto out;
 
@@ -308,10 +332,10 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
 	o2nm_node_get(node);
 	sc->sc_node = node;
 
-	INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed, sc);
-	INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty, sc);
-	INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc, sc);
-	INIT_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req, sc);
+	INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed);
+	INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty);
+	INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc);
+	INIT_DELAYED_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req);
 
 	init_timer(&sc->sc_idle_timeout);
 	sc->sc_idle_timeout.function = o2net_idle_timer;
@@ -342,7 +366,7 @@ static void o2net_sc_queue_work(struct o2net_sock_container *sc,
 		sc_put(sc);
 }
 static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
-					struct work_struct *work,
+					struct delayed_work *work,
 					int delay)
 {
 	sc_get(sc);
@@ -350,12 +374,19 @@ static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
 		sc_put(sc);
 }
 static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
-					 struct work_struct *work)
+					 struct delayed_work *work)
 {
 	if (cancel_delayed_work(work))
 		sc_put(sc);
 }
 
+static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
+
+int o2net_num_connected_peers(void)
+{
+	return atomic_read(&o2net_connected_peers);
+}
+
 static void o2net_set_nn_state(struct o2net_node *nn,
 			       struct o2net_sock_container *sc,
 			       unsigned valid, int err)
@@ -366,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn,
 
 	assert_spin_locked(&nn->nn_lock);
 
+	if (old_sc && !sc)
+		atomic_dec(&o2net_connected_peers);
+	else if (!old_sc && sc)
+		atomic_inc(&o2net_connected_peers);
+
 	/* the node num comparison and single connect/accept path should stop
 	 * an non-null sc from being overwritten with another */
 	BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
@@ -424,9 +460,9 @@ static void o2net_set_nn_state(struct o2net_node *nn,
 		/* delay if we're withing a RECONNECT_DELAY of the
 		 * last attempt */
 		delay = (nn->nn_last_connect_attempt +
-			 msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+			 msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node)))
 			- jiffies;
-		if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS))
+		if (delay > msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node)))
 			delay = 0;
 		mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
 		queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
@@ -564,9 +600,11 @@ static void o2net_ensure_shutdown(struct o2net_node *nn,
  * ourselves as state_change couldn't get the nn_lock and call set_nn_state
  * itself.
  */
-static void o2net_shutdown_sc(void *arg)
+static void o2net_shutdown_sc(struct work_struct *work)
 {
-	struct o2net_sock_container *sc = arg;
+	struct o2net_sock_container *sc =
+		container_of(work, struct o2net_sock_container,
+			     sc_shutdown_work);
 	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
 
 	sclog(sc, "shutting down\n");
@@ -676,7 +714,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
 		goto out;
 	}
 
-       	nmh = kcalloc(1, sizeof(struct o2net_msg_handler), GFP_NOFS);
+       	nmh = kzalloc(sizeof(struct o2net_msg_handler), GFP_NOFS);
 	if (nmh == NULL) {
 		ret = -ENOMEM;
 		goto out;
@@ -1097,13 +1135,51 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
 		return -1;
 	}
 
+	/*
+	 * Ensure timeouts are consistent with other nodes, otherwise
+	 * we can end up with one node thinking that the other must be down,
+	 * but isn't. This can ultimately cause corruption.
+	 */
+	if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
+				o2net_idle_timeout(sc->sc_node)) {
+		mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
+		     "%u ms, but we use %u ms locally.  disconnecting\n",
+		     SC_NODEF_ARGS(sc),
+		     be32_to_cpu(hand->o2net_idle_timeout_ms),
+		     o2net_idle_timeout(sc->sc_node));
+		o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+		return -1;
+	}
+
+	if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
+			o2net_keepalive_delay(sc->sc_node)) {
+		mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
+		     "%u ms, but we use %u ms locally.  disconnecting\n",
+		     SC_NODEF_ARGS(sc),
+		     be32_to_cpu(hand->o2net_keepalive_delay_ms),
+		     o2net_keepalive_delay(sc->sc_node));
+		o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+		return -1;
+	}
+
+	if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
+			O2HB_MAX_WRITE_TIMEOUT_MS) {
+		mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
+		     "%u ms, but we use %u ms locally.  disconnecting\n",
+		     SC_NODEF_ARGS(sc),
+		     be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
+		     O2HB_MAX_WRITE_TIMEOUT_MS);
+		o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+		return -1;
+	}
+
 	sc->sc_handshake_ok = 1;
 
 	spin_lock(&nn->nn_lock);
 	/* set valid and queue the idle timers only if it hasn't been
 	 * shut down already */
 	if (nn->nn_sc == sc) {
-		o2net_sc_postpone_idle(sc);
+		o2net_sc_reset_idle_timer(sc);
 		o2net_set_nn_state(nn, sc, 1, 0);
 	}
 	spin_unlock(&nn->nn_lock);
@@ -1129,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
 	sclog(sc, "receiving\n");
 	do_gettimeofday(&sc->sc_tv_advance_start);
 
+	if (unlikely(sc->sc_handshake_ok == 0)) {
+		if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
+			data = page_address(sc->sc_page) + sc->sc_page_off;
+			datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
+			ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
+			if (ret > 0)
+				sc->sc_page_off += ret;
+		}
+
+		if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
+			o2net_check_handshake(sc);
+			if (unlikely(sc->sc_handshake_ok == 0))
+				ret = -EPROTO;
+		}
+		goto out;
+	}
+
 	/* do we need more header? */
 	if (sc->sc_page_off < sizeof(struct o2net_msg)) {
 		data = page_address(sc->sc_page) + sc->sc_page_off;
@@ -1136,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
 		ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
 		if (ret > 0) {
 			sc->sc_page_off += ret;
-
-			/* this working relies on the handshake being
-			 * smaller than the normal message header */
-			if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
-			    !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
-				ret = -EPROTO;
-				goto out;
-			}
-
 			/* only swab incoming here.. we can
 			 * only get here once as we cross from
 			 * being under to over */
@@ -1201,9 +1285,10 @@ out:
 /* this work func is triggerd by data ready.  it reads until it can read no
  * more.  it interprets 0, eof, as fatal.  if data_ready hits while we're doing
  * our work the work struct will be marked and we'll be called again. */
-static void o2net_rx_until_empty(void *arg)
+static void o2net_rx_until_empty(struct work_struct *work)
 {
-	struct o2net_sock_container *sc = arg;
+	struct o2net_sock_container *sc =
+		container_of(work, struct o2net_sock_container, sc_rx_work);
 	int ret;
 
 	do {
@@ -1245,26 +1330,43 @@ static int o2net_set_nodelay(struct socket *sock)
 	return ret;
 }
 
+static void o2net_initialize_handshake(void)
+{
+	o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
+		O2HB_MAX_WRITE_TIMEOUT_MS);
+	o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
+		o2net_idle_timeout(NULL));
+	o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
+		o2net_keepalive_delay(NULL));
+	o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
+		o2net_reconnect_delay(NULL));
+}
+
 /* ------------------------------------------------------------ */
 
 /* called when a connect completes and after a sock is accepted.  the
  * rx path will see the response and mark the sc valid */
-static void o2net_sc_connect_completed(void *arg)
+static void o2net_sc_connect_completed(struct work_struct *work)
 {
-	struct o2net_sock_container *sc = arg;
+	struct o2net_sock_container *sc =
+		container_of(work, struct o2net_sock_container,
+			     sc_connect_work);
 
 	mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
               (unsigned long long)O2NET_PROTOCOL_VERSION,
 	      (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
 
+	o2net_initialize_handshake();
 	o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
 	sc_put(sc);
 }
 
 /* this is called as a work_struct func. */
-static void o2net_sc_send_keep_req(void *arg)
+static void o2net_sc_send_keep_req(struct work_struct *work)
 {
-	struct o2net_sock_container *sc = arg;
+	struct o2net_sock_container *sc =
+		container_of(work, struct o2net_sock_container,
+			     sc_keepalive_work.work);
 
 	o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req));
 	sc_put(sc);
@@ -1280,8 +1382,10 @@ static void o2net_idle_timer(unsigned long data)
 
 	do_gettimeofday(&now);
 
-	printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for 10 "
-	     "seconds, shutting it down.\n", SC_NODEF_ARGS(sc));
+	printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u "
+	     "seconds, shutting it down.\n", SC_NODEF_ARGS(sc),
+		     o2net_idle_timeout(sc->sc_node) / 1000,
+		     o2net_idle_timeout(sc->sc_node) % 1000);
 	mlog(ML_NOTICE, "here are some times that might help debug the "
 	     "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv "
 	     "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n",
@@ -1299,14 +1403,21 @@ static void o2net_idle_timer(unsigned long data)
 	o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
 }
 
-static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
+static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
 {
 	o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
 	o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
-				    O2NET_KEEPALIVE_DELAY_SECS * HZ);
+		      msecs_to_jiffies(o2net_keepalive_delay(sc->sc_node)));
 	do_gettimeofday(&sc->sc_tv_timer);
 	mod_timer(&sc->sc_idle_timeout,
-		  jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ));
+	       jiffies + msecs_to_jiffies(o2net_idle_timeout(sc->sc_node)));
+}
+
+static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
+{
+	/* Only push out an existing timer */
+	if (timer_pending(&sc->sc_idle_timeout))
+		o2net_sc_reset_idle_timer(sc);
 }
 
 /* this work func is kicked whenever a path sets the nn state which doesn't
@@ -1314,14 +1425,15 @@ static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
  * having a connect attempt fail, etc. This centralizes the logic which decides
  * if a connect attempt should be made or if we should give up and all future
  * transmit attempts should fail */
-static void o2net_start_connect(void *arg)
+static void o2net_start_connect(struct work_struct *work)
 {
-	struct o2net_node *nn = arg;
+	struct o2net_node *nn =
+		container_of(work, struct o2net_node, nn_connect_work.work);
 	struct o2net_sock_container *sc = NULL;
 	struct o2nm_node *node = NULL, *mynode = NULL;
 	struct socket *sock = NULL;
 	struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
-	int ret = 0;
+	int ret = 0, stop;
 
 	/* if we're greater we initiate tx, otherwise we accept */
 	if (o2nm_this_node() <= o2net_num_from_nn(nn))
@@ -1342,10 +1454,9 @@ static void o2net_start_connect(void *arg)
 
 	spin_lock(&nn->nn_lock);
 	/* see if we already have one pending or have given up */
-	if (nn->nn_sc || nn->nn_persistent_error)
-		arg = NULL;
+	stop = (nn->nn_sc || nn->nn_persistent_error);
 	spin_unlock(&nn->nn_lock);
-	if (arg == NULL) /* *shrug*, needed some indicator */
+	if (stop)
 		goto out;
 
 	nn->nn_last_connect_attempt = jiffies;
@@ -1421,24 +1532,29 @@ out:
 	return;
 }
 
-static void o2net_connect_expired(void *arg)
+static void o2net_connect_expired(struct work_struct *work)
 {
-	struct o2net_node *nn = arg;
+	struct o2net_node *nn =
+		container_of(work, struct o2net_node, nn_connect_expired.work);
 
 	spin_lock(&nn->nn_lock);
 	if (!nn->nn_sc_valid) {
+		struct o2nm_node *node = nn->nn_sc->sc_node;
 		mlog(ML_ERROR, "no connection established with node %u after "
-		     "%u seconds, giving up and returning errors.\n",
-		     o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS);
+		     "%u.%u seconds, giving up and returning errors.\n",
+		     o2net_num_from_nn(nn),
+		     o2net_idle_timeout(node) / 1000,
+		     o2net_idle_timeout(node) % 1000);
 
 		o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
 	}
 	spin_unlock(&nn->nn_lock);
 }
 
-static void o2net_still_up(void *arg)
+static void o2net_still_up(struct work_struct *work)
 {
-	struct o2net_node *nn = arg;
+	struct o2net_node *nn =
+		container_of(work, struct o2net_node, nn_still_up.work);
 
 	o2quo_hb_still_up(o2net_num_from_nn(nn));
 }
@@ -1469,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
 
 	if (node_num != o2nm_this_node())
 		o2net_disconnect_node(node);
+
+	BUG_ON(atomic_read(&o2net_connected_peers) < 0);
 }
 
 static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1480,14 +1598,14 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
 
 	/* ensure an immediate connect attempt */
 	nn->nn_last_connect_attempt = jiffies -
-		(msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1);
+		(msecs_to_jiffies(o2net_reconnect_delay(node)) + 1);
 
 	if (node_num != o2nm_this_node()) {
 		/* heartbeat doesn't work unless a local node number is
 		 * configured and doing so brings up the o2net_wq, so we can
 		 * use it.. */
 		queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
-				   O2NET_IDLE_TIMEOUT_SECS * HZ);
+		                   msecs_to_jiffies(o2net_idle_timeout(node)));
 
 		/* believe it or not, accept and node hearbeating testing
 		 * can succeed for this node before we got here.. so
@@ -1632,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock)
 	o2net_register_callbacks(sc->sc_sock->sk, sc);
 	o2net_sc_queue_work(sc, &sc->sc_rx_work);
 
+	o2net_initialize_handshake();
 	o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
 
 out:
@@ -1644,9 +1763,9 @@ out:
 	return ret;
 }
 
-static void o2net_accept_many(void *arg)
+static void o2net_accept_many(struct work_struct *work)
 {
-	struct socket *sock = arg;
+	struct socket *sock = o2net_listen_sock;
 	while (o2net_accept_one(sock) == 0)
 		cond_resched();
 }
@@ -1700,7 +1819,7 @@ static int o2net_open_listening_sock(__be16 port)
 	write_unlock_bh(&sock->sk->sk_callback_lock);
 
 	o2net_listen_sock = sock;
-	INIT_WORK(&o2net_listen_work, o2net_accept_many, sock);
+	INIT_WORK(&o2net_listen_work, o2net_accept_many);
 
 	sock->sk->sk_reuse = 1;
 	ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
@@ -1799,9 +1918,9 @@ int o2net_init(void)
 
 	o2quo_init();
 
-	o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL);
-	o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL);
-	o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL);
+	o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL);
+	o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
+	o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
 	if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) {
 		kfree(o2net_hand);
 		kfree(o2net_keep_req);
@@ -1819,9 +1938,10 @@ int o2net_init(void)
 		struct o2net_node *nn = o2net_nn_from_num(i);
 
 		spin_lock_init(&nn->nn_lock);
-		INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn);
-		INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn);
-		INIT_WORK(&nn->nn_still_up, o2net_still_up, nn);
+		INIT_DELAYED_WORK(&nn->nn_connect_work, o2net_start_connect);
+		INIT_DELAYED_WORK(&nn->nn_connect_expired,
+				  o2net_connect_expired);
+		INIT_DELAYED_WORK(&nn->nn_still_up, o2net_still_up);
 		/* until we see hb from a node we'll return einval */
 		nn->nn_persistent_error = -ENOTCONN;
 		init_waitqueue_head(&nn->nn_sc_wq);
diff --git a/fs/ocfs2/cluster/tcp.h b/fs/ocfs2/cluster/tcp.h
index 616ff2b8434a..21a4e43df836 100644
--- a/fs/ocfs2/cluster/tcp.h
+++ b/fs/ocfs2/cluster/tcp.h
@@ -54,6 +54,13 @@ typedef int (o2net_msg_handler_func)(struct o2net_msg *msg, u32 len, void *data)
 
 #define O2NET_MAX_PAYLOAD_BYTES  (4096 - sizeof(struct o2net_msg))
 
+/* same as hb delay, we're waiting for another node to recognize our hb */
+#define O2NET_RECONNECT_DELAY_MS_DEFAULT	2000
+
+#define O2NET_KEEPALIVE_DELAY_MS_DEFAULT	5000
+#define O2NET_IDLE_TIMEOUT_MS_DEFAULT		10000
+
+
 /* TODO: figure this out.... */
 static inline int o2net_link_down(int err, struct socket *sock)
 {
@@ -101,6 +108,7 @@ void o2net_unregister_hb_callbacks(void);
 int o2net_start_listening(struct o2nm_node *node);
 void o2net_stop_listening(struct o2nm_node *node);
 void o2net_disconnect_node(struct o2nm_node *node);
+int o2net_num_connected_peers(void);
 
 int o2net_init(void);
 void o2net_exit(void);
diff --git a/fs/ocfs2/cluster/tcp_internal.h b/fs/ocfs2/cluster/tcp_internal.h
index 4b46aac7d243..b700dc9624d1 100644
--- a/fs/ocfs2/cluster/tcp_internal.h
+++ b/fs/ocfs2/cluster/tcp_internal.h
@@ -27,23 +27,20 @@
 #define O2NET_MSG_KEEP_REQ_MAGIC  ((u16)0xfa57)
 #define O2NET_MSG_KEEP_RESP_MAGIC ((u16)0xfa58)
 
-/* same as hb delay, we're waiting for another node to recognize our hb */
-#define O2NET_RECONNECT_DELAY_MS	O2HB_REGION_TIMEOUT_MS
-
 /* we're delaying our quorum decision so that heartbeat will have timed
  * out truly dead nodes by the time we come around to making decisions
  * on their number */
 #define O2NET_QUORUM_DELAY_MS	((o2hb_dead_threshold + 2) * O2HB_REGION_TIMEOUT_MS)
 
-#define O2NET_KEEPALIVE_DELAY_SECS	5
-#define O2NET_IDLE_TIMEOUT_SECS		10
-
 /* 
  * This version number represents quite a lot, unfortunately.  It not
  * only represents the raw network message protocol on the wire but also
  * locking semantics of the file system using the protocol.  It should 
  * be somewhere else, I'm sure, but right now it isn't.
  *
+ * New in version 5:
+ * 	- Network timeout checking protocol
+ *
  * New in version 4:
  * 	- Remove i_generation from lock names for better stat performance.
  *
@@ -54,10 +51,14 @@
  * 	- full 64 bit i_size in the metadata lock lvbs
  * 	- introduction of "rw" lock and pushing meta/data locking down
  */
-#define O2NET_PROTOCOL_VERSION 4ULL
+#define O2NET_PROTOCOL_VERSION 5ULL
 struct o2net_handshake {
 	__be64	protocol_version;
 	__be64	connector_id;
+	__be32  o2hb_heartbeat_timeout_ms;
+	__be32  o2net_idle_timeout_ms;
+	__be32  o2net_keepalive_delay_ms;
+	__be32  o2net_reconnect_delay_ms;
 };
 
 struct o2net_node {
@@ -86,18 +87,18 @@ struct o2net_node {
 	 * connect attempt fails and so can be self-arming.  shutdown is
 	 * careful to first mark the nn such that no connects will be attempted
 	 * before canceling delayed connect work and flushing the queue. */
-	struct work_struct		nn_connect_work;
+	struct delayed_work		nn_connect_work;
 	unsigned long			nn_last_connect_attempt;
 
 	/* this is queued as nodes come up and is canceled when a connection is
 	 * established.  this expiring gives up on the node and errors out
 	 * transmits */
-	struct work_struct		nn_connect_expired;
+	struct delayed_work		nn_connect_expired;
 
 	/* after we give up on a socket we wait a while before deciding
 	 * that it is still heartbeating and that we should do some
 	 * quorum work */
-	struct work_struct		nn_still_up;
+	struct delayed_work		nn_still_up;
 };
 
 struct o2net_sock_container {
@@ -129,7 +130,7 @@ struct o2net_sock_container {
 	struct work_struct	sc_shutdown_work;
 
 	struct timer_list	sc_idle_timeout;
-	struct work_struct	sc_keepalive_work;
+	struct delayed_work	sc_keepalive_work;
 
 	unsigned		sc_handshake_ok:1;
 
diff --git a/fs/ocfs2/dir.c b/fs/ocfs2/dir.c
index 04e01915b86e..66821e178167 100644
--- a/fs/ocfs2/dir.c
+++ b/fs/ocfs2/dir.c
@@ -79,9 +79,10 @@ int ocfs2_readdir(struct file * filp, void * dirent, filldir_t filldir)
 	struct buffer_head * bh, * tmp;
 	struct ocfs2_dir_entry * de;
 	int err;
-	struct inode *inode = filp->f_dentry->d_inode;
+	struct inode *inode = filp->f_path.dentry->d_inode;
 	struct super_block * sb = inode->i_sb;
 	unsigned int ra_sectors = 16;
+	int lock_level = 0;
 
 	mlog_entry("dirino=%llu\n",
 		   (unsigned long long)OCFS2_I(inode)->ip_blkno);
@@ -89,7 +90,15 @@ int ocfs2_readdir(struct file * filp, void * dirent, filldir_t filldir)
 	stored = 0;
 	bh = NULL;
 
-	error = ocfs2_meta_lock(inode, NULL, NULL, 0);
+	error = ocfs2_meta_lock_atime(inode, filp->f_vfsmnt, &lock_level);
+	if (lock_level && error >= 0) {
+		/* We release EX lock which used to update atime
+		 * and get PR lock again to reduce contention
+		 * on commonly accessed directories. */
+		ocfs2_meta_unlock(inode, 1);
+		lock_level = 0;
+		error = ocfs2_meta_lock(inode, NULL, 0);
+	}
 	if (error < 0) {
 		if (error != -ENOENT)
 			mlog_errno(error);
@@ -198,7 +207,7 @@ revalidate:
 
 	stored = 0;
 bail:
-	ocfs2_meta_unlock(inode, 0);
+	ocfs2_meta_unlock(inode, lock_level);
 
 bail_nolock:
 	mlog_exit(stored);
@@ -340,7 +349,7 @@ int ocfs2_empty_dir(struct inode *inode)
 
 /* returns a bh of the 1st new block in the allocation. */
 int ocfs2_do_extend_dir(struct super_block *sb,
-			struct ocfs2_journal_handle *handle,
+			handle_t *handle,
 			struct inode *dir,
 			struct buffer_head *parent_fe_bh,
 			struct ocfs2_alloc_context *data_ac,
@@ -398,7 +407,7 @@ static int ocfs2_extend_dir(struct ocfs2_super *osb,
 	struct ocfs2_dinode *fe = (struct ocfs2_dinode *) parent_fe_bh->b_data;
 	struct ocfs2_alloc_context *data_ac = NULL;
 	struct ocfs2_alloc_context *meta_ac = NULL;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct buffer_head *new_bh = NULL;
 	struct ocfs2_dir_entry * de;
 	struct super_block *sb = osb->sb;
@@ -409,13 +418,6 @@ static int ocfs2_extend_dir(struct ocfs2_super *osb,
 	mlog(0, "extending dir %llu (i_size = %lld)\n",
 	     (unsigned long long)OCFS2_I(dir)->ip_blkno, dir_i_size);
 
-	handle = ocfs2_alloc_handle(osb);
-	if (handle == NULL) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
-
 	/* dir->i_size is always block aligned. */
 	spin_lock(&OCFS2_I(dir)->ip_lock);
 	if (dir_i_size == ocfs2_clusters_to_bytes(sb, OCFS2_I(dir)->ip_clusters)) {
@@ -428,8 +430,7 @@ static int ocfs2_extend_dir(struct ocfs2_super *osb,
 		}
 
 		if (!num_free_extents) {
-			status = ocfs2_reserve_new_metadata(osb, handle,
-							    fe, &meta_ac);
+			status = ocfs2_reserve_new_metadata(osb, fe, &meta_ac);
 			if (status < 0) {
 				if (status != -ENOSPC)
 					mlog_errno(status);
@@ -437,7 +438,7 @@ static int ocfs2_extend_dir(struct ocfs2_super *osb,
 			}
 		}
 
-		status = ocfs2_reserve_clusters(osb, handle, 1, &data_ac);
+		status = ocfs2_reserve_clusters(osb, 1, &data_ac);
 		if (status < 0) {
 			if (status != -ENOSPC)
 				mlog_errno(status);
@@ -450,7 +451,7 @@ static int ocfs2_extend_dir(struct ocfs2_super *osb,
 		credits = OCFS2_SIMPLE_DIR_EXTEND_CREDITS;
 	}
 
-	handle = ocfs2_start_trans(osb, handle, credits);
+	handle = ocfs2_start_trans(osb, credits);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -496,7 +497,7 @@ static int ocfs2_extend_dir(struct ocfs2_super *osb,
 	get_bh(*new_de_bh);
 bail:
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
 
 	if (data_ac)
 		ocfs2_free_alloc_context(data_ac);
diff --git a/fs/ocfs2/dir.h b/fs/ocfs2/dir.h
index 5f614ec9649c..3f67e146864a 100644
--- a/fs/ocfs2/dir.h
+++ b/fs/ocfs2/dir.h
@@ -45,7 +45,7 @@ int ocfs2_prepare_dir_for_insert(struct ocfs2_super *osb,
 				 struct buffer_head **ret_de_bh);
 struct ocfs2_alloc_context;
 int ocfs2_do_extend_dir(struct super_block *sb,
-			struct ocfs2_journal_handle *handle,
+			handle_t *handle,
 			struct inode *dir,
 			struct buffer_head *parent_fe_bh,
 			struct ocfs2_alloc_context *data_ac,
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index fa968180b072..6b6ff76538c5 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -153,7 +153,7 @@ static inline struct hlist_head *dlm_lockres_hash(struct dlm_ctxt *dlm, unsigned
  * called functions that cannot be directly called from the
  * net message handlers for some reason, usually because
  * they need to send net messages of their own. */
-void dlm_dispatch_work(void *data);
+void dlm_dispatch_work(struct work_struct *work);
 
 struct dlm_lock_resource;
 struct dlm_work_item;
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 8d1065f8b3bd..f0b25f2dd205 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -68,7 +68,8 @@ static void **dlm_alloc_pagevec(int pages)
 			goto out_free;
 
 	mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
-	     pages, DLM_HASH_PAGES, (unsigned long)DLM_BUCKETS_PER_PAGE);
+	     pages, (unsigned long)DLM_HASH_PAGES,
+	     (unsigned long)DLM_BUCKETS_PER_PAGE);
 	return vec;
 out_free:
 	dlm_free_pagevec(vec, i);
@@ -919,7 +920,7 @@ static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
 
 	mlog_entry("%p", dlm);
 
-	ctxt = kcalloc(1, sizeof(*ctxt), GFP_KERNEL);
+	ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
 	if (!ctxt) {
 		status = -ENOMEM;
 		mlog_errno(status);
@@ -1222,7 +1223,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
 	int i;
 	struct dlm_ctxt *dlm = NULL;
 
-	dlm = kcalloc(1, sizeof(*dlm), GFP_KERNEL);
+	dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
 	if (!dlm) {
 		mlog_errno(-ENOMEM);
 		goto leave;
@@ -1296,7 +1297,7 @@ static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
 
 	spin_lock_init(&dlm->work_lock);
 	INIT_LIST_HEAD(&dlm->work_list);
-	INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work, dlm);
+	INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
 
 	kref_init(&dlm->dlm_refs);
 	dlm->dlm_state = DLM_CTXT_NEW;
diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlm/dlmfs.c
index 16b8d1ba7066..b7f0ba97a1a2 100644
--- a/fs/ocfs2/dlm/dlmfs.c
+++ b/fs/ocfs2/dlm/dlmfs.c
@@ -66,7 +66,7 @@ static struct file_operations dlmfs_file_operations;
 static struct inode_operations dlmfs_dir_inode_operations;
 static struct inode_operations dlmfs_root_inode_operations;
 static struct inode_operations dlmfs_file_inode_operations;
-static kmem_cache_t *dlmfs_inode_cache;
+static struct kmem_cache *dlmfs_inode_cache;
 
 struct workqueue_struct *user_dlm_worker;
 
@@ -176,7 +176,7 @@ static ssize_t dlmfs_file_read(struct file *filp,
 	int bytes_left;
 	ssize_t readlen;
 	char *lvb_buf;
-	struct inode *inode = filp->f_dentry->d_inode;
+	struct inode *inode = filp->f_path.dentry->d_inode;
 
 	mlog(0, "inode %lu, count = %zu, *ppos = %llu\n",
 		inode->i_ino, count, *ppos);
@@ -220,7 +220,7 @@ static ssize_t dlmfs_file_write(struct file *filp,
 	int bytes_left;
 	ssize_t writelen;
 	char *lvb_buf;
-	struct inode *inode = filp->f_dentry->d_inode;
+	struct inode *inode = filp->f_path.dentry->d_inode;
 
 	mlog(0, "inode %lu, count = %zu, *ppos = %llu\n",
 		inode->i_ino, count, *ppos);
@@ -257,7 +257,7 @@ static ssize_t dlmfs_file_write(struct file *filp,
 }
 
 static void dlmfs_init_once(void *foo,
-			    kmem_cache_t *cachep,
+			    struct kmem_cache *cachep,
 			    unsigned long flags)
 {
 	struct dlmfs_inode_private *ip =
@@ -276,7 +276,7 @@ static struct inode *dlmfs_alloc_inode(struct super_block *sb)
 {
 	struct dlmfs_inode_private *ip;
 
-	ip = kmem_cache_alloc(dlmfs_inode_cache, SLAB_NOFS);
+	ip = kmem_cache_alloc(dlmfs_inode_cache, GFP_NOFS);
 	if (!ip)
 		return NULL;
 
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 42a1b91979b5..e5ca3db197f6 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -408,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie,
 	struct dlm_lock *lock;
 	int kernel_allocated = 0;
 
-	lock = kcalloc(1, sizeof(*lock), GFP_NOFS);
+	lock = kzalloc(sizeof(*lock), GFP_NOFS);
 	if (!lock)
 		return NULL;
 
 	if (!lksb) {
 		/* zero memory only if kernel-allocated */
-		lksb = kcalloc(1, sizeof(*lksb), GFP_NOFS);
+		lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
 		if (!lksb) {
 			kfree(lock);
 			return NULL;
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index f784177b6241..0ad872055cb3 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -221,7 +221,7 @@ EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
 #endif  /*  0  */
 
 
-static kmem_cache_t *dlm_mle_cache = NULL;
+static struct kmem_cache *dlm_mle_cache = NULL;
 
 
 static void dlm_mle_release(struct kref *kref);
@@ -1939,7 +1939,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
 			       int ignore_higher, u8 request_from, u32 flags)
 {
 	struct dlm_work_item *item;
-	item = kcalloc(1, sizeof(*item), GFP_NOFS);
+	item = kzalloc(sizeof(*item), GFP_NOFS);
 	if (!item)
 		return -ENOMEM;
 
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 9d950d7cea38..367a11e9e2ed 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -153,9 +153,10 @@ static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
 }
 
 /* Worker function used during recovery. */
-void dlm_dispatch_work(void *data)
+void dlm_dispatch_work(struct work_struct *work)
 {
-	struct dlm_ctxt *dlm = (struct dlm_ctxt *)data;
+	struct dlm_ctxt *dlm =
+		container_of(work, struct dlm_ctxt, dispatched_work);
 	LIST_HEAD(tmp_list);
 	struct list_head *iter, *iter2;
 	struct dlm_work_item *item;
@@ -756,7 +757,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
 		}
 		BUG_ON(num == dead_node);
 
-		ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS);
+		ndata = kzalloc(sizeof(*ndata), GFP_NOFS);
 		if (!ndata) {
 			dlm_destroy_recovery_area(dlm, dead_node);
 			return -ENOMEM;
@@ -841,7 +842,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
 	}
 	BUG_ON(lr->dead_node != dlm->reco.dead_node);
 
-	item = kcalloc(1, sizeof(*item), GFP_NOFS);
+	item = kzalloc(sizeof(*item), GFP_NOFS);
 	if (!item) {
 		dlm_put(dlm);
 		return -ENOMEM;
@@ -1322,7 +1323,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
 
 	ret = -ENOMEM;
 	buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
-	item = kcalloc(1, sizeof(*item), GFP_NOFS);
+	item = kzalloc(sizeof(*item), GFP_NOFS);
 	if (!buf || !item)
 		goto leave;
 
diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlm/userdlm.c
index eead48bbfac6..7d2f578b267d 100644
--- a/fs/ocfs2/dlm/userdlm.c
+++ b/fs/ocfs2/dlm/userdlm.c
@@ -171,15 +171,14 @@ static inline void user_dlm_grab_inode_ref(struct user_lock_res *lockres)
 		BUG();
 }
 
-static void user_dlm_unblock_lock(void *opaque);
+static void user_dlm_unblock_lock(struct work_struct *work);
 
 static void __user_dlm_queue_lockres(struct user_lock_res *lockres)
 {
 	if (!(lockres->l_flags & USER_LOCK_QUEUED)) {
 		user_dlm_grab_inode_ref(lockres);
 
-		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock,
-			  lockres);
+		INIT_WORK(&lockres->l_work, user_dlm_unblock_lock);
 
 		queue_work(user_dlm_worker, &lockres->l_work);
 		lockres->l_flags |= USER_LOCK_QUEUED;
@@ -279,10 +278,11 @@ static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres)
 	iput(inode);
 }
 
-static void user_dlm_unblock_lock(void *opaque)
+static void user_dlm_unblock_lock(struct work_struct *work)
 {
 	int new_level, status;
-	struct user_lock_res *lockres = (struct user_lock_res *) opaque;
+	struct user_lock_res *lockres =
+		container_of(work, struct user_lock_res, l_work);
 	struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres);
 
 	mlog(0, "processing lockres %.*s\n", lockres->l_namelen,
diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c
index 8801e41afe80..e335541727f9 100644
--- a/fs/ocfs2/dlmglue.c
+++ b/fs/ocfs2/dlmglue.c
@@ -49,6 +49,7 @@
 #include "dcache.h"
 #include "dlmglue.h"
 #include "extent_map.h"
+#include "file.h"
 #include "heartbeat.h"
 #include "inode.h"
 #include "journal.h"
@@ -769,7 +770,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb,
 			     int dlm_flags)
 {
 	int ret = 0;
-	enum dlm_status status;
+	enum dlm_status status = DLM_NORMAL;
 	unsigned long flags;
 
 	mlog_entry_void();
@@ -1063,10 +1064,10 @@ static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
 	mlog_exit_void();
 }
 
-int ocfs2_create_new_lock(struct ocfs2_super *osb,
-			  struct ocfs2_lock_res *lockres,
-			  int ex,
-			  int local)
+static int ocfs2_create_new_lock(struct ocfs2_super *osb,
+				 struct ocfs2_lock_res *lockres,
+				 int ex,
+				 int local)
 {
 	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
 	unsigned long flags;
@@ -1137,6 +1138,7 @@ int ocfs2_rw_lock(struct inode *inode, int write)
 {
 	int status, level;
 	struct ocfs2_lock_res *lockres;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
 	BUG_ON(!inode);
 
@@ -1146,6 +1148,9 @@ int ocfs2_rw_lock(struct inode *inode, int write)
 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
 	     write ? "EXMODE" : "PRMODE");
 
+	if (ocfs2_mount_local(osb))
+		return 0;
+
 	lockres = &OCFS2_I(inode)->ip_rw_lockres;
 
 	level = write ? LKM_EXMODE : LKM_PRMODE;
@@ -1163,6 +1168,7 @@ void ocfs2_rw_unlock(struct inode *inode, int write)
 {
 	int level = write ? LKM_EXMODE : LKM_PRMODE;
 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
 	mlog_entry_void();
 
@@ -1170,7 +1176,8 @@ void ocfs2_rw_unlock(struct inode *inode, int write)
 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
 	     write ? "EXMODE" : "PRMODE");
 
-	ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
+	if (!ocfs2_mount_local(osb))
+		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 
 	mlog_exit_void();
 }
@@ -1181,6 +1188,7 @@ int ocfs2_data_lock_full(struct inode *inode,
 {
 	int status = 0, level;
 	struct ocfs2_lock_res *lockres;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
 	BUG_ON(!inode);
 
@@ -1200,6 +1208,9 @@ int ocfs2_data_lock_full(struct inode *inode,
 		goto out;
 	}
 
+	if (ocfs2_mount_local(osb))
+		goto out;
+
 	lockres = &OCFS2_I(inode)->ip_data_lockres;
 
 	level = write ? LKM_EXMODE : LKM_PRMODE;
@@ -1268,6 +1279,7 @@ void ocfs2_data_unlock(struct inode *inode,
 {
 	int level = write ? LKM_EXMODE : LKM_PRMODE;
 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
 	mlog_entry_void();
 
@@ -1275,7 +1287,8 @@ void ocfs2_data_unlock(struct inode *inode,
 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
 	     write ? "EXMODE" : "PRMODE");
 
-	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
+	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
+	    !ocfs2_mount_local(osb))
 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 
 	mlog_exit_void();
@@ -1466,8 +1479,9 @@ static int ocfs2_meta_lock_update(struct inode *inode,
 {
 	int status = 0;
 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
-	struct ocfs2_lock_res *lockres;
+	struct ocfs2_lock_res *lockres = NULL;
 	struct ocfs2_dinode *fe;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
 	mlog_entry_void();
 
@@ -1482,10 +1496,12 @@ static int ocfs2_meta_lock_update(struct inode *inode,
 	}
 	spin_unlock(&oi->ip_lock);
 
-	lockres = &oi->ip_meta_lockres;
+	if (!ocfs2_mount_local(osb)) {
+		lockres = &oi->ip_meta_lockres;
 
-	if (!ocfs2_should_refresh_lock_res(lockres))
-		goto bail;
+		if (!ocfs2_should_refresh_lock_res(lockres))
+			goto bail;
+	}
 
 	/* This will discard any caching information we might have had
 	 * for the inode metadata. */
@@ -1495,7 +1511,7 @@ static int ocfs2_meta_lock_update(struct inode *inode,
 	 * map (directories, bitmap files, etc) */
 	ocfs2_extent_map_trunc(inode, 0);
 
-	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
+	if (lockres && ocfs2_meta_lvb_is_trustable(inode, lockres)) {
 		mlog(0, "Trusting LVB on inode %llu\n",
 		     (unsigned long long)oi->ip_blkno);
 		ocfs2_refresh_inode_from_lvb(inode);
@@ -1542,7 +1558,8 @@ static int ocfs2_meta_lock_update(struct inode *inode,
 
 	status = 0;
 bail_refresh:
-	ocfs2_complete_lock_res_refresh(lockres, status);
+	if (lockres)
+		ocfs2_complete_lock_res_refresh(lockres, status);
 bail:
 	mlog_exit(status);
 	return status;
@@ -1579,13 +1596,12 @@ static int ocfs2_assign_bh(struct inode *inode,
  * the result of the lock will be communicated via the callback.
  */
 int ocfs2_meta_lock_full(struct inode *inode,
-			 struct ocfs2_journal_handle *handle,
 			 struct buffer_head **ret_bh,
 			 int ex,
 			 int arg_flags)
 {
 	int status, level, dlm_flags, acquired;
-	struct ocfs2_lock_res *lockres;
+	struct ocfs2_lock_res *lockres = NULL;
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 	struct buffer_head *local_bh = NULL;
 
@@ -1607,6 +1623,9 @@ int ocfs2_meta_lock_full(struct inode *inode,
 		goto bail;
 	}
 
+	if (ocfs2_mount_local(osb))
+		goto local;
+
 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
 		wait_event(osb->recovery_event,
 			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
@@ -1636,6 +1655,7 @@ int ocfs2_meta_lock_full(struct inode *inode,
 		wait_event(osb->recovery_event,
 			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
 
+local:
 	/*
 	 * We only see this flag if we're being called from
 	 * ocfs2_read_locked_inode(). It means we're locking an inode
@@ -1644,7 +1664,8 @@ int ocfs2_meta_lock_full(struct inode *inode,
 	 */
 	if (inode->i_state & I_NEW) {
 		status = 0;
-		ocfs2_complete_lock_res_refresh(lockres, 0);
+		if (lockres)
+			ocfs2_complete_lock_res_refresh(lockres, 0);
 		goto bail;
 	}
 
@@ -1668,12 +1689,6 @@ int ocfs2_meta_lock_full(struct inode *inode,
 		}
 	}
 
-	if (handle) {
-		status = ocfs2_handle_add_lock(handle, inode);
-		if (status < 0)
-			mlog_errno(status);
-	}
-
 bail:
 	if (status < 0) {
 		if (ret_bh && (*ret_bh)) {
@@ -1713,18 +1728,16 @@ bail:
  * the lock inversion simply.
  */
 int ocfs2_meta_lock_with_page(struct inode *inode,
-			      struct ocfs2_journal_handle *handle,
 			      struct buffer_head **ret_bh,
 			      int ex,
 			      struct page *page)
 {
 	int ret;
 
-	ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
-				   OCFS2_LOCK_NONBLOCK);
+	ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
 	if (ret == -EAGAIN) {
 		unlock_page(page);
-		if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
+		if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
 			ocfs2_meta_unlock(inode, ex);
 		ret = AOP_TRUNCATED_PAGE;
 	}
@@ -1732,11 +1745,50 @@ int ocfs2_meta_lock_with_page(struct inode *inode,
 	return ret;
 }
 
+int ocfs2_meta_lock_atime(struct inode *inode,
+			  struct vfsmount *vfsmnt,
+			  int *level)
+{
+	int ret;
+
+	mlog_entry_void();
+	ret = ocfs2_meta_lock(inode, NULL, 0);
+	if (ret < 0) {
+		mlog_errno(ret);
+		return ret;
+	}
+
+	/*
+	 * If we should update atime, we will get EX lock,
+	 * otherwise we just get PR lock.
+	 */
+	if (ocfs2_should_update_atime(inode, vfsmnt)) {
+		struct buffer_head *bh = NULL;
+
+		ocfs2_meta_unlock(inode, 0);
+		ret = ocfs2_meta_lock(inode, &bh, 1);
+		if (ret < 0) {
+			mlog_errno(ret);
+			return ret;
+		}
+		*level = 1;
+		if (ocfs2_should_update_atime(inode, vfsmnt))
+			ocfs2_update_inode_atime(inode, bh);
+		if (bh)
+			brelse(bh);
+	} else
+		*level = 0;
+
+	mlog_exit(ret);
+	return ret;
+}
+
 void ocfs2_meta_unlock(struct inode *inode,
 		       int ex)
 {
 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
 	mlog_entry_void();
 
@@ -1744,7 +1796,8 @@ void ocfs2_meta_unlock(struct inode *inode,
 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
 	     ex ? "EXMODE" : "PRMODE");
 
-	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
+	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
+	    !ocfs2_mount_local(osb))
 		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
 
 	mlog_exit_void();
@@ -1753,7 +1806,7 @@ void ocfs2_meta_unlock(struct inode *inode,
 int ocfs2_super_lock(struct ocfs2_super *osb,
 		     int ex)
 {
-	int status;
+	int status = 0;
 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
 	struct buffer_head *bh;
@@ -1764,6 +1817,9 @@ int ocfs2_super_lock(struct ocfs2_super *osb,
 	if (ocfs2_is_hard_readonly(osb))
 		return -EROFS;
 
+	if (ocfs2_mount_local(osb))
+		goto bail;
+
 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
 	if (status < 0) {
 		mlog_errno(status);
@@ -1802,7 +1858,8 @@ void ocfs2_super_unlock(struct ocfs2_super *osb,
 	int level = ex ? LKM_EXMODE : LKM_PRMODE;
 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
 
-	ocfs2_cluster_unlock(osb, lockres, level);
+	if (!ocfs2_mount_local(osb))
+		ocfs2_cluster_unlock(osb, lockres, level);
 }
 
 int ocfs2_rename_lock(struct ocfs2_super *osb)
@@ -1813,6 +1870,9 @@ int ocfs2_rename_lock(struct ocfs2_super *osb)
 	if (ocfs2_is_hard_readonly(osb))
 		return -EROFS;
 
+	if (ocfs2_mount_local(osb))
+		return 0;
+
 	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
 	if (status < 0)
 		mlog_errno(status);
@@ -1824,7 +1884,8 @@ void ocfs2_rename_unlock(struct ocfs2_super *osb)
 {
 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
 
-	ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
+	if (!ocfs2_mount_local(osb))
+		ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
 }
 
 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
@@ -1839,6 +1900,9 @@ int ocfs2_dentry_lock(struct dentry *dentry, int ex)
 	if (ocfs2_is_hard_readonly(osb))
 		return -EROFS;
 
+	if (ocfs2_mount_local(osb))
+		return 0;
+
 	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
 	if (ret < 0)
 		mlog_errno(ret);
@@ -1852,7 +1916,8 @@ void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
 
-	ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
+	if (!ocfs2_mount_local(osb))
+		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
 }
 
 /* Reference counting of the dlm debug structure. We want this because
@@ -2115,12 +2180,15 @@ static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
 
 int ocfs2_dlm_init(struct ocfs2_super *osb)
 {
-	int status;
+	int status = 0;
 	u32 dlm_key;
-	struct dlm_ctxt *dlm;
+	struct dlm_ctxt *dlm = NULL;
 
 	mlog_entry_void();
 
+	if (ocfs2_mount_local(osb))
+		goto local;
+
 	status = ocfs2_dlm_init_debug(osb);
 	if (status < 0) {
 		mlog_errno(status);
@@ -2148,11 +2216,12 @@ int ocfs2_dlm_init(struct ocfs2_super *osb)
 		goto bail;
 	}
 
+	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
+
+local:
 	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
 	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
 
-	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
-
 	osb->dlm = dlm;
 
 	status = 0;
@@ -2649,6 +2718,15 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
        	inode = ocfs2_lock_res_inode(lockres);
 	mapping = inode->i_mapping;
 
+	/*
+	 * We need this before the filemap_fdatawrite() so that it can
+	 * transfer the dirty bit from the PTE to the
+	 * page. Unfortunately this means that even for EX->PR
+	 * downconverts, we'll lose our mappings and have to build
+	 * them up again.
+	 */
+	unmap_mapping_range(mapping, 0, 0, 0);
+
 	if (filemap_fdatawrite(mapping)) {
 		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
 		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
@@ -2656,7 +2734,6 @@ static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
 	sync_mapping_buffers(mapping);
 	if (blocking == LKM_EXMODE) {
 		truncate_inode_pages(mapping, 0);
-		unmap_mapping_range(mapping, 0, 0, 0);
 	} else {
 		/* We only need to wait on the I/O if we're not also
 		 * truncating pages because truncate_inode_pages waits
diff --git a/fs/ocfs2/dlmglue.h b/fs/ocfs2/dlmglue.h
index 4a2769387229..c343fca68cf1 100644
--- a/fs/ocfs2/dlmglue.h
+++ b/fs/ocfs2/dlmglue.h
@@ -68,8 +68,6 @@ void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
 				u64 parent, struct inode *inode);
 void ocfs2_lock_res_free(struct ocfs2_lock_res *res);
 int ocfs2_create_new_inode_locks(struct inode *inode);
-int ocfs2_create_new_lock(struct ocfs2_super *osb,
-			  struct ocfs2_lock_res *lockres, int ex, int local);
 int ocfs2_drop_inode_locks(struct inode *inode);
 int ocfs2_data_lock_full(struct inode *inode,
 			 int write,
@@ -82,19 +80,20 @@ void ocfs2_data_unlock(struct inode *inode,
 		       int write);
 int ocfs2_rw_lock(struct inode *inode, int write);
 void ocfs2_rw_unlock(struct inode *inode, int write);
+int ocfs2_meta_lock_atime(struct inode *inode,
+			  struct vfsmount *vfsmnt,
+			  int *level);
 int ocfs2_meta_lock_full(struct inode *inode,
-			 struct ocfs2_journal_handle *handle,
 			 struct buffer_head **ret_bh,
 			 int ex,
 			 int arg_flags);
 int ocfs2_meta_lock_with_page(struct inode *inode,
-			      struct ocfs2_journal_handle *handle,
 			      struct buffer_head **ret_bh,
 			      int ex,
 			      struct page *page);
 /* 99% of the time we don't want to supply any additional flags --
  * those are for very specific cases only. */
-#define ocfs2_meta_lock(i, h, b, e) ocfs2_meta_lock_full(i, h, b, e, 0)
+#define ocfs2_meta_lock(i, b, e) ocfs2_meta_lock_full(i, b, e, 0)
 void ocfs2_meta_unlock(struct inode *inode,
 		       int ex);
 int ocfs2_super_lock(struct ocfs2_super *osb,
diff --git a/fs/ocfs2/export.c b/fs/ocfs2/export.c
index fb91089a60a7..06be6e774cf9 100644
--- a/fs/ocfs2/export.c
+++ b/fs/ocfs2/export.c
@@ -100,7 +100,7 @@ static struct dentry *ocfs2_get_parent(struct dentry *child)
 	mlog(0, "find parent of directory %llu\n",
 	     (unsigned long long)OCFS2_I(dir)->ip_blkno);
 
-	status = ocfs2_meta_lock(dir, NULL, NULL, 0);
+	status = ocfs2_meta_lock(dir, NULL, 0);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
diff --git a/fs/ocfs2/extent_map.c b/fs/ocfs2/extent_map.c
index fcd4475d1f89..80ac69f11d9f 100644
--- a/fs/ocfs2/extent_map.c
+++ b/fs/ocfs2/extent_map.c
@@ -61,7 +61,7 @@ struct ocfs2_em_insert_context {
 	struct ocfs2_extent_map_entry *right_ent;
 };
 
-static kmem_cache_t *ocfs2_em_ent_cachep = NULL;
+static struct kmem_cache *ocfs2_em_ent_cachep = NULL;
 
 
 static struct ocfs2_extent_map_entry *
diff --git a/fs/ocfs2/file.c b/fs/ocfs2/file.c
index 1be74c4e7814..10953a508f2f 100644
--- a/fs/ocfs2/file.c
+++ b/fs/ocfs2/file.c
@@ -31,6 +31,8 @@
 #include <linux/pagemap.h>
 #include <linux/uio.h>
 #include <linux/sched.h>
+#include <linux/pipe_fs_i.h>
+#include <linux/mount.h>
 
 #define MLOG_MASK_PREFIX ML_INODE
 #include <cluster/masklog.h>
@@ -66,7 +68,7 @@ static int ocfs2_file_open(struct inode *inode, struct file *file)
 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
 
 	mlog_entry("(0x%p, 0x%p, '%.*s')\n", inode, file,
-		   file->f_dentry->d_name.len, file->f_dentry->d_name.name);
+		   file->f_path.dentry->d_name.len, file->f_path.dentry->d_name.name);
 
 	spin_lock(&oi->ip_lock);
 
@@ -96,8 +98,8 @@ static int ocfs2_file_release(struct inode *inode, struct file *file)
 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
 
 	mlog_entry("(0x%p, 0x%p, '%.*s')\n", inode, file,
-		       file->f_dentry->d_name.len,
-		       file->f_dentry->d_name.name);
+		       file->f_path.dentry->d_name.len,
+		       file->f_path.dentry->d_name.name);
 
 	spin_lock(&oi->ip_lock);
 	if (!--oi->ip_open_count)
@@ -134,7 +136,77 @@ bail:
 	return (err < 0) ? -EIO : 0;
 }
 
-int ocfs2_set_inode_size(struct ocfs2_journal_handle *handle,
+int ocfs2_should_update_atime(struct inode *inode,
+			      struct vfsmount *vfsmnt)
+{
+	struct timespec now;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+
+	if (ocfs2_is_hard_readonly(osb) || ocfs2_is_soft_readonly(osb))
+		return 0;
+
+	if ((inode->i_flags & S_NOATIME) ||
+	    ((inode->i_sb->s_flags & MS_NODIRATIME) && S_ISDIR(inode->i_mode)))
+		return 0;
+
+	/*
+	 * We can be called with no vfsmnt structure - NFSD will
+	 * sometimes do this.
+	 *
+	 * Note that our action here is different than touch_atime() -
+	 * if we can't tell whether this is a noatime mount, then we
+	 * don't know whether to trust the value of s_atime_quantum.
+	 */
+	if (vfsmnt == NULL)
+		return 0;
+
+	if ((vfsmnt->mnt_flags & MNT_NOATIME) ||
+	    ((vfsmnt->mnt_flags & MNT_NODIRATIME) && S_ISDIR(inode->i_mode)))
+		return 0;
+
+	if (vfsmnt->mnt_flags & MNT_RELATIME) {
+		if ((timespec_compare(&inode->i_atime, &inode->i_mtime) <= 0) ||
+		    (timespec_compare(&inode->i_atime, &inode->i_ctime) <= 0))
+			return 1;
+
+		return 0;
+	}
+
+	now = CURRENT_TIME;
+	if ((now.tv_sec - inode->i_atime.tv_sec <= osb->s_atime_quantum))
+		return 0;
+	else
+		return 1;
+}
+
+int ocfs2_update_inode_atime(struct inode *inode,
+			     struct buffer_head *bh)
+{
+	int ret;
+	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+	handle_t *handle;
+
+	mlog_entry_void();
+
+	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
+	if (handle == NULL) {
+		ret = -ENOMEM;
+		mlog_errno(ret);
+		goto out;
+	}
+
+	inode->i_atime = CURRENT_TIME;
+	ret = ocfs2_mark_inode_dirty(handle, inode, bh);
+	if (ret < 0)
+		mlog_errno(ret);
+
+	ocfs2_commit_trans(OCFS2_SB(inode->i_sb), handle);
+out:
+	mlog_exit(ret);
+	return ret;
+}
+
+int ocfs2_set_inode_size(handle_t *handle,
 			 struct inode *inode,
 			 struct buffer_head *fe_bh,
 			 u64 new_i_size)
@@ -163,10 +235,9 @@ static int ocfs2_simple_size_update(struct inode *inode,
 {
 	int ret;
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 
-	handle = ocfs2_start_trans(osb, NULL,
-				   OCFS2_INODE_UPDATE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
 	if (handle == NULL) {
 		ret = -ENOMEM;
 		mlog_errno(ret);
@@ -178,7 +249,7 @@ static int ocfs2_simple_size_update(struct inode *inode,
 	if (ret < 0)
 		mlog_errno(ret);
 
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 out:
 	return ret;
 }
@@ -189,14 +260,14 @@ static int ocfs2_orphan_for_truncate(struct ocfs2_super *osb,
 				     u64 new_i_size)
 {
 	int status;
-	struct ocfs2_journal_handle *handle;
+	handle_t *handle;
 
 	mlog_entry_void();
 
 	/* TODO: This needs to actually orphan the inode in this
 	 * transaction. */
 
-	handle = ocfs2_start_trans(osb, NULL, OCFS2_INODE_UPDATE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		mlog_errno(status);
@@ -207,7 +278,7 @@ static int ocfs2_orphan_for_truncate(struct ocfs2_super *osb,
 	if (status < 0)
 		mlog_errno(status);
 
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 out:
 	mlog_exit(status);
 	return status;
@@ -328,7 +399,7 @@ int ocfs2_do_extend_allocation(struct ocfs2_super *osb,
 			       struct inode *inode,
 			       u32 clusters_to_add,
 			       struct buffer_head *fe_bh,
-			       struct ocfs2_journal_handle *handle,
+			       handle_t *handle,
 			       struct ocfs2_alloc_context *data_ac,
 			       struct ocfs2_alloc_context *meta_ac,
 			       enum ocfs2_alloc_restarted *reason_ret)
@@ -433,7 +504,7 @@ static int ocfs2_extend_allocation(struct inode *inode,
 	u32 prev_clusters;
 	struct buffer_head *bh = NULL;
 	struct ocfs2_dinode *fe = NULL;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct ocfs2_alloc_context *data_ac = NULL;
 	struct ocfs2_alloc_context *meta_ac = NULL;
 	enum ocfs2_alloc_restarted why;
@@ -463,13 +534,6 @@ restart_all:
 	     (unsigned long long)OCFS2_I(inode)->ip_blkno, i_size_read(inode),
 	     fe->i_clusters, clusters_to_add);
 
-	handle = ocfs2_alloc_handle(osb);
-	if (handle == NULL) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto leave;
-	}
-
 	num_free_extents = ocfs2_num_free_extents(osb,
 						  inode,
 						  fe);
@@ -480,10 +544,7 @@ restart_all:
 	}
 
 	if (!num_free_extents) {
-		status = ocfs2_reserve_new_metadata(osb,
-						    handle,
-						    fe,
-						    &meta_ac);
+		status = ocfs2_reserve_new_metadata(osb, fe, &meta_ac);
 		if (status < 0) {
 			if (status != -ENOSPC)
 				mlog_errno(status);
@@ -491,10 +552,7 @@ restart_all:
 		}
 	}
 
-	status = ocfs2_reserve_clusters(osb,
-					handle,
-					clusters_to_add,
-					&data_ac);
+	status = ocfs2_reserve_clusters(osb, clusters_to_add, &data_ac);
 	if (status < 0) {
 		if (status != -ENOSPC)
 			mlog_errno(status);
@@ -509,7 +567,7 @@ restart_all:
 	drop_alloc_sem = 1;
 
 	credits = ocfs2_calc_extend_credits(osb->sb, fe, clusters_to_add);
-	handle = ocfs2_start_trans(osb, handle, credits);
+	handle = ocfs2_start_trans(osb, credits);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -589,7 +647,7 @@ leave:
 		drop_alloc_sem = 0;
 	}
 	if (handle) {
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
 		handle = NULL;
 	}
 	if (data_ac) {
@@ -624,7 +682,7 @@ static int ocfs2_write_zero_page(struct inode *inode,
 	struct page *page;
 	unsigned long index;
 	unsigned int offset;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	int ret;
 
 	offset = (size & (PAGE_CACHE_SIZE-1)); /* Within page */
@@ -668,7 +726,7 @@ static int ocfs2_write_zero_page(struct inode *inode,
 		ret = 0;
 
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(OCFS2_SB(inode->i_sb), handle);
 out_unlock:
 	unlock_page(page);
 	page_cache_release(page);
@@ -789,7 +847,7 @@ int ocfs2_setattr(struct dentry *dentry, struct iattr *attr)
 	struct super_block *sb = inode->i_sb;
 	struct ocfs2_super *osb = OCFS2_SB(sb);
 	struct buffer_head *bh = NULL;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 
 	mlog_entry("(0x%p, '%.*s')\n", dentry,
 	           dentry->d_name.len, dentry->d_name.name);
@@ -825,7 +883,7 @@ int ocfs2_setattr(struct dentry *dentry, struct iattr *attr)
 		}
 	}
 
-	status = ocfs2_meta_lock(inode, NULL, &bh, 1);
+	status = ocfs2_meta_lock(inode, &bh, 1);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
@@ -845,7 +903,7 @@ int ocfs2_setattr(struct dentry *dentry, struct iattr *attr)
 		}
 	}
 
-	handle = ocfs2_start_trans(osb, NULL, OCFS2_INODE_UPDATE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		mlog_errno(status);
@@ -863,7 +921,7 @@ int ocfs2_setattr(struct dentry *dentry, struct iattr *attr)
 		mlog_errno(status);
 
 bail_commit:
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 bail_unlock:
 	ocfs2_meta_unlock(inode, 1);
 bail_unlock_rw:
@@ -906,19 +964,39 @@ bail:
 	return err;
 }
 
+int ocfs2_permission(struct inode *inode, int mask, struct nameidata *nd)
+{
+	int ret;
+
+	mlog_entry_void();
+
+	ret = ocfs2_meta_lock(inode, NULL, 0);
+	if (ret) {
+		mlog_errno(ret);
+		goto out;
+	}
+
+	ret = generic_permission(inode, mask, NULL);
+
+	ocfs2_meta_unlock(inode, 0);
+out:
+	mlog_exit(ret);
+	return ret;
+}
+
 static int ocfs2_write_remove_suid(struct inode *inode)
 {
 	int ret;
 	struct buffer_head *bh = NULL;
 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
-	struct ocfs2_journal_handle *handle;
+	handle_t *handle;
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 	struct ocfs2_dinode *di;
 
 	mlog_entry("(Inode %llu, mode 0%o)\n",
 		   (unsigned long long)oi->ip_blkno, inode->i_mode);
 
-	handle = ocfs2_start_trans(osb, NULL, OCFS2_INODE_UPDATE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
 	if (handle == NULL) {
 		ret = -ENOMEM;
 		mlog_errno(ret);
@@ -951,75 +1029,29 @@ static int ocfs2_write_remove_suid(struct inode *inode)
 out_bh:
 	brelse(bh);
 out_trans:
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 out:
 	mlog_exit(ret);
 	return ret;
 }
 
-static inline int ocfs2_write_should_remove_suid(struct inode *inode)
-{
-	mode_t mode = inode->i_mode;
-
-	if (!capable(CAP_FSETID)) {
-		if (unlikely(mode & S_ISUID))
-			return 1;
-
-		if (unlikely((mode & S_ISGID) && (mode & S_IXGRP)))
-			return 1;
-	}
-	return 0;
-}
-
-static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
-				    const struct iovec *iov,
-				    unsigned long nr_segs,
-				    loff_t pos)
+static int ocfs2_prepare_inode_for_write(struct dentry *dentry,
+					 loff_t *ppos,
+					 size_t count,
+					 int appending)
 {
-	int ret, rw_level = -1, meta_level = -1, have_alloc_sem = 0;
+	int ret = 0, meta_level = appending;
+	struct inode *inode = dentry->d_inode;
 	u32 clusters;
-	struct file *filp = iocb->ki_filp;
-	struct inode *inode = filp->f_dentry->d_inode;
 	loff_t newsize, saved_pos;
 
-	mlog_entry("(0x%p, %u, '%.*s')\n", filp,
-		   (unsigned int)nr_segs,
-		   filp->f_dentry->d_name.len,
-		   filp->f_dentry->d_name.name);
-
-	/* happy write of zero bytes */
-	if (iocb->ki_left == 0)
-		return 0;
-
-	if (!inode) {
-		mlog(0, "bad inode\n");
-		return -EIO;
-	}
-
-	mutex_lock(&inode->i_mutex);
-	/* to match setattr's i_mutex -> i_alloc_sem -> rw_lock ordering */
-	if (filp->f_flags & O_DIRECT) {
-		have_alloc_sem = 1;
-		down_read(&inode->i_alloc_sem);
-	}
-
-	/* concurrent O_DIRECT writes are allowed */
-	rw_level = (filp->f_flags & O_DIRECT) ? 0 : 1;
-	ret = ocfs2_rw_lock(inode, rw_level);
-	if (ret < 0) {
-		rw_level = -1;
-		mlog_errno(ret);
-		goto out;
-	}
-
 	/* 
 	 * We sample i_size under a read level meta lock to see if our write
 	 * is extending the file, if it is we back off and get a write level
 	 * meta lock.
 	 */
-	meta_level = (filp->f_flags & O_APPEND) ? 1 : 0;
 	for(;;) {
-		ret = ocfs2_meta_lock(inode, NULL, NULL, meta_level);
+		ret = ocfs2_meta_lock(inode, NULL, meta_level);
 		if (ret < 0) {
 			meta_level = -1;
 			mlog_errno(ret);
@@ -1035,7 +1067,7 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
 		 * inode. There's also the dinode i_size state which
 		 * can be lost via setattr during extending writes (we
 		 * set inode->i_size at the end of a write. */
-		if (ocfs2_write_should_remove_suid(inode)) {
+		if (should_remove_suid(dentry)) {
 			if (meta_level == 0) {
 				ocfs2_meta_unlock(inode, meta_level);
 				meta_level = 1;
@@ -1045,19 +1077,19 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
 			ret = ocfs2_write_remove_suid(inode);
 			if (ret < 0) {
 				mlog_errno(ret);
-				goto out;
+				goto out_unlock;
 			}
 		}
 
 		/* work on a copy of ppos until we're sure that we won't have
 		 * to recalculate it due to relocking. */
-		if (filp->f_flags & O_APPEND) {
+		if (appending) {
 			saved_pos = i_size_read(inode);
 			mlog(0, "O_APPEND: inode->i_size=%llu\n", saved_pos);
 		} else {
-			saved_pos = iocb->ki_pos;
+			saved_pos = *ppos;
 		}
-		newsize = iocb->ki_left + saved_pos;
+		newsize = count + saved_pos;
 
 		mlog(0, "pos=%lld newsize=%lld cursize=%lld\n",
 		     (long long) saved_pos, (long long) newsize,
@@ -1090,19 +1122,66 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
 		if (!clusters)
 			break;
 
-		ret = ocfs2_extend_file(inode, NULL, newsize, iocb->ki_left);
+		ret = ocfs2_extend_file(inode, NULL, newsize, count);
 		if (ret < 0) {
 			if (ret != -ENOSPC)
 				mlog_errno(ret);
-			goto out;
+			goto out_unlock;
 		}
 		break;
 	}
 
-	/* ok, we're done with i_size and alloc work */
-	iocb->ki_pos = saved_pos;
+	if (appending)
+		*ppos = saved_pos;
+
+out_unlock:
 	ocfs2_meta_unlock(inode, meta_level);
-	meta_level = -1;
+
+out:
+	return ret;
+}
+
+static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
+				    const struct iovec *iov,
+				    unsigned long nr_segs,
+				    loff_t pos)
+{
+	int ret, rw_level, have_alloc_sem = 0;
+	struct file *filp = iocb->ki_filp;
+	struct inode *inode = filp->f_path.dentry->d_inode;
+	int appending = filp->f_flags & O_APPEND ? 1 : 0;
+
+	mlog_entry("(0x%p, %u, '%.*s')\n", filp,
+		   (unsigned int)nr_segs,
+		   filp->f_path.dentry->d_name.len,
+		   filp->f_path.dentry->d_name.name);
+
+	/* happy write of zero bytes */
+	if (iocb->ki_left == 0)
+		return 0;
+
+	mutex_lock(&inode->i_mutex);
+	/* to match setattr's i_mutex -> i_alloc_sem -> rw_lock ordering */
+	if (filp->f_flags & O_DIRECT) {
+		have_alloc_sem = 1;
+		down_read(&inode->i_alloc_sem);
+	}
+
+	/* concurrent O_DIRECT writes are allowed */
+	rw_level = (filp->f_flags & O_DIRECT) ? 0 : 1;
+	ret = ocfs2_rw_lock(inode, rw_level);
+	if (ret < 0) {
+		rw_level = -1;
+		mlog_errno(ret);
+		goto out;
+	}
+
+	ret = ocfs2_prepare_inode_for_write(filp->f_path.dentry, &iocb->ki_pos,
+					    iocb->ki_left, appending);
+	if (ret < 0) {
+		mlog_errno(ret);
+		goto out;
+	}
 
 	/* communicate with ocfs2_dio_end_io */
 	ocfs2_iocb_set_rw_locked(iocb);
@@ -1128,8 +1207,6 @@ static ssize_t ocfs2_file_aio_write(struct kiocb *iocb,
 	}
 
 out:
-	if (meta_level != -1)
-		ocfs2_meta_unlock(inode, meta_level);
 	if (have_alloc_sem)
 		up_read(&inode->i_alloc_sem);
 	if (rw_level != -1) 
@@ -1140,19 +1217,90 @@ out:
 	return ret;
 }
 
+static ssize_t ocfs2_file_splice_write(struct pipe_inode_info *pipe,
+				       struct file *out,
+				       loff_t *ppos,
+				       size_t len,
+				       unsigned int flags)
+{
+	int ret;
+	struct inode *inode = out->f_path.dentry->d_inode;
+
+	mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", out, pipe,
+		   (unsigned int)len,
+		   out->f_path.dentry->d_name.len,
+		   out->f_path.dentry->d_name.name);
+
+	inode_double_lock(inode, pipe->inode);
+
+	ret = ocfs2_rw_lock(inode, 1);
+	if (ret < 0) {
+		mlog_errno(ret);
+		goto out;
+	}
+
+	ret = ocfs2_prepare_inode_for_write(out->f_path.dentry, ppos, len, 0);
+	if (ret < 0) {
+		mlog_errno(ret);
+		goto out_unlock;
+	}
+
+	/* ok, we're done with i_size and alloc work */
+	ret = generic_file_splice_write_nolock(pipe, out, ppos, len, flags);
+
+out_unlock:
+	ocfs2_rw_unlock(inode, 1);
+out:
+	inode_double_unlock(inode, pipe->inode);
+
+	mlog_exit(ret);
+	return ret;
+}
+
+static ssize_t ocfs2_file_splice_read(struct file *in,
+				      loff_t *ppos,
+				      struct pipe_inode_info *pipe,
+				      size_t len,
+				      unsigned int flags)
+{
+	int ret = 0;
+	struct inode *inode = in->f_path.dentry->d_inode;
+
+	mlog_entry("(0x%p, 0x%p, %u, '%.*s')\n", in, pipe,
+		   (unsigned int)len,
+		   in->f_path.dentry->d_name.len,
+		   in->f_path.dentry->d_name.name);
+
+	/*
+	 * See the comment in ocfs2_file_aio_read()
+	 */
+	ret = ocfs2_meta_lock(inode, NULL, 0);
+	if (ret < 0) {
+		mlog_errno(ret);
+		goto bail;
+	}
+	ocfs2_meta_unlock(inode, 0);
+
+	ret = generic_file_splice_read(in, ppos, pipe, len, flags);
+
+bail:
+	mlog_exit(ret);
+	return ret;
+}
+
 static ssize_t ocfs2_file_aio_read(struct kiocb *iocb,
 				   const struct iovec *iov,
 				   unsigned long nr_segs,
 				   loff_t pos)
 {
-	int ret = 0, rw_level = -1, have_alloc_sem = 0;
+	int ret = 0, rw_level = -1, have_alloc_sem = 0, lock_level = 0;
 	struct file *filp = iocb->ki_filp;
-	struct inode *inode = filp->f_dentry->d_inode;
+	struct inode *inode = filp->f_path.dentry->d_inode;
 
 	mlog_entry("(0x%p, %u, '%.*s')\n", filp,
 		   (unsigned int)nr_segs,
-		   filp->f_dentry->d_name.len,
-		   filp->f_dentry->d_name.name);
+		   filp->f_path.dentry->d_name.len,
+		   filp->f_path.dentry->d_name.name);
 
 	if (!inode) {
 		ret = -EINVAL;
@@ -1187,12 +1335,12 @@ static ssize_t ocfs2_file_aio_read(struct kiocb *iocb,
 	 * like i_size. This allows the checks down below
 	 * generic_file_aio_read() a chance of actually working. 
 	 */
-	ret = ocfs2_meta_lock(inode, NULL, NULL, 0);
+	ret = ocfs2_meta_lock_atime(inode, filp->f_vfsmnt, &lock_level);
 	if (ret < 0) {
 		mlog_errno(ret);
 		goto bail;
 	}
-	ocfs2_meta_unlock(inode, 0);
+	ocfs2_meta_unlock(inode, lock_level);
 
 	ret = generic_file_aio_read(iocb, iov, nr_segs, iocb->ki_pos);
 	if (ret == -EINVAL)
@@ -1220,11 +1368,13 @@ bail:
 struct inode_operations ocfs2_file_iops = {
 	.setattr	= ocfs2_setattr,
 	.getattr	= ocfs2_getattr,
+	.permission	= ocfs2_permission,
 };
 
 struct inode_operations ocfs2_special_file_iops = {
 	.setattr	= ocfs2_setattr,
 	.getattr	= ocfs2_getattr,
+	.permission	= ocfs2_permission,
 };
 
 const struct file_operations ocfs2_fops = {
@@ -1238,6 +1388,8 @@ const struct file_operations ocfs2_fops = {
 	.aio_read	= ocfs2_file_aio_read,
 	.aio_write	= ocfs2_file_aio_write,
 	.ioctl		= ocfs2_ioctl,
+	.splice_read	= ocfs2_file_splice_read,
+	.splice_write	= ocfs2_file_splice_write,
 };
 
 const struct file_operations ocfs2_dops = {
diff --git a/fs/ocfs2/file.h b/fs/ocfs2/file.h
index 740c9e7ca599..601a453f18a8 100644
--- a/fs/ocfs2/file.h
+++ b/fs/ocfs2/file.h
@@ -41,17 +41,24 @@ int ocfs2_do_extend_allocation(struct ocfs2_super *osb,
 			       struct inode *inode,
 			       u32 clusters_to_add,
 			       struct buffer_head *fe_bh,
-			       struct ocfs2_journal_handle *handle,
+			       handle_t *handle,
 			       struct ocfs2_alloc_context *data_ac,
 			       struct ocfs2_alloc_context *meta_ac,
 			       enum ocfs2_alloc_restarted *reason);
 int ocfs2_setattr(struct dentry *dentry, struct iattr *attr);
 int ocfs2_getattr(struct vfsmount *mnt, struct dentry *dentry,
 		  struct kstat *stat);
+int ocfs2_permission(struct inode *inode, int mask,
+		     struct nameidata *nd);
 
-int ocfs2_set_inode_size(struct ocfs2_journal_handle *handle,
+int ocfs2_set_inode_size(handle_t *handle,
 			 struct inode *inode,
 			 struct buffer_head *fe_bh,
 			 u64 new_i_size);
 
+int ocfs2_should_update_atime(struct inode *inode,
+			      struct vfsmount *vfsmnt);
+int ocfs2_update_inode_atime(struct inode *inode,
+			     struct buffer_head *bh);
+
 #endif /* OCFS2_FILE_H */
diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c
index cbfd45a97a63..8fc52d6d0ce7 100644
--- a/fs/ocfs2/heartbeat.c
+++ b/fs/ocfs2/heartbeat.c
@@ -154,6 +154,9 @@ int ocfs2_register_hb_callbacks(struct ocfs2_super *osb)
 {
 	int status;
 
+	if (ocfs2_mount_local(osb))
+		return 0;
+
 	status = o2hb_register_callback(&osb->osb_hb_down);
 	if (status < 0) {
 		mlog_errno(status);
@@ -172,6 +175,9 @@ void ocfs2_clear_hb_callbacks(struct ocfs2_super *osb)
 {
 	int status;
 
+	if (ocfs2_mount_local(osb))
+		return;
+
 	status = o2hb_unregister_callback(&osb->osb_hb_down);
 	if (status < 0)
 		mlog_errno(status);
@@ -186,6 +192,9 @@ void ocfs2_stop_heartbeat(struct ocfs2_super *osb)
 	int ret;
 	char *argv[5], *envp[3];
 
+	if (ocfs2_mount_local(osb))
+		return;
+
 	if (!osb->uuid_str) {
 		/* This can happen if we don't get far enough in mount... */
 		mlog(0, "No UUID with which to stop heartbeat!\n\n");
diff --git a/fs/ocfs2/inode.c b/fs/ocfs2/inode.c
index 16e8e74dc966..e4d91493d7d7 100644
--- a/fs/ocfs2/inode.c
+++ b/fs/ocfs2/inode.c
@@ -360,7 +360,6 @@ int ocfs2_populate_inode(struct inode *inode, struct ocfs2_dinode *fe,
 				  inode);
 
 	ocfs2_set_inode_flags(inode);
-	inode->i_flags |= S_NOATIME;
 
 	status = 0;
 bail:
@@ -424,7 +423,8 @@ static int ocfs2_read_locked_inode(struct inode *inode,
 	 * cluster lock before trusting anything anyway.
 	 */
 	can_lock = !(args->fi_flags & OCFS2_FI_FLAG_SYSFILE)
-		&& !(args->fi_flags & OCFS2_FI_FLAG_NOLOCK);
+		&& !(args->fi_flags & OCFS2_FI_FLAG_NOLOCK)
+		&& !ocfs2_mount_local(osb);
 
 	/*
 	 * To maintain backwards compatibility with older versions of
@@ -441,7 +441,7 @@ static int ocfs2_read_locked_inode(struct inode *inode,
 				  generation, inode);
 
 	if (can_lock) {
-		status = ocfs2_meta_lock(inode, NULL, NULL, 0);
+		status = ocfs2_meta_lock(inode, NULL, 0);
 		if (status) {
 			make_bad_inode(inode);
 			mlog_errno(status);
@@ -512,7 +512,7 @@ static int ocfs2_truncate_for_delete(struct ocfs2_super *osb,
 				     struct buffer_head *fe_bh)
 {
 	int status = 0;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct ocfs2_truncate_context *tc = NULL;
 	struct ocfs2_dinode *fe;
 
@@ -524,7 +524,7 @@ static int ocfs2_truncate_for_delete(struct ocfs2_super *osb,
 	if (!fe->i_clusters)
 		goto bail;
 
-	handle = ocfs2_start_trans(osb, handle, OCFS2_INODE_UPDATE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -538,7 +538,7 @@ static int ocfs2_truncate_for_delete(struct ocfs2_super *osb,
 		goto bail;
 	}
 
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 	handle = NULL;
 
 	status = ocfs2_prepare_truncate(osb, inode, fe_bh, &tc);
@@ -554,7 +554,7 @@ static int ocfs2_truncate_for_delete(struct ocfs2_super *osb,
 	}
 bail:
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
 
 	mlog_exit(status);
 	return status;
@@ -568,7 +568,7 @@ static int ocfs2_remove_inode(struct inode *inode,
 	int status;
 	struct inode *inode_alloc_inode = NULL;
 	struct buffer_head *inode_alloc_bh = NULL;
-	struct ocfs2_journal_handle *handle;
+	handle_t *handle;
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 	struct ocfs2_dinode *di = (struct ocfs2_dinode *) di_bh->b_data;
 
@@ -582,7 +582,7 @@ static int ocfs2_remove_inode(struct inode *inode,
 	}
 
 	mutex_lock(&inode_alloc_inode->i_mutex);
-	status = ocfs2_meta_lock(inode_alloc_inode, NULL, &inode_alloc_bh, 1);
+	status = ocfs2_meta_lock(inode_alloc_inode, &inode_alloc_bh, 1);
 	if (status < 0) {
 		mutex_unlock(&inode_alloc_inode->i_mutex);
 
@@ -590,7 +590,7 @@ static int ocfs2_remove_inode(struct inode *inode,
 		goto bail;
 	}
 
-	handle = ocfs2_start_trans(osb, NULL, OCFS2_DELETE_INODE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_DELETE_INODE_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		mlog_errno(status);
@@ -629,7 +629,7 @@ static int ocfs2_remove_inode(struct inode *inode,
 		mlog_errno(status);
 
 bail_commit:
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 bail_unlock:
 	ocfs2_meta_unlock(inode_alloc_inode, 1);
 	mutex_unlock(&inode_alloc_inode->i_mutex);
@@ -705,7 +705,7 @@ static int ocfs2_wipe_inode(struct inode *inode,
 	 * delete_inode operation. We do this now to avoid races with
 	 * recovery completion on other nodes. */
 	mutex_lock(&orphan_dir_inode->i_mutex);
-	status = ocfs2_meta_lock(orphan_dir_inode, NULL, &orphan_dir_bh, 1);
+	status = ocfs2_meta_lock(orphan_dir_inode, &orphan_dir_bh, 1);
 	if (status < 0) {
 		mutex_unlock(&orphan_dir_inode->i_mutex);
 
@@ -933,7 +933,7 @@ void ocfs2_delete_inode(struct inode *inode)
 	 * allocation lock here as it won't be needed - nobody will
 	 * have the file open.
 	 */
-	status = ocfs2_meta_lock(inode, NULL, &di_bh, 1);
+	status = ocfs2_meta_lock(inode, &di_bh, 1);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
@@ -1067,12 +1067,6 @@ void ocfs2_clear_inode(struct inode *inode)
 	mlog_bug_on_msg(oi->ip_open_count,
 			"Clear inode of %llu has open count %d\n",
 			(unsigned long long)oi->ip_blkno, oi->ip_open_count);
-	mlog_bug_on_msg(!list_empty(&oi->ip_handle_list),
-			"Clear inode of %llu has non empty handle list\n",
-			(unsigned long long)oi->ip_blkno);
-	mlog_bug_on_msg(oi->ip_handle,
-			"Clear inode of %llu has non empty handle pointer\n",
-			(unsigned long long)oi->ip_blkno);
 
 	/* Clear all other flags. */
 	oi->ip_flags = OCFS2_INODE_CACHE_INLINE;
@@ -1186,7 +1180,7 @@ int ocfs2_inode_revalidate(struct dentry *dentry)
 
 	/* Let ocfs2_meta_lock do the work of updating our struct
 	 * inode for us. */
-	status = ocfs2_meta_lock(inode, NULL, NULL, 0);
+	status = ocfs2_meta_lock(inode, NULL, 0);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
@@ -1204,7 +1198,7 @@ bail:
  * struct inode.
  * Only takes ip_lock.
  */
-int ocfs2_mark_inode_dirty(struct ocfs2_journal_handle *handle,
+int ocfs2_mark_inode_dirty(handle_t *handle,
 			   struct inode *inode,
 			   struct buffer_head *bh)
 {
diff --git a/fs/ocfs2/inode.h b/fs/ocfs2/inode.h
index 9957810fdf85..1a7dd2945b34 100644
--- a/fs/ocfs2/inode.h
+++ b/fs/ocfs2/inode.h
@@ -48,13 +48,6 @@ struct ocfs2_inode_info
 
 	struct mutex			ip_io_mutex;
 
-	/* Used by the journalling code to attach an inode to a
-	 * handle.  These are protected by ip_io_mutex in order to lock
-	 * out other I/O to the inode until we either commit or
-	 * abort. */
-	struct list_head		ip_handle_list;
-	struct ocfs2_journal_handle	*ip_handle;
-
 	u32				ip_flags; /* see below */
 	u32				ip_attr; /* inode attributes */
 
@@ -113,7 +106,7 @@ static inline struct ocfs2_inode_info *OCFS2_I(struct inode *inode)
 #define INODE_JOURNAL(i) (OCFS2_I(i)->ip_flags & OCFS2_INODE_JOURNAL)
 #define SET_INODE_JOURNAL(i) (OCFS2_I(i)->ip_flags |= OCFS2_INODE_JOURNAL)
 
-extern kmem_cache_t *ocfs2_inode_cache;
+extern struct kmem_cache *ocfs2_inode_cache;
 
 extern const struct address_space_operations ocfs2_aops;
 
@@ -143,7 +136,7 @@ ssize_t ocfs2_rw_direct(int rw, struct file *filp, char *buf,
 void ocfs2_sync_blockdev(struct super_block *sb);
 void ocfs2_refresh_inode(struct inode *inode,
 			 struct ocfs2_dinode *fe);
-int ocfs2_mark_inode_dirty(struct ocfs2_journal_handle *handle,
+int ocfs2_mark_inode_dirty(handle_t *handle,
 			   struct inode *inode,
 			   struct buffer_head *bh);
 int ocfs2_aio_read(struct file *file, struct kiocb *req, struct iocb *iocb);
diff --git a/fs/ocfs2/ioctl.c b/fs/ocfs2/ioctl.c
index 3663cef80689..4768be5f3086 100644
--- a/fs/ocfs2/ioctl.c
+++ b/fs/ocfs2/ioctl.c
@@ -26,7 +26,7 @@ static int ocfs2_get_inode_attr(struct inode *inode, unsigned *flags)
 {
 	int status;
 
-	status = ocfs2_meta_lock(inode, NULL, NULL, 0);
+	status = ocfs2_meta_lock(inode, NULL, 0);
 	if (status < 0) {
 		mlog_errno(status);
 		return status;
@@ -43,14 +43,14 @@ static int ocfs2_set_inode_attr(struct inode *inode, unsigned flags,
 {
 	struct ocfs2_inode_info *ocfs2_inode = OCFS2_I(inode);
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct buffer_head *bh = NULL;
 	unsigned oldflags;
 	int status;
 
 	mutex_lock(&inode->i_mutex);
 
-	status = ocfs2_meta_lock(inode, NULL, &bh, 1);
+	status = ocfs2_meta_lock(inode, &bh, 1);
 	if (status < 0) {
 		mlog_errno(status);
 		goto bail;
@@ -67,7 +67,7 @@ static int ocfs2_set_inode_attr(struct inode *inode, unsigned flags,
 	if (!S_ISDIR(inode->i_mode))
 		flags &= ~OCFS2_DIRSYNC_FL;
 
-	handle = ocfs2_start_trans(osb, NULL, OCFS2_INODE_UPDATE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_INODE_UPDATE_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		mlog_errno(status);
@@ -96,7 +96,7 @@ static int ocfs2_set_inode_attr(struct inode *inode, unsigned flags,
 	if (status < 0)
 		mlog_errno(status);
 
-	ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
 bail_unlock:
 	ocfs2_meta_unlock(inode, 1);
 bail:
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index fd9734def551..825cb0ae1b4c 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -57,9 +57,6 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
 static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
-static void ocfs2_handle_cleanup_locks(struct ocfs2_journal *journal,
-				       struct ocfs2_journal_handle *handle);
-static void ocfs2_commit_unstarted_handle(struct ocfs2_journal_handle *handle);
 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
 				      int dirty);
 static int ocfs2_trylock_journal(struct ocfs2_super *osb,
@@ -113,46 +110,18 @@ finally:
 	return status;
 }
 
-struct ocfs2_journal_handle *ocfs2_alloc_handle(struct ocfs2_super *osb)
-{
-	struct ocfs2_journal_handle *retval = NULL;
-
-	retval = kcalloc(1, sizeof(*retval), GFP_NOFS);
-	if (!retval) {
-		mlog(ML_ERROR, "Failed to allocate memory for journal "
-		     "handle!\n");
-		return NULL;
-	}
-
-	retval->max_buffs = 0;
-	retval->num_locks = 0;
-	retval->k_handle = NULL;
-
-	INIT_LIST_HEAD(&retval->locks);
-	INIT_LIST_HEAD(&retval->inode_list);
-	retval->journal = osb->journal;
-
-	return retval;
-}
-
 /* pass it NULL and it will allocate a new handle object for you.  If
  * you pass it a handle however, it may still return error, in which
  * case it has free'd the passed handle for you. */
-struct ocfs2_journal_handle *ocfs2_start_trans(struct ocfs2_super *osb,
-					       struct ocfs2_journal_handle *handle,
-					       int max_buffs)
+handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
 {
-	int ret;
 	journal_t *journal = osb->journal->j_journal;
-
-	mlog_entry("(max_buffs = %d)\n", max_buffs);
+	handle_t *handle;
 
 	BUG_ON(!osb || !osb->journal->j_journal);
 
-	if (ocfs2_is_hard_readonly(osb)) {
-		ret = -EROFS;
-		goto done_free;
-	}
+	if (ocfs2_is_hard_readonly(osb))
+		return ERR_PTR(-EROFS);
 
 	BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE);
 	BUG_ON(max_buffs <= 0);
@@ -163,154 +132,41 @@ struct ocfs2_journal_handle *ocfs2_start_trans(struct ocfs2_super *osb,
 		BUG();
 	}
 
-	if (!handle)
-		handle = ocfs2_alloc_handle(osb);
-	if (!handle) {
-		ret = -ENOMEM;
-		mlog(ML_ERROR, "Failed to allocate memory for journal "
-		     "handle!\n");
-		goto done_free;
-	}
-
-	handle->max_buffs = max_buffs;
-
 	down_read(&osb->journal->j_trans_barrier);
 
-	/* actually start the transaction now */
-	handle->k_handle = journal_start(journal, max_buffs);
-	if (IS_ERR(handle->k_handle)) {
+	handle = journal_start(journal, max_buffs);
+	if (IS_ERR(handle)) {
 		up_read(&osb->journal->j_trans_barrier);
 
-		ret = PTR_ERR(handle->k_handle);
-		handle->k_handle = NULL;
-		mlog_errno(ret);
+		mlog_errno(PTR_ERR(handle));
 
 		if (is_journal_aborted(journal)) {
 			ocfs2_abort(osb->sb, "Detected aborted journal");
-			ret = -EROFS;
+			handle = ERR_PTR(-EROFS);
 		}
-		goto done_free;
+	} else {
+		if (!ocfs2_mount_local(osb))
+			atomic_inc(&(osb->journal->j_num_trans));
 	}
 
-	atomic_inc(&(osb->journal->j_num_trans));
-	handle->flags |= OCFS2_HANDLE_STARTED;
-
-	mlog_exit_ptr(handle);
 	return handle;
-
-done_free:
-	if (handle)
-		ocfs2_commit_unstarted_handle(handle); /* will kfree handle */
-
-	mlog_exit(ret);
-	return ERR_PTR(ret);
-}
-
-void ocfs2_handle_add_inode(struct ocfs2_journal_handle *handle,
-			    struct inode *inode)
-{
-	BUG_ON(!handle);
-	BUG_ON(!inode);
-
-	atomic_inc(&inode->i_count);
-
-	/* we're obviously changing it... */
-	mutex_lock(&inode->i_mutex);
-
-	/* sanity check */
-	BUG_ON(OCFS2_I(inode)->ip_handle);
-	BUG_ON(!list_empty(&OCFS2_I(inode)->ip_handle_list));
-
-	OCFS2_I(inode)->ip_handle = handle;
-	list_move_tail(&(OCFS2_I(inode)->ip_handle_list), &(handle->inode_list));
-}
-
-static void ocfs2_handle_unlock_inodes(struct ocfs2_journal_handle *handle)
-{
-	struct list_head *p, *n;
-	struct inode *inode;
-	struct ocfs2_inode_info *oi;
-
-	list_for_each_safe(p, n, &handle->inode_list) {
-		oi = list_entry(p, struct ocfs2_inode_info,
-				ip_handle_list);
-		inode = &oi->vfs_inode;
-
-		OCFS2_I(inode)->ip_handle = NULL;
-		list_del_init(&OCFS2_I(inode)->ip_handle_list);
-
-		mutex_unlock(&inode->i_mutex);
-		iput(inode);
-	}
 }
 
-/* This is trivial so we do it out of the main commit
- * paths. Beware, it can be called from start_trans too! */
-static void ocfs2_commit_unstarted_handle(struct ocfs2_journal_handle *handle)
+int ocfs2_commit_trans(struct ocfs2_super *osb,
+		       handle_t *handle)
 {
-	mlog_entry_void();
-
-	BUG_ON(handle->flags & OCFS2_HANDLE_STARTED);
-
-	ocfs2_handle_unlock_inodes(handle);
-	/* You are allowed to add journal locks before the transaction
-	 * has started. */
-	ocfs2_handle_cleanup_locks(handle->journal, handle);
-
-	kfree(handle);
-
-	mlog_exit_void();
-}
-
-void ocfs2_commit_trans(struct ocfs2_journal_handle *handle)
-{
-	handle_t *jbd_handle;
-	int retval;
-	struct ocfs2_journal *journal = handle->journal;
-
-	mlog_entry_void();
+	int ret;
+	struct ocfs2_journal *journal = osb->journal;
 
 	BUG_ON(!handle);
 
-	if (!(handle->flags & OCFS2_HANDLE_STARTED)) {
-		ocfs2_commit_unstarted_handle(handle);
-		mlog_exit_void();
-		return;
-	}
-
-	/* release inode semaphores we took during this transaction */
-	ocfs2_handle_unlock_inodes(handle);
-
-	/* ocfs2_extend_trans may have had to call journal_restart
-	 * which will always commit the transaction, but may return
-	 * error for any number of reasons. If this is the case, we
-	 * clear k_handle as it's not valid any more. */
-	if (handle->k_handle) {
-		jbd_handle = handle->k_handle;
-
-		if (handle->flags & OCFS2_HANDLE_SYNC)
-			jbd_handle->h_sync = 1;
-		else
-			jbd_handle->h_sync = 0;
-
-		/* actually stop the transaction. if we've set h_sync,
-		 * it'll have been committed when we return */
-		retval = journal_stop(jbd_handle);
-		if (retval < 0) {
-			mlog_errno(retval);
-			mlog(ML_ERROR, "Could not commit transaction\n");
-			BUG();
-		}
-
-		handle->k_handle = NULL; /* it's been free'd in journal_stop */
-	}
-
-	ocfs2_handle_cleanup_locks(journal, handle);
+	ret = journal_stop(handle);
+	if (ret < 0)
+		mlog_errno(ret);
 
 	up_read(&journal->j_trans_barrier);
 
-	kfree(handle);
-	mlog_exit_void();
+	return ret;
 }
 
 /*
@@ -326,20 +182,18 @@ void ocfs2_commit_trans(struct ocfs2_journal_handle *handle)
  * good because transaction ids haven't yet been recorded on the
  * cluster locks associated with this handle.
  */
-int ocfs2_extend_trans(struct ocfs2_journal_handle *handle,
-		       int nblocks)
+int ocfs2_extend_trans(handle_t *handle, int nblocks)
 {
 	int status;
 
 	BUG_ON(!handle);
-	BUG_ON(!(handle->flags & OCFS2_HANDLE_STARTED));
 	BUG_ON(!nblocks);
 
 	mlog_entry_void();
 
 	mlog(0, "Trying to extend transaction by %d blocks\n", nblocks);
 
-	status = journal_extend(handle->k_handle, nblocks);
+	status = journal_extend(handle, nblocks);
 	if (status < 0) {
 		mlog_errno(status);
 		goto bail;
@@ -347,15 +201,12 @@ int ocfs2_extend_trans(struct ocfs2_journal_handle *handle,
 
 	if (status > 0) {
 		mlog(0, "journal_extend failed, trying journal_restart\n");
-		status = journal_restart(handle->k_handle, nblocks);
+		status = journal_restart(handle, nblocks);
 		if (status < 0) {
-			handle->k_handle = NULL;
 			mlog_errno(status);
 			goto bail;
 		}
-		handle->max_buffs = nblocks;
-	} else
-		handle->max_buffs += nblocks;
+	}
 
 	status = 0;
 bail:
@@ -364,7 +215,7 @@ bail:
 	return status;
 }
 
-int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
+int ocfs2_journal_access(handle_t *handle,
 			 struct inode *inode,
 			 struct buffer_head *bh,
 			 int type)
@@ -374,7 +225,6 @@ int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
 	BUG_ON(!inode);
 	BUG_ON(!handle);
 	BUG_ON(!bh);
-	BUG_ON(!(handle->flags & OCFS2_HANDLE_STARTED));
 
 	mlog_entry("bh->b_blocknr=%llu, type=%d (\"%s\"), bh->b_size = %zu\n",
 		   (unsigned long long)bh->b_blocknr, type,
@@ -403,11 +253,11 @@ int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
 	switch (type) {
 	case OCFS2_JOURNAL_ACCESS_CREATE:
 	case OCFS2_JOURNAL_ACCESS_WRITE:
-		status = journal_get_write_access(handle->k_handle, bh);
+		status = journal_get_write_access(handle, bh);
 		break;
 
 	case OCFS2_JOURNAL_ACCESS_UNDO:
-		status = journal_get_undo_access(handle->k_handle, bh);
+		status = journal_get_undo_access(handle, bh);
 		break;
 
 	default:
@@ -424,17 +274,15 @@ int ocfs2_journal_access(struct ocfs2_journal_handle *handle,
 	return status;
 }
 
-int ocfs2_journal_dirty(struct ocfs2_journal_handle *handle,
+int ocfs2_journal_dirty(handle_t *handle,
 			struct buffer_head *bh)
 {
 	int status;
 
-	BUG_ON(!(handle->flags & OCFS2_HANDLE_STARTED));
-
 	mlog_entry("(bh->b_blocknr=%llu)\n",
 		   (unsigned long long)bh->b_blocknr);
 
-	status = journal_dirty_metadata(handle->k_handle, bh);
+	status = journal_dirty_metadata(handle, bh);
 	if (status < 0)
 		mlog(ML_ERROR, "Could not dirty metadata buffer. "
 		     "(bh->b_blocknr=%llu)\n",
@@ -456,59 +304,6 @@ int ocfs2_journal_dirty_data(handle_t *handle,
 	return err;
 }
 
-/* We always assume you're adding a metadata lock at level 'ex' */
-int ocfs2_handle_add_lock(struct ocfs2_journal_handle *handle,
-			  struct inode *inode)
-{
-	int status;
-	struct ocfs2_journal_lock *lock;
-
-	BUG_ON(!inode);
-
-	lock = kmem_cache_alloc(ocfs2_lock_cache, GFP_NOFS);
-	if (!lock) {
-		status = -ENOMEM;
-		mlog_errno(-ENOMEM);
-		goto bail;
-	}
-
-	if (!igrab(inode))
-		BUG();
-	lock->jl_inode = inode;
-
-	list_add_tail(&(lock->jl_lock_list), &(handle->locks));
-	handle->num_locks++;
-
-	status = 0;
-bail:
-	mlog_exit(status);
-	return status;
-}
-
-static void ocfs2_handle_cleanup_locks(struct ocfs2_journal *journal,
-				       struct ocfs2_journal_handle *handle)
-{
-	struct list_head *p, *n;
-	struct ocfs2_journal_lock *lock;
-	struct inode *inode;
-
-	list_for_each_safe(p, n, &(handle->locks)) {
-		lock = list_entry(p, struct ocfs2_journal_lock,
-				  jl_lock_list);
-		list_del(&lock->jl_lock_list);
-		handle->num_locks--;
-
-		inode = lock->jl_inode;
-		ocfs2_meta_unlock(inode, 1);
-		if (atomic_read(&inode->i_count) == 1)
-			mlog(ML_ERROR,
-			     "Inode %llu, I'm doing a last iput for!",
-			     (unsigned long long)OCFS2_I(inode)->ip_blkno);
-		iput(inode);
-		kmem_cache_free(ocfs2_lock_cache, lock);
-	}
-}
-
 #define OCFS2_DEFAULT_COMMIT_INTERVAL 	(HZ * 5)
 
 void ocfs2_set_journal_params(struct ocfs2_super *osb)
@@ -562,8 +357,7 @@ int ocfs2_journal_init(struct ocfs2_journal *journal, int *dirty)
 	/* Skip recovery waits here - journal inode metadata never
 	 * changes in a live cluster so it can be considered an
 	 * exception to the rule. */
-	status = ocfs2_meta_lock_full(inode, NULL, &bh, 1,
-				      OCFS2_META_LOCK_RECOVERY);
+	status = ocfs2_meta_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
 	if (status < 0) {
 		if (status != -ERESTARTSYS)
 			mlog(ML_ERROR, "Could not get lock on journal!\n");
@@ -715,9 +509,23 @@ void ocfs2_journal_shutdown(struct ocfs2_super *osb)
 
 	BUG_ON(atomic_read(&(osb->journal->j_num_trans)) != 0);
 
-	status = ocfs2_journal_toggle_dirty(osb, 0);
-	if (status < 0)
-		mlog_errno(status);
+	if (ocfs2_mount_local(osb)) {
+		journal_lock_updates(journal->j_journal);
+		status = journal_flush(journal->j_journal);
+		journal_unlock_updates(journal->j_journal);
+		if (status < 0)
+			mlog_errno(status);
+	}
+
+	if (status == 0) {
+		/*
+		 * Do not toggle if flush was unsuccessful otherwise
+		 * will leave dirty metadata in a "clean" journal
+		 */
+		status = ocfs2_journal_toggle_dirty(osb, 0);
+		if (status < 0)
+			mlog_errno(status);
+	}
 
 	/* Shutdown the kernel journal system */
 	journal_destroy(journal->j_journal);
@@ -757,7 +565,7 @@ static void ocfs2_clear_journal_error(struct super_block *sb,
 	}
 }
 
-int ocfs2_journal_load(struct ocfs2_journal *journal)
+int ocfs2_journal_load(struct ocfs2_journal *journal, int local)
 {
 	int status = 0;
 	struct ocfs2_super *osb;
@@ -784,14 +592,18 @@ int ocfs2_journal_load(struct ocfs2_journal *journal)
 	}
 
 	/* Launch the commit thread */
-	osb->commit_task = kthread_run(ocfs2_commit_thread, osb, "ocfs2cmt");
-	if (IS_ERR(osb->commit_task)) {
-		status = PTR_ERR(osb->commit_task);
+	if (!local) {
+		osb->commit_task = kthread_run(ocfs2_commit_thread, osb,
+					       "ocfs2cmt");
+		if (IS_ERR(osb->commit_task)) {
+			status = PTR_ERR(osb->commit_task);
+			osb->commit_task = NULL;
+			mlog(ML_ERROR, "unable to launch ocfs2commit thread, "
+			     "error=%d", status);
+			goto done;
+		}
+	} else
 		osb->commit_task = NULL;
-		mlog(ML_ERROR, "unable to launch ocfs2commit thread, error=%d",
-		     status);
-		goto done;
-	}
 
 done:
 	mlog_exit(status);
@@ -911,11 +723,12 @@ struct ocfs2_la_recovery_item {
  * NOTE: This function can and will sleep on recovery of other nodes
  * during cluster locking, just like any other ocfs2 process.
  */
-void ocfs2_complete_recovery(void *data)
+void ocfs2_complete_recovery(struct work_struct *work)
 {
 	int ret;
-	struct ocfs2_super *osb = data;
-	struct ocfs2_journal *journal = osb->journal;
+	struct ocfs2_journal *journal =
+		container_of(work, struct ocfs2_journal, j_recovery_work);
+	struct ocfs2_super *osb = journal->j_osb;
 	struct ocfs2_dinode *la_dinode, *tl_dinode;
 	struct ocfs2_la_recovery_item *item;
 	struct list_head *p, *n;
@@ -1160,8 +973,7 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
 	}
 	SET_INODE_JOURNAL(inode);
 
-	status = ocfs2_meta_lock_full(inode, NULL, &bh, 1,
-				      OCFS2_META_LOCK_RECOVERY);
+	status = ocfs2_meta_lock_full(inode, &bh, 1, OCFS2_META_LOCK_RECOVERY);
 	if (status < 0) {
 		mlog(0, "status returned from ocfs2_meta_lock=%d\n", status);
 		if (status != -ERESTARTSYS)
@@ -1350,7 +1162,7 @@ static int ocfs2_trylock_journal(struct ocfs2_super *osb,
 	SET_INODE_JOURNAL(inode);
 
 	flags = OCFS2_META_LOCK_RECOVERY | OCFS2_META_LOCK_NOQUEUE;
-	status = ocfs2_meta_lock_full(inode, NULL, NULL, 1, flags);
+	status = ocfs2_meta_lock_full(inode, NULL, 1, flags);
 	if (status < 0) {
 		if (status != -EAGAIN)
 			mlog_errno(status);
@@ -1433,7 +1245,7 @@ static int ocfs2_queue_orphans(struct ocfs2_super *osb,
 	}	
 
 	mutex_lock(&orphan_dir_inode->i_mutex);
-	status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0);
+	status = ocfs2_meta_lock(orphan_dir_inode, NULL, 0);
 	if (status < 0) {
 		mlog_errno(status);
 		goto out;
diff --git a/fs/ocfs2/journal.h b/fs/ocfs2/journal.h
index 2f3a6acdac45..e1216364d191 100644
--- a/fs/ocfs2/journal.h
+++ b/fs/ocfs2/journal.h
@@ -37,7 +37,6 @@ enum ocfs2_journal_state {
 
 struct ocfs2_super;
 struct ocfs2_dinode;
-struct ocfs2_journal_handle;
 
 struct ocfs2_journal {
 	enum ocfs2_journal_state   j_state;    /* Journals current state   */
@@ -133,46 +132,8 @@ static inline void ocfs2_inode_set_new(struct ocfs2_super *osb,
 	spin_unlock(&trans_inc_lock);
 }
 
-extern kmem_cache_t *ocfs2_lock_cache;
-
-struct ocfs2_journal_lock {
-	struct inode     *jl_inode;
-	struct list_head  jl_lock_list;
-};
-
-struct ocfs2_journal_handle {
-	handle_t            *k_handle; /* kernel handle.                */
-	struct ocfs2_journal        *journal;
-	u32                 flags;     /* see flags below.              */
-	int                 max_buffs; /* Buffs reserved by this handle */
-
-	/* The following two fields are for ocfs2_handle_add_lock */
-	int                 num_locks;
-	struct list_head    locks;     /* A bunch of locks to
-					* release on commit. This
-					* should be a list_head */
-
-	struct list_head     inode_list;
-};
-
-#define OCFS2_HANDLE_STARTED			1
-/* should we sync-commit this handle? */
-#define OCFS2_HANDLE_SYNC			2
-static inline int ocfs2_handle_started(struct ocfs2_journal_handle *handle)
-{
-	return handle->flags & OCFS2_HANDLE_STARTED;
-}
-
-static inline void ocfs2_handle_set_sync(struct ocfs2_journal_handle *handle, int sync)
-{
-	if (sync)
-		handle->flags |= OCFS2_HANDLE_SYNC;
-	else
-		handle->flags &= ~OCFS2_HANDLE_SYNC;
-}
-
 /* Exported only for the journal struct init code in super.c. Do not call. */
-void ocfs2_complete_recovery(void *data);
+void ocfs2_complete_recovery(struct work_struct *work);
 
 /*
  *  Journal Control:
@@ -196,7 +157,7 @@ int    ocfs2_journal_init(struct ocfs2_journal *journal,
 void   ocfs2_journal_shutdown(struct ocfs2_super *osb);
 int    ocfs2_journal_wipe(struct ocfs2_journal *journal,
 			  int full);
-int    ocfs2_journal_load(struct ocfs2_journal *journal);
+int    ocfs2_journal_load(struct ocfs2_journal *journal, int local);
 int    ocfs2_check_journals_nolocks(struct ocfs2_super *osb);
 void   ocfs2_recovery_thread(struct ocfs2_super *osb,
 			     int node_num);
@@ -213,6 +174,9 @@ static inline void ocfs2_checkpoint_inode(struct inode *inode)
 {
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
 
+	if (ocfs2_mount_local(osb))
+		return;
+
 	if (!ocfs2_inode_fully_checkpointed(inode)) {
 		/* WARNING: This only kicks off a single
 		 * checkpoint. If someone races you and adds more
@@ -231,15 +195,14 @@ static inline void ocfs2_checkpoint_inode(struct inode *inode)
  *  Transaction Handling:
  *  Manage the lifetime of a transaction handle.
  *
- *  ocfs2_alloc_handle     - Only allocate a handle so we can start putting
- *                          cluster locks on it. To actually change blocks,
- *                          call ocfs2_start_trans with the handle returned
- *                          from this function. You may call ocfs2_commit_trans
- *                           at any time in the lifetime of a handle.
  *  ocfs2_start_trans      - Begin a transaction. Give it an upper estimate of
  *                          the number of blocks that will be changed during
  *                          this handle.
- *  ocfs2_commit_trans     - Complete a handle.
+ *  ocfs2_commit_trans - Complete a handle. It might return -EIO if
+ *                       the journal was aborted. The majority of paths don't
+ *                       check the return value as an error there comes too
+ *                       late to do anything (and will be picked up in a
+ *                       later transaction).
  *  ocfs2_extend_trans     - Extend a handle by nblocks credits. This may
  *                          commit the handle to disk in the process, but will
  *                          not release any locks taken during the transaction.
@@ -249,24 +212,16 @@ static inline void ocfs2_checkpoint_inode(struct inode *inode)
  *  ocfs2_journal_dirty    - Mark a journalled buffer as having dirty data.
  *  ocfs2_journal_dirty_data - Indicate that a data buffer should go out before
  *                             the current handle commits.
- *  ocfs2_handle_add_lock  - Sometimes we need to delay lock release
- *                          until after a transaction has been completed. Use
- *                          ocfs2_handle_add_lock to indicate that a lock needs
- *                          to be released at the end of that handle. Locks
- *                          will be released in the order that they are added.
- *  ocfs2_handle_add_inode - Add a locked inode to a transaction.
  */
 
 /* You must always start_trans with a number of buffs > 0, but it's
  * perfectly legal to go through an entire transaction without having
  * dirtied any buffers. */
-struct ocfs2_journal_handle *ocfs2_alloc_handle(struct ocfs2_super *osb);
-struct ocfs2_journal_handle *ocfs2_start_trans(struct ocfs2_super *osb,
-					       struct ocfs2_journal_handle *handle,
+handle_t		    *ocfs2_start_trans(struct ocfs2_super *osb,
 					       int max_buffs);
-void			     ocfs2_commit_trans(struct ocfs2_journal_handle *handle);
-int			     ocfs2_extend_trans(struct ocfs2_journal_handle *handle,
-						int nblocks);
+int			     ocfs2_commit_trans(struct ocfs2_super *osb,
+						handle_t *handle);
+int			     ocfs2_extend_trans(handle_t *handle, int nblocks);
 
 /*
  * Create access is for when we get a newly created buffer and we're
@@ -283,7 +238,7 @@ int			     ocfs2_extend_trans(struct ocfs2_journal_handle *handle,
 #define OCFS2_JOURNAL_ACCESS_WRITE  1
 #define OCFS2_JOURNAL_ACCESS_UNDO   2
 
-int                  ocfs2_journal_access(struct ocfs2_journal_handle *handle,
+int                  ocfs2_journal_access(handle_t *handle,
 					  struct inode *inode,
 					  struct buffer_head *bh,
 					  int type);
@@ -306,18 +261,10 @@ int                  ocfs2_journal_access(struct ocfs2_journal_handle *handle,
  *	<modify the bh>
  * 	ocfs2_journal_dirty(handle, bh);
  */
-int                  ocfs2_journal_dirty(struct ocfs2_journal_handle *handle,
+int                  ocfs2_journal_dirty(handle_t *handle,
 					 struct buffer_head *bh);
 int                  ocfs2_journal_dirty_data(handle_t *handle,
 					      struct buffer_head *bh);
-int                  ocfs2_handle_add_lock(struct ocfs2_journal_handle *handle,
-					   struct inode *inode);
-/*
- * Use this to protect from other processes reading buffer state while
- * it's in flight.
- */
-void                 ocfs2_handle_add_inode(struct ocfs2_journal_handle *handle,
-					    struct inode *inode);
 
 /*
  *  Credit Macros:
diff --git a/fs/ocfs2/localalloc.c b/fs/ocfs2/localalloc.c
index 1f17a4d08287..4dedd9789108 100644
--- a/fs/ocfs2/localalloc.c
+++ b/fs/ocfs2/localalloc.c
@@ -58,19 +58,18 @@ static int ocfs2_local_alloc_find_clear_bits(struct ocfs2_super *osb,
 static void ocfs2_clear_local_alloc(struct ocfs2_dinode *alloc);
 
 static int ocfs2_sync_local_to_main(struct ocfs2_super *osb,
-				    struct ocfs2_journal_handle *handle,
+				    handle_t *handle,
 				    struct ocfs2_dinode *alloc,
 				    struct inode *main_bm_inode,
 				    struct buffer_head *main_bm_bh);
 
 static int ocfs2_local_alloc_reserve_for_window(struct ocfs2_super *osb,
-						struct ocfs2_journal_handle *handle,
 						struct ocfs2_alloc_context **ac,
 						struct inode **bitmap_inode,
 						struct buffer_head **bitmap_bh);
 
 static int ocfs2_local_alloc_new_window(struct ocfs2_super *osb,
-					struct ocfs2_journal_handle *handle,
+					handle_t *handle,
 					struct ocfs2_alloc_context *ac);
 
 static int ocfs2_local_alloc_slide_window(struct ocfs2_super *osb,
@@ -196,7 +195,7 @@ bail:
 void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb)
 {
 	int status;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle;
 	struct inode *local_alloc_inode = NULL;
 	struct buffer_head *bh = NULL;
 	struct buffer_head *main_bm_bh = NULL;
@@ -207,7 +206,7 @@ void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb)
 	mlog_entry_void();
 
 	if (osb->local_alloc_state == OCFS2_LA_UNUSED)
-		goto bail;
+		goto out;
 
 	local_alloc_inode =
 		ocfs2_get_system_file_inode(osb,
@@ -216,40 +215,34 @@ void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb)
 	if (!local_alloc_inode) {
 		status = -ENOENT;
 		mlog_errno(status);
-		goto bail;
+		goto out;
 	}
 
 	osb->local_alloc_state = OCFS2_LA_DISABLED;
 
-	handle = ocfs2_alloc_handle(osb);
-	if (!handle) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
-
 	main_bm_inode = ocfs2_get_system_file_inode(osb,
 						    GLOBAL_BITMAP_SYSTEM_INODE,
 						    OCFS2_INVALID_SLOT);
 	if (!main_bm_inode) {
 		status = -EINVAL;
 		mlog_errno(status);
-		goto bail;
+		goto out;
 	}
 
-	ocfs2_handle_add_inode(handle, main_bm_inode);
-	status = ocfs2_meta_lock(main_bm_inode, handle, &main_bm_bh, 1);
+	mutex_lock(&main_bm_inode->i_mutex);
+
+	status = ocfs2_meta_lock(main_bm_inode, &main_bm_bh, 1);
 	if (status < 0) {
 		mlog_errno(status);
-		goto bail;
+		goto out_mutex;
 	}
 
 	/* WINDOW_MOVE_CREDITS is a bit heavy... */
-	handle = ocfs2_start_trans(osb, handle, OCFS2_WINDOW_MOVE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_WINDOW_MOVE_CREDITS);
 	if (IS_ERR(handle)) {
 		mlog_errno(PTR_ERR(handle));
 		handle = NULL;
-		goto bail;
+		goto out_unlock;
 	}
 
 	bh = osb->local_alloc_bh;
@@ -258,7 +251,7 @@ void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb)
 	alloc_copy = kmalloc(bh->b_size, GFP_KERNEL);
 	if (!alloc_copy) {
 		status = -ENOMEM;
-		goto bail;
+		goto out_commit;
 	}
 	memcpy(alloc_copy, alloc, bh->b_size);
 
@@ -266,7 +259,7 @@ void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb)
 				      OCFS2_JOURNAL_ACCESS_WRITE);
 	if (status < 0) {
 		mlog_errno(status);
-		goto bail;
+		goto out_commit;
 	}
 
 	ocfs2_clear_local_alloc(alloc);
@@ -274,7 +267,7 @@ void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb)
 	status = ocfs2_journal_dirty(handle, bh);
 	if (status < 0) {
 		mlog_errno(status);
-		goto bail;
+		goto out_commit;
 	}
 
 	brelse(bh);
@@ -286,16 +279,20 @@ void ocfs2_shutdown_local_alloc(struct ocfs2_super *osb)
 	if (status < 0)
 		mlog_errno(status);
 
-bail:
-	if (handle)
-		ocfs2_commit_trans(handle);
+out_commit:
+	ocfs2_commit_trans(osb, handle);
 
+out_unlock:
 	if (main_bm_bh)
 		brelse(main_bm_bh);
 
-	if (main_bm_inode)
-		iput(main_bm_inode);
+	ocfs2_meta_unlock(main_bm_inode, 1);
 
+out_mutex:
+	mutex_unlock(&main_bm_inode->i_mutex);
+	iput(main_bm_inode);
+
+out:
 	if (local_alloc_inode)
 		iput(local_alloc_inode);
 
@@ -385,61 +382,59 @@ int ocfs2_complete_local_alloc_recovery(struct ocfs2_super *osb,
 					struct ocfs2_dinode *alloc)
 {
 	int status;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle;
 	struct buffer_head *main_bm_bh = NULL;
-	struct inode *main_bm_inode = NULL;
+	struct inode *main_bm_inode;
 
 	mlog_entry_void();
 
-	handle = ocfs2_alloc_handle(osb);
-	if (!handle) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
-
 	main_bm_inode = ocfs2_get_system_file_inode(osb,
 						    GLOBAL_BITMAP_SYSTEM_INODE,
 						    OCFS2_INVALID_SLOT);
 	if (!main_bm_inode) {
 		status = -EINVAL;
 		mlog_errno(status);
-		goto bail;
+		goto out;
 	}
 
-	ocfs2_handle_add_inode(handle, main_bm_inode);
-	status = ocfs2_meta_lock(main_bm_inode, handle, &main_bm_bh, 1);
+	mutex_lock(&main_bm_inode->i_mutex);
+
+	status = ocfs2_meta_lock(main_bm_inode, &main_bm_bh, 1);
 	if (status < 0) {
 		mlog_errno(status);
-		goto bail;
+		goto out_mutex;
 	}
 
-	handle = ocfs2_start_trans(osb, handle, OCFS2_WINDOW_MOVE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_WINDOW_MOVE_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
 		mlog_errno(status);
-		goto bail;
+		goto out_unlock;
 	}
 
 	/* we want the bitmap change to be recorded on disk asap */
-	ocfs2_handle_set_sync(handle, 1);
+	handle->h_sync = 1;
 
 	status = ocfs2_sync_local_to_main(osb, handle, alloc,
 					  main_bm_inode, main_bm_bh);
 	if (status < 0)
 		mlog_errno(status);
 
-bail:
-	if (handle)
-		ocfs2_commit_trans(handle);
+	ocfs2_commit_trans(osb, handle);
+
+out_unlock:
+	ocfs2_meta_unlock(main_bm_inode, 1);
+
+out_mutex:
+	mutex_unlock(&main_bm_inode->i_mutex);
 
 	if (main_bm_bh)
 		brelse(main_bm_bh);
 
-	if (main_bm_inode)
-		iput(main_bm_inode);
+	iput(main_bm_inode);
 
+out:
 	mlog_exit(status);
 	return status;
 }
@@ -452,7 +447,6 @@ bail:
  * our own in order to shift windows.
  */
 int ocfs2_reserve_local_alloc_bits(struct ocfs2_super *osb,
-				   struct ocfs2_journal_handle *passed_handle,
 				   u32 bits_wanted,
 				   struct ocfs2_alloc_context *ac)
 {
@@ -463,9 +457,7 @@ int ocfs2_reserve_local_alloc_bits(struct ocfs2_super *osb,
 
 	mlog_entry_void();
 
-	BUG_ON(!passed_handle);
 	BUG_ON(!ac);
-	BUG_ON(passed_handle->flags & OCFS2_HANDLE_STARTED);
 
 	local_alloc_inode =
 		ocfs2_get_system_file_inode(osb,
@@ -476,7 +468,11 @@ int ocfs2_reserve_local_alloc_bits(struct ocfs2_super *osb,
 		mlog_errno(status);
 		goto bail;
 	}
-	ocfs2_handle_add_inode(passed_handle, local_alloc_inode);
+
+	mutex_lock(&local_alloc_inode->i_mutex);
+
+	ac->ac_inode = local_alloc_inode;
+	ac->ac_which = OCFS2_AC_USE_LOCAL;
 
 	if (osb->local_alloc_state != OCFS2_LA_ENABLED) {
 		status = -ENOSPC;
@@ -515,21 +511,17 @@ int ocfs2_reserve_local_alloc_bits(struct ocfs2_super *osb,
 		}
 	}
 
-	ac->ac_inode = igrab(local_alloc_inode);
 	get_bh(osb->local_alloc_bh);
 	ac->ac_bh = osb->local_alloc_bh;
-	ac->ac_which = OCFS2_AC_USE_LOCAL;
 	status = 0;
 bail:
-	if (local_alloc_inode)
-		iput(local_alloc_inode);
 
 	mlog_exit(status);
 	return status;
 }
 
 int ocfs2_claim_local_alloc_bits(struct ocfs2_super *osb,
-				 struct ocfs2_journal_handle *handle,
+				 handle_t *handle,
 				 struct ocfs2_alloc_context *ac,
 				 u32 min_bits,
 				 u32 *bit_off,
@@ -707,7 +699,7 @@ static void ocfs2_verify_zero_bits(unsigned long *bitmap,
  * passed is used for caching.
  */
 static int ocfs2_sync_local_to_main(struct ocfs2_super *osb,
-				    struct ocfs2_journal_handle *handle,
+				    handle_t *handle,
 				    struct ocfs2_dinode *alloc,
 				    struct inode *main_bm_inode,
 				    struct buffer_head *main_bm_bh)
@@ -778,21 +770,19 @@ bail:
 }
 
 static int ocfs2_local_alloc_reserve_for_window(struct ocfs2_super *osb,
-						struct ocfs2_journal_handle *handle,
 						struct ocfs2_alloc_context **ac,
 						struct inode **bitmap_inode,
 						struct buffer_head **bitmap_bh)
 {
 	int status;
 
-	*ac = kcalloc(1, sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
+	*ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
 	if (!(*ac)) {
 		status = -ENOMEM;
 		mlog_errno(status);
 		goto bail;
 	}
 
-	(*ac)->ac_handle = handle;
 	(*ac)->ac_bits_wanted = ocfs2_local_alloc_window_bits(osb);
 
 	status = ocfs2_reserve_cluster_bitmap_bits(osb, *ac);
@@ -821,7 +811,7 @@ bail:
  * pass it the bitmap lock in lock_bh if you have it.
  */
 static int ocfs2_local_alloc_new_window(struct ocfs2_super *osb,
-					struct ocfs2_journal_handle *handle,
+					handle_t *handle,
 					struct ocfs2_alloc_context *ac)
 {
 	int status = 0;
@@ -888,23 +878,15 @@ static int ocfs2_local_alloc_slide_window(struct ocfs2_super *osb,
 	int status = 0;
 	struct buffer_head *main_bm_bh = NULL;
 	struct inode *main_bm_inode = NULL;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct ocfs2_dinode *alloc;
 	struct ocfs2_dinode *alloc_copy = NULL;
 	struct ocfs2_alloc_context *ac = NULL;
 
 	mlog_entry_void();
 
-	handle = ocfs2_alloc_handle(osb);
-	if (!handle) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
-
 	/* This will lock the main bitmap for us. */
 	status = ocfs2_local_alloc_reserve_for_window(osb,
-						      handle,
 						      &ac,
 						      &main_bm_inode,
 						      &main_bm_bh);
@@ -914,7 +896,7 @@ static int ocfs2_local_alloc_slide_window(struct ocfs2_super *osb,
 		goto bail;
 	}
 
-	handle = ocfs2_start_trans(osb, handle, OCFS2_WINDOW_MOVE_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_WINDOW_MOVE_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -972,7 +954,7 @@ static int ocfs2_local_alloc_slide_window(struct ocfs2_super *osb,
 	status = 0;
 bail:
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
 
 	if (main_bm_bh)
 		brelse(main_bm_bh);
diff --git a/fs/ocfs2/localalloc.h b/fs/ocfs2/localalloc.h
index 30f88ce14e46..385a10152f9c 100644
--- a/fs/ocfs2/localalloc.h
+++ b/fs/ocfs2/localalloc.h
@@ -42,12 +42,11 @@ int ocfs2_alloc_should_use_local(struct ocfs2_super *osb,
 
 struct ocfs2_alloc_context;
 int ocfs2_reserve_local_alloc_bits(struct ocfs2_super *osb,
-				   struct ocfs2_journal_handle *passed_handle,
 				   u32 bits_wanted,
 				   struct ocfs2_alloc_context *ac);
 
 int ocfs2_claim_local_alloc_bits(struct ocfs2_super *osb,
-				 struct ocfs2_journal_handle *handle,
+				 handle_t *handle,
 				 struct ocfs2_alloc_context *ac,
 				 u32 min_bits,
 				 u32 *bit_off,
diff --git a/fs/ocfs2/mmap.c b/fs/ocfs2/mmap.c
index 83934e33e5b0..51b020447683 100644
--- a/fs/ocfs2/mmap.c
+++ b/fs/ocfs2/mmap.c
@@ -82,16 +82,27 @@ static struct vm_operations_struct ocfs2_file_vm_ops = {
 
 int ocfs2_mmap(struct file *file, struct vm_area_struct *vma)
 {
+	int ret = 0, lock_level = 0;
+	struct ocfs2_super *osb = OCFS2_SB(file->f_dentry->d_inode->i_sb);
+
 	/* We don't want to support shared writable mappings yet. */
-	if (((vma->vm_flags & VM_SHARED) || (vma->vm_flags & VM_MAYSHARE))
-	    && ((vma->vm_flags & VM_WRITE) || (vma->vm_flags & VM_MAYWRITE))) {
+	if (!ocfs2_mount_local(osb) &&
+	    ((vma->vm_flags & VM_SHARED) || (vma->vm_flags & VM_MAYSHARE)) &&
+	    ((vma->vm_flags & VM_WRITE) || (vma->vm_flags & VM_MAYWRITE))) {
 		mlog(0, "disallow shared writable mmaps %lx\n", vma->vm_flags);
 		/* This is -EINVAL because generic_file_readonly_mmap
 		 * returns it in a similar situation. */
 		return -EINVAL;
 	}
 
-	file_accessed(file);
+	ret = ocfs2_meta_lock_atime(file->f_dentry->d_inode,
+				    file->f_vfsmnt, &lock_level);
+	if (ret < 0) {
+		mlog_errno(ret);
+		goto out;
+	}
+	ocfs2_meta_unlock(file->f_dentry->d_inode, lock_level);
+out:
 	vma->vm_ops = &ocfs2_file_vm_ops;
 	return 0;
 }
diff --git a/fs/ocfs2/namei.c b/fs/ocfs2/namei.c
index a57b751d4f40..9637039c2633 100644
--- a/fs/ocfs2/namei.c
+++ b/fs/ocfs2/namei.c
@@ -75,12 +75,12 @@ static int inline ocfs2_search_dirblock(struct buffer_head *bh,
 					unsigned long offset,
 					struct ocfs2_dir_entry **res_dir);
 
-static int ocfs2_delete_entry(struct ocfs2_journal_handle *handle,
+static int ocfs2_delete_entry(handle_t *handle,
 			      struct inode *dir,
 			      struct ocfs2_dir_entry *de_del,
 			      struct buffer_head *bh);
 
-static int __ocfs2_add_entry(struct ocfs2_journal_handle *handle,
+static int __ocfs2_add_entry(handle_t *handle,
 			     struct inode *dir,
 			     const char *name, int namelen,
 			     struct inode *inode, u64 blkno,
@@ -93,43 +93,37 @@ static int ocfs2_mknod_locked(struct ocfs2_super *osb,
 			      dev_t dev,
 			      struct buffer_head **new_fe_bh,
 			      struct buffer_head *parent_fe_bh,
-			      struct ocfs2_journal_handle *handle,
+			      handle_t *handle,
 			      struct inode **ret_inode,
 			      struct ocfs2_alloc_context *inode_ac);
 
 static int ocfs2_fill_new_dir(struct ocfs2_super *osb,
-			      struct ocfs2_journal_handle *handle,
+			      handle_t *handle,
 			      struct inode *parent,
 			      struct inode *inode,
 			      struct buffer_head *fe_bh,
 			      struct ocfs2_alloc_context *data_ac);
 
-static int ocfs2_double_lock(struct ocfs2_super *osb,
-			     struct ocfs2_journal_handle *handle,
-			     struct buffer_head **bh1,
-			     struct inode *inode1,
-			     struct buffer_head **bh2,
-			     struct inode *inode2);
-
 static int ocfs2_prepare_orphan_dir(struct ocfs2_super *osb,
-				    struct ocfs2_journal_handle *handle,
+				    struct inode **ret_orphan_dir,
 				    struct inode *inode,
 				    char *name,
 				    struct buffer_head **de_bh);
 
 static int ocfs2_orphan_add(struct ocfs2_super *osb,
-			    struct ocfs2_journal_handle *handle,
+			    handle_t *handle,
 			    struct inode *inode,
 			    struct ocfs2_dinode *fe,
 			    char *name,
-			    struct buffer_head *de_bh);
+			    struct buffer_head *de_bh,
+			    struct inode *orphan_dir_inode);
 
 static int ocfs2_create_symlink_data(struct ocfs2_super *osb,
-				     struct ocfs2_journal_handle *handle,
+				     handle_t *handle,
 				     struct inode *inode,
 				     const char *symname);
 
-static inline int ocfs2_add_entry(struct ocfs2_journal_handle *handle,
+static inline int ocfs2_add_entry(handle_t *handle,
 				  struct dentry *dentry,
 				  struct inode *inode, u64 blkno,
 				  struct buffer_head *parent_fe_bh,
@@ -165,7 +159,7 @@ static struct dentry *ocfs2_lookup(struct inode *dir, struct dentry *dentry,
 	mlog(0, "find name %.*s in directory %llu\n", dentry->d_name.len,
 	     dentry->d_name.name, (unsigned long long)OCFS2_I(dir)->ip_blkno);
 
-	status = ocfs2_meta_lock(dir, NULL, NULL, 0);
+	status = ocfs2_meta_lock(dir, NULL, 0);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
@@ -242,7 +236,7 @@ bail:
 }
 
 static int ocfs2_fill_new_dir(struct ocfs2_super *osb,
-			      struct ocfs2_journal_handle *handle,
+			      handle_t *handle,
 			      struct inode *parent,
 			      struct inode *inode,
 			      struct buffer_head *fe_bh,
@@ -317,7 +311,7 @@ static int ocfs2_mknod(struct inode *dir,
 {
 	int status = 0;
 	struct buffer_head *parent_fe_bh = NULL;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct ocfs2_super *osb;
 	struct ocfs2_dinode *dirfe;
 	struct buffer_head *new_fe_bh = NULL;
@@ -333,18 +327,11 @@ static int ocfs2_mknod(struct inode *dir,
 	/* get our super block */
 	osb = OCFS2_SB(dir->i_sb);
 
-	handle = ocfs2_alloc_handle(osb);
-	if (handle == NULL) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto leave;
-	}
-
-	status = ocfs2_meta_lock(dir, handle, &parent_fe_bh, 1);
+	status = ocfs2_meta_lock(dir, &parent_fe_bh, 1);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
-		goto leave;
+		return status;
 	}
 
 	if (S_ISDIR(mode) && (dir->i_nlink >= OCFS2_LINK_MAX)) {
@@ -374,7 +361,7 @@ static int ocfs2_mknod(struct inode *dir,
 	}
 
 	/* reserve an inode spot */
-	status = ocfs2_reserve_new_inode(osb, handle, &inode_ac);
+	status = ocfs2_reserve_new_inode(osb, &inode_ac);
 	if (status < 0) {
 		if (status != -ENOSPC)
 			mlog_errno(status);
@@ -384,7 +371,7 @@ static int ocfs2_mknod(struct inode *dir,
 	/* are we making a directory? If so, reserve a cluster for his
 	 * 1st extent. */
 	if (S_ISDIR(mode)) {
-		status = ocfs2_reserve_clusters(osb, handle, 1, &data_ac);
+		status = ocfs2_reserve_clusters(osb, 1, &data_ac);
 		if (status < 0) {
 			if (status != -ENOSPC)
 				mlog_errno(status);
@@ -392,7 +379,7 @@ static int ocfs2_mknod(struct inode *dir,
 		}
 	}
 
-	handle = ocfs2_start_trans(osb, handle, OCFS2_MKNOD_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_MKNOD_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -453,7 +440,9 @@ static int ocfs2_mknod(struct inode *dir,
 	status = 0;
 leave:
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
+
+	ocfs2_meta_unlock(dir, 1);
 
 	if (status == -ENOSPC)
 		mlog(0, "Disk is full\n");
@@ -487,7 +476,7 @@ static int ocfs2_mknod_locked(struct ocfs2_super *osb,
 			      dev_t dev,
 			      struct buffer_head **new_fe_bh,
 			      struct buffer_head *parent_fe_bh,
-			      struct ocfs2_journal_handle *handle,
+			      handle_t *handle,
 			      struct inode **ret_inode,
 			      struct ocfs2_alloc_context *inode_ac)
 {
@@ -598,9 +587,11 @@ static int ocfs2_mknod_locked(struct ocfs2_super *osb,
 	}
 
 	ocfs2_inode_set_new(osb, inode);
-	status = ocfs2_create_new_inode_locks(inode);
-	if (status < 0)
-		mlog_errno(status);
+	if (!ocfs2_mount_local(osb)) {
+		status = ocfs2_create_new_inode_locks(inode);
+		if (status < 0)
+			mlog_errno(status);
+	}
 
 	status = 0; /* error in ocfs2_create_new_inode_locks is not
 		     * critical */
@@ -653,7 +644,7 @@ static int ocfs2_link(struct dentry *old_dentry,
 		      struct inode *dir,
 		      struct dentry *dentry)
 {
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle;
 	struct inode *inode = old_dentry->d_inode;
 	int err;
 	struct buffer_head *fe_bh = NULL;
@@ -666,68 +657,60 @@ static int ocfs2_link(struct dentry *old_dentry,
 		   old_dentry->d_name.len, old_dentry->d_name.name,
 		   dentry->d_name.len, dentry->d_name.name);
 
-	if (S_ISDIR(inode->i_mode)) {
-		err = -EPERM;
-		goto bail;
-	}
-
-	handle = ocfs2_alloc_handle(osb);
-	if (handle == NULL) {
-		err = -ENOMEM;
-		goto bail;
-	}
+	if (S_ISDIR(inode->i_mode))
+		return -EPERM;
 
-	err = ocfs2_meta_lock(dir, handle, &parent_fe_bh, 1);
+	err = ocfs2_meta_lock(dir, &parent_fe_bh, 1);
 	if (err < 0) {
 		if (err != -ENOENT)
 			mlog_errno(err);
-		goto bail;
+		return err;
 	}
 
 	if (!dir->i_nlink) {
 		err = -ENOENT;
-		goto bail;
+		goto out;
 	}
 
 	err = ocfs2_check_dir_for_entry(dir, dentry->d_name.name,
 					dentry->d_name.len);
 	if (err)
-		goto bail;
+		goto out;
 
 	err = ocfs2_prepare_dir_for_insert(osb, dir, parent_fe_bh,
 					   dentry->d_name.name,
 					   dentry->d_name.len, &de_bh);
 	if (err < 0) {
 		mlog_errno(err);
-		goto bail;
+		goto out;
 	}
 
-	err = ocfs2_meta_lock(inode, handle, &fe_bh, 1);
+	err = ocfs2_meta_lock(inode, &fe_bh, 1);
 	if (err < 0) {
 		if (err != -ENOENT)
 			mlog_errno(err);
-		goto bail;
+		goto out;
 	}
 
 	fe = (struct ocfs2_dinode *) fe_bh->b_data;
 	if (le16_to_cpu(fe->i_links_count) >= OCFS2_LINK_MAX) {
 		err = -EMLINK;
-		goto bail;
+		goto out_unlock_inode;
 	}
 
-	handle = ocfs2_start_trans(osb, handle, OCFS2_LINK_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_LINK_CREDITS);
 	if (IS_ERR(handle)) {
 		err = PTR_ERR(handle);
 		handle = NULL;
 		mlog_errno(err);
-		goto bail;
+		goto out_unlock_inode;
 	}
 
 	err = ocfs2_journal_access(handle, inode, fe_bh,
 				   OCFS2_JOURNAL_ACCESS_WRITE);
 	if (err < 0) {
 		mlog_errno(err);
-		goto bail;
+		goto out_commit;
 	}
 
 	inc_nlink(inode);
@@ -741,7 +724,7 @@ static int ocfs2_link(struct dentry *old_dentry,
 		le16_add_cpu(&fe->i_links_count, -1);
 		drop_nlink(inode);
 		mlog_errno(err);
-		goto bail;
+		goto out_commit;
 	}
 
 	err = ocfs2_add_entry(handle, dentry, inode,
@@ -751,21 +734,27 @@ static int ocfs2_link(struct dentry *old_dentry,
 		le16_add_cpu(&fe->i_links_count, -1);
 		drop_nlink(inode);
 		mlog_errno(err);
-		goto bail;
+		goto out_commit;
 	}
 
 	err = ocfs2_dentry_attach_lock(dentry, inode, OCFS2_I(dir)->ip_blkno);
 	if (err) {
 		mlog_errno(err);
-		goto bail;
+		goto out_commit;
 	}
 
 	atomic_inc(&inode->i_count);
 	dentry->d_op = &ocfs2_dentry_ops;
 	d_instantiate(dentry, inode);
-bail:
-	if (handle)
-		ocfs2_commit_trans(handle);
+
+out_commit:
+	ocfs2_commit_trans(osb, handle);
+out_unlock_inode:
+	ocfs2_meta_unlock(inode, 1);
+
+out:
+	ocfs2_meta_unlock(dir, 1);
+
 	if (de_bh)
 		brelse(de_bh);
 	if (fe_bh)
@@ -812,13 +801,15 @@ static int ocfs2_unlink(struct inode *dir,
 			struct dentry *dentry)
 {
 	int status;
+	int child_locked = 0;
 	struct inode *inode = dentry->d_inode;
+	struct inode *orphan_dir = NULL;
 	struct ocfs2_super *osb = OCFS2_SB(dir->i_sb);
 	u64 blkno;
 	struct ocfs2_dinode *fe = NULL;
 	struct buffer_head *fe_bh = NULL;
 	struct buffer_head *parent_node_bh = NULL;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct ocfs2_dir_entry *dirent = NULL;
 	struct buffer_head *dirent_bh = NULL;
 	char orphan_name[OCFS2_ORPHAN_NAMELEN + 1];
@@ -833,22 +824,14 @@ static int ocfs2_unlink(struct inode *dir,
 
 	if (inode == osb->root_inode) {
 		mlog(0, "Cannot delete the root directory\n");
-		status = -EPERM;
-		goto leave;
-	}
-
-	handle = ocfs2_alloc_handle(osb);
-	if (handle == NULL) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto leave;
+		return -EPERM;
 	}
 
-	status = ocfs2_meta_lock(dir, handle, &parent_node_bh, 1);
+	status = ocfs2_meta_lock(dir, &parent_node_bh, 1);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
-		goto leave;
+		return status;
 	}
 
 	status = ocfs2_find_files_on_disk(dentry->d_name.name,
@@ -869,12 +852,13 @@ static int ocfs2_unlink(struct inode *dir,
 		goto leave;
 	}
 
-	status = ocfs2_meta_lock(inode, handle, &fe_bh, 1);
+	status = ocfs2_meta_lock(inode, &fe_bh, 1);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
 		goto leave;
 	}
+	child_locked = 1;
 
 	if (S_ISDIR(inode->i_mode)) {
 	       	if (!ocfs2_empty_dir(inode)) {
@@ -895,7 +879,7 @@ static int ocfs2_unlink(struct inode *dir,
 	}
 
 	if (inode_is_unlinkable(inode)) {
-		status = ocfs2_prepare_orphan_dir(osb, handle, inode,
+		status = ocfs2_prepare_orphan_dir(osb, &orphan_dir, inode,
 						  orphan_name,
 						  &orphan_entry_bh);
 		if (status < 0) {
@@ -904,7 +888,7 @@ static int ocfs2_unlink(struct inode *dir,
 		}
 	}
 
-	handle = ocfs2_start_trans(osb, handle, OCFS2_UNLINK_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_UNLINK_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -923,7 +907,7 @@ static int ocfs2_unlink(struct inode *dir,
 
 	if (inode_is_unlinkable(inode)) {
 		status = ocfs2_orphan_add(osb, handle, inode, fe, orphan_name,
-					  orphan_entry_bh);
+					  orphan_entry_bh, orphan_dir);
 		if (status < 0) {
 			mlog_errno(status);
 			goto leave;
@@ -960,7 +944,19 @@ static int ocfs2_unlink(struct inode *dir,
 
 leave:
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
+
+	if (child_locked)
+		ocfs2_meta_unlock(inode, 1);
+
+	ocfs2_meta_unlock(dir, 1);
+
+	if (orphan_dir) {
+		/* This was locked for us in ocfs2_prepare_orphan_dir() */
+		ocfs2_meta_unlock(orphan_dir, 1);
+		mutex_unlock(&orphan_dir->i_mutex);
+		iput(orphan_dir);
+	}
 
 	if (fe_bh)
 		brelse(fe_bh);
@@ -984,7 +980,6 @@ leave:
  * if they have the same id, then the 1st one is the only one locked.
  */
 static int ocfs2_double_lock(struct ocfs2_super *osb,
-			     struct ocfs2_journal_handle *handle,
 			     struct buffer_head **bh1,
 			     struct inode *inode1,
 			     struct buffer_head **bh2,
@@ -1000,8 +995,6 @@ static int ocfs2_double_lock(struct ocfs2_super *osb,
 		   (unsigned long long)oi1->ip_blkno,
 		   (unsigned long long)oi2->ip_blkno);
 
-	BUG_ON(!handle);
-
 	if (*bh1)
 		*bh1 = NULL;
 	if (*bh2)
@@ -1021,25 +1014,41 @@ static int ocfs2_double_lock(struct ocfs2_super *osb,
 			inode1 = tmpinode;
 		}
 		/* lock id2 */
-		status = ocfs2_meta_lock(inode2, handle, bh2, 1);
+		status = ocfs2_meta_lock(inode2, bh2, 1);
 		if (status < 0) {
 			if (status != -ENOENT)
 				mlog_errno(status);
 			goto bail;
 		}
 	}
+
 	/* lock id1 */
-	status = ocfs2_meta_lock(inode1, handle, bh1, 1);
+	status = ocfs2_meta_lock(inode1, bh1, 1);
 	if (status < 0) {
+		/*
+		 * An error return must mean that no cluster locks
+		 * were held on function exit.
+		 */
+		if (oi1->ip_blkno != oi2->ip_blkno)
+			ocfs2_meta_unlock(inode2, 1);
+
 		if (status != -ENOENT)
 			mlog_errno(status);
-		goto bail;
 	}
+
 bail:
 	mlog_exit(status);
 	return status;
 }
 
+static void ocfs2_double_unlock(struct inode *inode1, struct inode *inode2)
+{
+	ocfs2_meta_unlock(inode1, 1);
+
+	if (inode1 != inode2)
+		ocfs2_meta_unlock(inode2, 1);
+}
+
 #define PARENT_INO(buffer) \
 	((struct ocfs2_dir_entry *) \
 	 ((char *)buffer + \
@@ -1050,9 +1059,11 @@ static int ocfs2_rename(struct inode *old_dir,
 			struct inode *new_dir,
 			struct dentry *new_dentry)
 {
-	int status = 0, rename_lock = 0;
+	int status = 0, rename_lock = 0, parents_locked = 0;
+	int old_child_locked = 0, new_child_locked = 0;
 	struct inode *old_inode = old_dentry->d_inode;
 	struct inode *new_inode = new_dentry->d_inode;
+	struct inode *orphan_dir = NULL;
 	struct ocfs2_dinode *newfe = NULL;
 	char orphan_name[OCFS2_ORPHAN_NAMELEN + 1];
 	struct buffer_head *orphan_entry_bh = NULL;
@@ -1060,7 +1071,7 @@ static int ocfs2_rename(struct inode *old_dir,
 	struct buffer_head *insert_entry_bh = NULL;
 	struct ocfs2_super *osb = NULL;
 	u64 newfe_blkno;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct buffer_head *old_dir_bh = NULL;
 	struct buffer_head *new_dir_bh = NULL;
 	struct ocfs2_dir_entry *old_de = NULL, *new_de = NULL; // dirent for old_dentry
@@ -1105,21 +1116,14 @@ static int ocfs2_rename(struct inode *old_dir,
 		rename_lock = 1;
 	}
 
-	handle = ocfs2_alloc_handle(osb);
-	if (handle == NULL) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
-
 	/* if old and new are the same, this'll just do one lock. */
-	status = ocfs2_double_lock(osb, handle,
-				  &old_dir_bh, old_dir,
-				  &new_dir_bh, new_dir);
+	status = ocfs2_double_lock(osb, &old_dir_bh, old_dir,
+				   &new_dir_bh, new_dir);
 	if (status < 0) {
 		mlog_errno(status);
 		goto bail;
 	}
+	parents_locked = 1;
 
 	/* make sure both dirs have bhs
 	 * get an extra ref on old_dir_bh if old==new */
@@ -1140,12 +1144,13 @@ static int ocfs2_rename(struct inode *old_dir,
 	 * the vote thread on other nodes won't have to concurrently
 	 * downconvert the inode and the dentry locks.
 	 */
-	status = ocfs2_meta_lock(old_inode, handle, NULL, 1);
+	status = ocfs2_meta_lock(old_inode, NULL, 1);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
 		goto bail;
 	}
+	old_child_locked = 1;
 
 	status = ocfs2_remote_dentry_delete(old_dentry);
 	if (status < 0) {
@@ -1231,12 +1236,13 @@ static int ocfs2_rename(struct inode *old_dir,
 			goto bail;
 		}
 
-		status = ocfs2_meta_lock(new_inode, handle, &newfe_bh, 1);
+		status = ocfs2_meta_lock(new_inode, &newfe_bh, 1);
 		if (status < 0) {
 			if (status != -ENOENT)
 				mlog_errno(status);
 			goto bail;
 		}
+		new_child_locked = 1;
 
 		status = ocfs2_remote_dentry_delete(new_dentry);
 		if (status < 0) {
@@ -1252,7 +1258,7 @@ static int ocfs2_rename(struct inode *old_dir,
 		     (unsigned long long)newfe_bh->b_blocknr : 0ULL);
 
 		if (S_ISDIR(new_inode->i_mode) || (new_inode->i_nlink == 1)) {
-			status = ocfs2_prepare_orphan_dir(osb, handle,
+			status = ocfs2_prepare_orphan_dir(osb, &orphan_dir,
 							  new_inode,
 							  orphan_name,
 							  &orphan_entry_bh);
@@ -1280,7 +1286,7 @@ static int ocfs2_rename(struct inode *old_dir,
 		}
 	}
 
-	handle = ocfs2_start_trans(osb, handle, OCFS2_RENAME_CREDITS);
+	handle = ocfs2_start_trans(osb, OCFS2_RENAME_CREDITS);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -1307,7 +1313,7 @@ static int ocfs2_rename(struct inode *old_dir,
 		    (newfe->i_links_count == cpu_to_le16(1))){
 			status = ocfs2_orphan_add(osb, handle, new_inode,
 						  newfe, orphan_name,
-						  orphan_entry_bh);
+						  orphan_entry_bh, orphan_dir);
 			if (status < 0) {
 				mlog_errno(status);
 				goto bail;
@@ -1424,7 +1430,23 @@ bail:
 		ocfs2_rename_unlock(osb);
 
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
+
+	if (parents_locked)
+		ocfs2_double_unlock(old_dir, new_dir);
+
+	if (old_child_locked)
+		ocfs2_meta_unlock(old_inode, 1);
+
+	if (new_child_locked)
+		ocfs2_meta_unlock(new_inode, 1);
+
+	if (orphan_dir) {
+		/* This was locked for us in ocfs2_prepare_orphan_dir() */
+		ocfs2_meta_unlock(orphan_dir, 1);
+		mutex_unlock(&orphan_dir->i_mutex);
+		iput(orphan_dir);
+	}
 
 	if (new_inode)
 		sync_mapping_buffers(old_inode->i_mapping);
@@ -1458,7 +1480,7 @@ bail:
  * data, including the null terminator.
  */
 static int ocfs2_create_symlink_data(struct ocfs2_super *osb,
-				     struct ocfs2_journal_handle *handle,
+				     handle_t *handle,
 				     struct inode *inode,
 				     const char *symname)
 {
@@ -1573,7 +1595,7 @@ static int ocfs2_symlink(struct inode *dir,
 	struct buffer_head *parent_fe_bh = NULL;
 	struct ocfs2_dinode *fe = NULL;
 	struct ocfs2_dinode *dirfe;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	struct ocfs2_alloc_context *inode_ac = NULL;
 	struct ocfs2_alloc_context *data_ac = NULL;
 
@@ -1587,19 +1609,12 @@ static int ocfs2_symlink(struct inode *dir,
 
 	credits = ocfs2_calc_symlink_credits(sb);
 
-	handle = ocfs2_alloc_handle(osb);
-	if (handle == NULL) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
-
 	/* lock the parent directory */
-	status = ocfs2_meta_lock(dir, handle, &parent_fe_bh, 1);
+	status = ocfs2_meta_lock(dir, &parent_fe_bh, 1);
 	if (status < 0) {
 		if (status != -ENOENT)
 			mlog_errno(status);
-		goto bail;
+		return status;
 	}
 
 	dirfe = (struct ocfs2_dinode *) parent_fe_bh->b_data;
@@ -1622,7 +1637,7 @@ static int ocfs2_symlink(struct inode *dir,
 		goto bail;
 	}
 
-	status = ocfs2_reserve_new_inode(osb, handle, &inode_ac);
+	status = ocfs2_reserve_new_inode(osb, &inode_ac);
 	if (status < 0) {
 		if (status != -ENOSPC)
 			mlog_errno(status);
@@ -1631,7 +1646,7 @@ static int ocfs2_symlink(struct inode *dir,
 
 	/* don't reserve bitmap space for fast symlinks. */
 	if (l > ocfs2_fast_symlink_chars(sb)) {
-		status = ocfs2_reserve_clusters(osb, handle, 1, &data_ac);
+		status = ocfs2_reserve_clusters(osb, 1, &data_ac);
 		if (status < 0) {
 			if (status != -ENOSPC)
 				mlog_errno(status);
@@ -1639,7 +1654,7 @@ static int ocfs2_symlink(struct inode *dir,
 		}
 	}
 
-	handle = ocfs2_start_trans(osb, handle, credits);
+	handle = ocfs2_start_trans(osb, credits);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -1717,7 +1732,10 @@ static int ocfs2_symlink(struct inode *dir,
 	d_instantiate(dentry, inode);
 bail:
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
+
+	ocfs2_meta_unlock(dir, 1);
+
 	if (new_fe_bh)
 		brelse(new_fe_bh);
 	if (parent_fe_bh)
@@ -1768,7 +1786,7 @@ int ocfs2_check_dir_entry(struct inode * dir,
  * If you pass me insert_bh, I'll skip the search of the other dir
  * blocks and put the record in there.
  */
-static int __ocfs2_add_entry(struct ocfs2_journal_handle *handle,
+static int __ocfs2_add_entry(handle_t *handle,
 			     struct inode *dir,
 			     const char *name, int namelen,
 			     struct inode *inode, u64 blkno,
@@ -1854,7 +1872,7 @@ bail:
  * ocfs2_delete_entry deletes a directory entry by merging it with the
  * previous entry
  */
-static int ocfs2_delete_entry(struct ocfs2_journal_handle *handle,
+static int ocfs2_delete_entry(handle_t *handle,
 			      struct inode *dir,
 			      struct ocfs2_dir_entry *de_del,
 			      struct buffer_head *bh)
@@ -2085,19 +2103,19 @@ bail:
 }
 
 static int ocfs2_prepare_orphan_dir(struct ocfs2_super *osb,
-				    struct ocfs2_journal_handle *handle,
+				    struct inode **ret_orphan_dir,
 				    struct inode *inode,
 				    char *name,
 				    struct buffer_head **de_bh)
 {
-	struct inode *orphan_dir_inode = NULL;
+	struct inode *orphan_dir_inode;
 	struct buffer_head *orphan_dir_bh = NULL;
 	int status = 0;
 
 	status = ocfs2_blkno_stringify(OCFS2_I(inode)->ip_blkno, name);
 	if (status < 0) {
 		mlog_errno(status);
-		goto leave;
+		return status;
 	}
 
 	orphan_dir_inode = ocfs2_get_system_file_inode(osb,
@@ -2106,11 +2124,12 @@ static int ocfs2_prepare_orphan_dir(struct ocfs2_super *osb,
 	if (!orphan_dir_inode) {
 		status = -ENOENT;
 		mlog_errno(status);
-		goto leave;
+		return status;
 	}
 
-	ocfs2_handle_add_inode(handle, orphan_dir_inode);
-	status = ocfs2_meta_lock(orphan_dir_inode, handle, &orphan_dir_bh, 1);
+	mutex_lock(&orphan_dir_inode->i_mutex);
+
+	status = ocfs2_meta_lock(orphan_dir_inode, &orphan_dir_bh, 1);
 	if (status < 0) {
 		mlog_errno(status);
 		goto leave;
@@ -2120,13 +2139,19 @@ static int ocfs2_prepare_orphan_dir(struct ocfs2_super *osb,
 					      orphan_dir_bh, name,
 					      OCFS2_ORPHAN_NAMELEN, de_bh);
 	if (status < 0) {
+		ocfs2_meta_unlock(orphan_dir_inode, 1);
+
 		mlog_errno(status);
 		goto leave;
 	}
 
+	*ret_orphan_dir = orphan_dir_inode;
+
 leave:
-	if (orphan_dir_inode)
+	if (status) {
+		mutex_unlock(&orphan_dir_inode->i_mutex);
 		iput(orphan_dir_inode);
+	}
 
 	if (orphan_dir_bh)
 		brelse(orphan_dir_bh);
@@ -2136,28 +2161,19 @@ leave:
 }
 
 static int ocfs2_orphan_add(struct ocfs2_super *osb,
-			    struct ocfs2_journal_handle *handle,
+			    handle_t *handle,
 			    struct inode *inode,
 			    struct ocfs2_dinode *fe,
 			    char *name,
-			    struct buffer_head *de_bh)
+			    struct buffer_head *de_bh,
+			    struct inode *orphan_dir_inode)
 {
-	struct inode *orphan_dir_inode = NULL;
 	struct buffer_head *orphan_dir_bh = NULL;
 	int status = 0;
 	struct ocfs2_dinode *orphan_fe;
 
 	mlog_entry("(inode->i_ino = %lu)\n", inode->i_ino);
 
-	orphan_dir_inode = ocfs2_get_system_file_inode(osb,
-						       ORPHAN_DIR_SYSTEM_INODE,
-						       osb->slot_num);
-	if (!orphan_dir_inode) {
-		status = -ENOENT;
-		mlog_errno(status);
-		goto leave;
-	}
-
 	status = ocfs2_read_block(osb,
 				  OCFS2_I(orphan_dir_inode)->ip_blkno,
 				  &orphan_dir_bh, OCFS2_BH_CACHED,
@@ -2209,9 +2225,6 @@ static int ocfs2_orphan_add(struct ocfs2_super *osb,
 	     (unsigned long long)OCFS2_I(inode)->ip_blkno, osb->slot_num);
 
 leave:
-	if (orphan_dir_inode)
-		iput(orphan_dir_inode);
-
 	if (orphan_dir_bh)
 		brelse(orphan_dir_bh);
 
@@ -2221,7 +2234,7 @@ leave:
 
 /* unlike orphan_add, we expect the orphan dir to already be locked here. */
 int ocfs2_orphan_del(struct ocfs2_super *osb,
-		     struct ocfs2_journal_handle *handle,
+		     handle_t *handle,
 		     struct inode *orphan_dir_inode,
 		     struct inode *inode,
 		     struct buffer_head *orphan_dir_bh)
@@ -2300,4 +2313,5 @@ struct inode_operations ocfs2_dir_iops = {
 	.rename		= ocfs2_rename,
 	.setattr	= ocfs2_setattr,
 	.getattr	= ocfs2_getattr,
+	.permission	= ocfs2_permission,
 };
diff --git a/fs/ocfs2/namei.h b/fs/ocfs2/namei.h
index deaaa97dbf0b..8425944fcccd 100644
--- a/fs/ocfs2/namei.h
+++ b/fs/ocfs2/namei.h
@@ -39,7 +39,7 @@ struct buffer_head *ocfs2_find_entry(const char *name,
 				     struct inode *dir,
 				     struct ocfs2_dir_entry **res_dir);
 int ocfs2_orphan_del(struct ocfs2_super *osb,
-		     struct ocfs2_journal_handle *handle,
+		     handle_t *handle,
 		     struct inode *orphan_dir_inode,
 		     struct inode *inode,
 		     struct buffer_head *orphan_dir_bh);
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 0462a7f4e21b..db8e77cd35d3 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -34,6 +34,7 @@
 #include <linux/workqueue.h>
 #include <linux/kref.h>
 #include <linux/mutex.h>
+#include <linux/jbd.h>
 
 #include "cluster/nodemanager.h"
 #include "cluster/heartbeat.h"
@@ -179,9 +180,9 @@ enum ocfs2_mount_options
 #define OCFS2_OSB_SOFT_RO	0x0001
 #define OCFS2_OSB_HARD_RO	0x0002
 #define OCFS2_OSB_ERROR_FS	0x0004
+#define OCFS2_DEFAULT_ATIME_QUANTUM	60
 
 struct ocfs2_journal;
-struct ocfs2_journal_handle;
 struct ocfs2_super
 {
 	struct task_struct *commit_task;
@@ -218,6 +219,7 @@ struct ocfs2_super
 	unsigned long osb_flags;
 
 	unsigned long s_mount_opt;
+	unsigned int s_atime_quantum;
 
 	u16 max_slots;
 	s16 node_num;
@@ -283,7 +285,7 @@ struct ocfs2_super
 	/* Truncate log info */
 	struct inode			*osb_tl_inode;
 	struct buffer_head		*osb_tl_bh;
-	struct work_struct		osb_truncate_log_wq;
+	struct delayed_work		osb_truncate_log_wq;
 
 	struct ocfs2_node_map		osb_recovering_orphan_dirs;
 	unsigned int			*osb_orphan_wipes;
@@ -347,6 +349,11 @@ static inline int ocfs2_is_soft_readonly(struct ocfs2_super *osb)
 	return ret;
 }
 
+static inline int ocfs2_mount_local(struct ocfs2_super *osb)
+{
+	return (osb->s_feature_incompat & OCFS2_FEATURE_INCOMPAT_LOCAL_MOUNT);
+}
+
 #define OCFS2_IS_VALID_DINODE(ptr)					\
 	(!strcmp((ptr)->i_signature, OCFS2_INODE_SIGNATURE))
 
diff --git a/fs/ocfs2/ocfs2_fs.h b/fs/ocfs2/ocfs2_fs.h
index 3330a5dc6be2..b5c68567077e 100644
--- a/fs/ocfs2/ocfs2_fs.h
+++ b/fs/ocfs2/ocfs2_fs.h
@@ -86,7 +86,7 @@
 	OCFS2_SB(sb)->s_feature_incompat &= ~(mask)
 
 #define OCFS2_FEATURE_COMPAT_SUPP	0
-#define OCFS2_FEATURE_INCOMPAT_SUPP	0
+#define OCFS2_FEATURE_INCOMPAT_SUPP	OCFS2_FEATURE_INCOMPAT_LOCAL_MOUNT
 #define OCFS2_FEATURE_RO_COMPAT_SUPP	0
 
 /*
@@ -96,6 +96,18 @@
  */
 #define OCFS2_FEATURE_INCOMPAT_HEARTBEAT_DEV	0x0002
 
+/*
+ * tunefs sets this incompat flag before starting the resize and clears it
+ * at the end. This flag protects users from inadvertently mounting the fs
+ * after an aborted run without fsck-ing.
+ */
+#define OCFS2_FEATURE_INCOMPAT_RESIZE_INPROG    0x0004
+
+/* Used to denote a non-clustered volume */
+#define OCFS2_FEATURE_INCOMPAT_LOCAL_MOUNT	0x0008
+
+/* Support for sparse allocation in b-trees */
+#define OCFS2_FEATURE_INCOMPAT_SPARSE_ALLOC	0x0010
 
 /*
  * Flags on ocfs2_dinode.i_flags
diff --git a/fs/ocfs2/slot_map.c b/fs/ocfs2/slot_map.c
index aa6f5aadedc4..2d3ac32cb74e 100644
--- a/fs/ocfs2/slot_map.c
+++ b/fs/ocfs2/slot_map.c
@@ -175,7 +175,7 @@ int ocfs2_init_slot_info(struct ocfs2_super *osb)
 	struct buffer_head *bh = NULL;
 	struct ocfs2_slot_info *si;
 
-	si = kcalloc(1, sizeof(struct ocfs2_slot_info), GFP_KERNEL);
+	si = kzalloc(sizeof(struct ocfs2_slot_info), GFP_KERNEL);
 	if (!si) {
 		status = -ENOMEM;
 		mlog_errno(status);
diff --git a/fs/ocfs2/suballoc.c b/fs/ocfs2/suballoc.c
index 9d91e66f51a9..6dbb11762759 100644
--- a/fs/ocfs2/suballoc.c
+++ b/fs/ocfs2/suballoc.c
@@ -49,7 +49,7 @@
 static inline void ocfs2_debug_bg(struct ocfs2_group_desc *bg);
 static inline void ocfs2_debug_suballoc_inode(struct ocfs2_dinode *fe);
 static inline u16 ocfs2_find_victim_chain(struct ocfs2_chain_list *cl);
-static int ocfs2_block_group_fill(struct ocfs2_journal_handle *handle,
+static int ocfs2_block_group_fill(handle_t *handle,
 				  struct inode *alloc_inode,
 				  struct buffer_head *bg_bh,
 				  u64 group_blkno,
@@ -59,9 +59,6 @@ static int ocfs2_block_group_alloc(struct ocfs2_super *osb,
 				   struct inode *alloc_inode,
 				   struct buffer_head *bh);
 
-static int ocfs2_reserve_suballoc_bits(struct ocfs2_super *osb,
-				       struct ocfs2_alloc_context *ac);
-
 static int ocfs2_cluster_group_search(struct inode *inode,
 				      struct buffer_head *group_bh,
 				      u32 bits_wanted, u32 min_bits,
@@ -72,6 +69,7 @@ static int ocfs2_block_group_search(struct inode *inode,
 				    u16 *bit_off, u16 *bits_found);
 static int ocfs2_claim_suballoc_bits(struct ocfs2_super *osb,
 				     struct ocfs2_alloc_context *ac,
+				     handle_t *handle,
 				     u32 bits_wanted,
 				     u32 min_bits,
 				     u16 *bit_off,
@@ -79,20 +77,20 @@ static int ocfs2_claim_suballoc_bits(struct ocfs2_super *osb,
 				     u64 *bg_blkno);
 static int ocfs2_test_bg_bit_allocatable(struct buffer_head *bg_bh,
 					 int nr);
-static inline int ocfs2_block_group_set_bits(struct ocfs2_journal_handle *handle,
+static inline int ocfs2_block_group_set_bits(handle_t *handle,
 					     struct inode *alloc_inode,
 					     struct ocfs2_group_desc *bg,
 					     struct buffer_head *group_bh,
 					     unsigned int bit_off,
 					     unsigned int num_bits);
-static inline int ocfs2_block_group_clear_bits(struct ocfs2_journal_handle *handle,
+static inline int ocfs2_block_group_clear_bits(handle_t *handle,
 					       struct inode *alloc_inode,
 					       struct ocfs2_group_desc *bg,
 					       struct buffer_head *group_bh,
 					       unsigned int bit_off,
 					       unsigned int num_bits);
 
-static int ocfs2_relink_block_group(struct ocfs2_journal_handle *handle,
+static int ocfs2_relink_block_group(handle_t *handle,
 				    struct inode *alloc_inode,
 				    struct buffer_head *fe_bh,
 				    struct buffer_head *bg_bh,
@@ -100,7 +98,7 @@ static int ocfs2_relink_block_group(struct ocfs2_journal_handle *handle,
 				    u16 chain);
 static inline int ocfs2_block_group_reasonably_empty(struct ocfs2_group_desc *bg,
 						     u32 wanted);
-static int ocfs2_free_suballoc_bits(struct ocfs2_journal_handle *handle,
+static int ocfs2_free_suballoc_bits(handle_t *handle,
 				    struct inode *alloc_inode,
 				    struct buffer_head *alloc_bh,
 				    unsigned int start_bit,
@@ -120,8 +118,16 @@ static inline void ocfs2_block_to_cluster_group(struct inode *inode,
 
 void ocfs2_free_alloc_context(struct ocfs2_alloc_context *ac)
 {
-	if (ac->ac_inode)
-		iput(ac->ac_inode);
+	struct inode *inode = ac->ac_inode;
+
+	if (inode) {
+		if (ac->ac_which != OCFS2_AC_USE_LOCAL)
+			ocfs2_meta_unlock(inode, 1);
+
+		mutex_unlock(&inode->i_mutex);
+
+		iput(inode);
+	}
 	if (ac->ac_bh)
 		brelse(ac->ac_bh);
 	kfree(ac);
@@ -190,7 +196,7 @@ static int ocfs2_check_group_descriptor(struct super_block *sb,
 	return 0;
 }
 
-static int ocfs2_block_group_fill(struct ocfs2_journal_handle *handle,
+static int ocfs2_block_group_fill(handle_t *handle,
 				  struct inode *alloc_inode,
 				  struct buffer_head *bg_bh,
 				  u64 group_blkno,
@@ -273,7 +279,7 @@ static int ocfs2_block_group_alloc(struct ocfs2_super *osb,
 	struct ocfs2_dinode *fe = (struct ocfs2_dinode *) bh->b_data;
 	struct ocfs2_chain_list *cl;
 	struct ocfs2_alloc_context *ac = NULL;
-	struct ocfs2_journal_handle *handle = NULL;
+	handle_t *handle = NULL;
 	u32 bit_off, num_bits;
 	u16 alloc_rec;
 	u64 bg_blkno;
@@ -284,16 +290,8 @@ static int ocfs2_block_group_alloc(struct ocfs2_super *osb,
 
 	mlog_entry_void();
 
-	handle = ocfs2_alloc_handle(osb);
-	if (!handle) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
-
 	cl = &fe->id2.i_chain;
 	status = ocfs2_reserve_clusters(osb,
-					handle,
 					le16_to_cpu(cl->cl_cpg),
 					&ac);
 	if (status < 0) {
@@ -304,7 +302,7 @@ static int ocfs2_block_group_alloc(struct ocfs2_super *osb,
 
 	credits = ocfs2_calc_group_alloc_credits(osb->sb,
 						 le16_to_cpu(cl->cl_cpg));
-	handle = ocfs2_start_trans(osb, handle, credits);
+	handle = ocfs2_start_trans(osb, credits);
 	if (IS_ERR(handle)) {
 		status = PTR_ERR(handle);
 		handle = NULL;
@@ -389,7 +387,7 @@ static int ocfs2_block_group_alloc(struct ocfs2_super *osb,
 	status = 0;
 bail:
 	if (handle)
-		ocfs2_commit_trans(handle);
+		ocfs2_commit_trans(osb, handle);
 
 	if (ac)
 		ocfs2_free_alloc_context(ac);
@@ -402,27 +400,38 @@ bail:
 }
 
 static int ocfs2_reserve_suballoc_bits(struct ocfs2_super *osb,
-				       struct ocfs2_alloc_context *ac)
+				       struct ocfs2_alloc_context *ac,
+				       int type,
+				       u32 slot)
 {
 	int status;
 	u32 bits_wanted = ac->ac_bits_wanted;
-	struct inode *alloc_inode = ac->ac_inode;
+	struct inode *alloc_inode;
 	struct buffer_head *bh = NULL;
-	struct ocfs2_journal_handle *handle = ac->ac_handle;
 	struct ocfs2_dinode *fe;
 	u32 free_bits;
 
 	mlog_entry_void();
 
-	BUG_ON(handle->flags & OCFS2_HANDLE_STARTED);
+	alloc_inode = ocfs2_get_system_file_inode(osb, type, slot);
+	if (!alloc_inode) {
+		mlog_errno(-EINVAL);
+		return -EINVAL;
+	}
 
-	ocfs2_handle_add_inode(handle, alloc_inode);
-	status = ocfs2_meta_lock(alloc_inode, handle, &bh, 1);
+	mutex_lock(&alloc_inode->i_mutex);
+
+	status = ocfs2_meta_lock(alloc_inode, &bh, 1);
 	if (status < 0) {
+		mutex_unlock(&alloc_inode->i_mutex);
+		iput(alloc_inode);
+
 		mlog_errno(status);
-		goto bail;
+		return status;
 	}
 
+	ac->ac_inode = alloc_inode;
+
 	fe = (struct ocfs2_dinode *) bh->b_data;
 	if (!OCFS2_IS_VALID_DINODE(fe)) {
 		OCFS2_RO_ON_INVALID_DINODE(alloc_inode->i_sb, fe);
@@ -473,14 +482,13 @@ bail:
 }
 
 int ocfs2_reserve_new_metadata(struct ocfs2_super *osb,
-			       struct ocfs2_journal_handle *handle,
 			       struct ocfs2_dinode *fe,
 			       struct ocfs2_alloc_context **ac)
 {
 	int status;
-	struct inode *alloc_inode = NULL;
+	u32 slot;
 
-	*ac = kcalloc(1, sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
+	*ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
 	if (!(*ac)) {
 		status = -ENOMEM;
 		mlog_errno(status);
@@ -488,28 +496,18 @@ int ocfs2_reserve_new_metadata(struct ocfs2_super *osb,
 	}
 
 	(*ac)->ac_bits_wanted = ocfs2_extend_meta_needed(fe);
-	(*ac)->ac_handle = handle;
 	(*ac)->ac_which = OCFS2_AC_USE_META;
 
 #ifndef OCFS2_USE_ALL_METADATA_SUBALLOCATORS
-	alloc_inode = ocfs2_get_system_file_inode(osb,
-						  EXTENT_ALLOC_SYSTEM_INODE,
-						  0);
+	slot = 0;
 #else
-	alloc_inode = ocfs2_get_system_file_inode(osb,
-						  EXTENT_ALLOC_SYSTEM_INODE,
-						  osb->slot_num);
+	slot = osb->slot_num;
 #endif
-	if (!alloc_inode) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
 
-	(*ac)->ac_inode = igrab(alloc_inode);
 	(*ac)->ac_group_search = ocfs2_block_group_search;
 
-	status = ocfs2_reserve_suballoc_bits(osb, (*ac));
+	status = ocfs2_reserve_suballoc_bits(osb, (*ac),
+					     EXTENT_ALLOC_SYSTEM_INODE, slot);
 	if (status < 0) {
 		if (status != -ENOSPC)
 			mlog_errno(status);
@@ -523,21 +521,16 @@ bail:
 		*ac = NULL;
 	}
 
-	if (alloc_inode)
-		iput(alloc_inode);
-
 	mlog_exit(status);
 	return status;
 }
 
 int ocfs2_reserve_new_inode(struct ocfs2_super *osb,
-			    struct ocfs2_journal_handle *handle,
 			    struct ocfs2_alloc_context **ac)
 {
 	int status;
-	struct inode *alloc_inode = NULL;
 
-	*ac = kcalloc(1, sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
+	*ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
 	if (!(*ac)) {
 		status = -ENOMEM;
 		mlog_errno(status);
@@ -545,22 +538,13 @@ int ocfs2_reserve_new_inode(struct ocfs2_super *osb,
 	}
 
 	(*ac)->ac_bits_wanted = 1;
-	(*ac)->ac_handle = handle;
 	(*ac)->ac_which = OCFS2_AC_USE_INODE;
 
-	alloc_inode = ocfs2_get_system_file_inode(osb,
-						  INODE_ALLOC_SYSTEM_INODE,
-						  osb->slot_num);
-	if (!alloc_inode) {
-		status = -ENOMEM;
-		mlog_errno(status);
-		goto bail;
-	}
-
-	(*ac)->ac_inode = igrab(alloc_inode);
 	(*ac)->ac_group_search = ocfs2_block_group_search;
 
-	status = ocfs2_reserve_suballoc_bits(osb, *ac);
+	status = ocfs2_reserve_suballoc_bits(osb, *ac,
+					     INODE_ALLOC_SYSTEM_INODE,
+					     osb->slot_num);
 	if (status < 0) {
 		if (status != -ENOSPC)
 			mlog_errno(status);
@@ -574,9 +558,6 @@ bail:
 		*ac = NULL;
 	}
 
-	if (alloc_inode)
-		iput(alloc_inode);
-
 	mlog_exit(status);
 	return status;
 }
@@ -588,20 +569,17 @@ int ocfs2_reserve_cluster_bitmap_bits(struct ocfs2_super *osb,
 {
 	int status;
 
-	ac->ac_inode = ocfs2_get_system_file_inode(osb,
-						   GLOBAL_BITMAP_SYSTEM_INODE,
-						   OCFS2_INVALID_SLOT);
-	if (!ac->ac_inode) {
-		status = -EINVAL;
-		mlog(ML_ERROR, "Could not get bitmap inode!\n");
-		goto bail;
-	}
 	ac->ac_which = OCFS2_AC_USE_MAIN;
 	ac->ac_group_search = ocfs2_cluster_group_search;
 
-	status = ocfs2_reserve_suballoc_bits(osb, ac);
-	if (status < 0 && status != -ENOSPC)
+	status = ocfs2_reserve_suballoc_bits(osb, ac,
+					     GLOBAL_BITMAP_SYSTEM_INODE,
+					     OCFS2_INVALID_SLOT);
+	if (status < 0 && status != -ENOSPC) {
 		mlog_errno(status);
+		goto bail;
+	}
+
 bail:
 	return status;
 }
@@ -610,7 +588,6 @@ bail:
  * use so we figure it out for them, but unfortunately this clutters
  * things a bit. */
 int ocfs2_reserve_clusters(struct ocfs2_super *osb,
-			   struct ocfs2_journal_handle *handle,
 			   u32 bits_wanted,
 			   struct ocfs2_alloc_context **ac)
 {
@@ -618,9 +595,7 @@ int ocfs2_reserve_clusters(struct ocfs2_super *osb,
 
 	mlog_entry_void();
 
-	BUG_ON(!handle);
-
-	*ac = kcalloc(1, sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
+	*ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL);
 	if (!(*ac)) {
 		status = -ENOMEM;
 		mlog_errno(status);
@@ -628,12 +603,10 @@ int ocfs2_reserve_clusters(struct ocfs2_super *osb,
 	}
 
 	(*ac)->ac_bits_wanted = bits_wanted;
-	(*ac)->ac_handle = handle;
 
 	status = -ENOSPC;
 	if (ocfs2_alloc_should_use_local(osb, bits_wanted)) {
 		status = ocfs2_reserve_local_alloc_bits(osb,
-							handle,
 							bits_wanted,
 							*ac);
 		if ((status < 0) && (status != -ENOSPC)) {
@@ -774,7 +747,7 @@ static int ocfs2_block_group_find_clear_bits(struct ocfs2_super *osb,
 	return status;
 }
 
-static inline int ocfs2_block_group_set_bits(struct ocfs2_journal_handle *handle,
+static inline int ocfs2_block_group_set_bits(handle_t *handle,
 					     struct inode *alloc_inode,
 					     struct ocfs2_group_desc *bg,
 					     struct buffer_head *group_bh,
@@ -845,7 +818,7 @@ static inline u16 ocfs2_find_victim_chain(struct ocfs2_chain_list *cl)
 	return best;
 }
 
-static int ocfs2_relink_block_group(struct ocfs2_journal_handle *handle,
+static int ocfs2_relink_block_group(handle_t *handle,
 				    struct inode *alloc_inode,
 				    struct buffer_head *fe_bh,
 				    struct buffer_head *bg_bh,
@@ -1025,7 +998,7 @@ static int ocfs2_block_group_search(struct inode *inode,
 }
 
 static int ocfs2_alloc_dinode_update_counts(struct inode *inode,
-				       struct ocfs2_journal_handle *handle,
+				       handle_t *handle,
 				       struct buffer_head *di_bh,
 				       u32 num_bits,
 				       u16 chain)
@@ -1055,6 +1028,7 @@ out:
 }
 
 static int ocfs2_search_one_group(struct ocfs2_alloc_context *ac,
+				  handle_t *handle,
 				  u32 bits_wanted,
 				  u32 min_bits,
 				  u16 *bit_off,
@@ -1067,7 +1041,6 @@ static int ocfs2_search_one_group(struct ocfs2_alloc_context *ac,
 	struct buffer_head *group_bh = NULL;
 	struct ocfs2_group_desc *gd;
 	struct inode *alloc_inode = ac->ac_inode;
-	struct ocfs2_journal_handle *handle = ac->ac_handle;
 
 	ret = ocfs2_read_block(OCFS2_SB(alloc_inode->i_sb), gd_blkno,
 			       &group_bh, OCFS2_BH_CACHED, alloc_inode);
@@ -1115,6 +1088,7 @@ out:
 }
 
 static int ocfs2_search_chain(struct ocfs2_alloc_context *ac,
+			      handle_t *handle,
 			      u32 bits_wanted,
 			      u32 min_bits,
 			      u16 *bit_off,
@@ -1126,7 +1100,6 @@ static int ocfs2_search_chain(struct ocfs2_alloc_context *ac,
 	u16 chain, tmp_bits;
 	u32 tmp_used;
 	u64 next_group;
-	struct ocfs2_journal_handle *handle = ac->ac_handle;
 	struct inode *alloc_inode = ac->ac_inode;
 	struct buffer_head *group_bh = NULL;
 	struct buffer_head *prev_group_bh = NULL;
@@ -1272,6 +1245,7 @@ bail:
 /* will give out up to bits_wanted contiguous bits. */
 static int ocfs2_claim_suballoc_bits(struct ocfs2_super *osb,
 				     struct ocfs2_alloc_context *ac,
+				     handle_t *handle,
 				     u32 bits_wanted,
 				     u32 min_bits,
 				     u16 *bit_off,
@@ -1313,8 +1287,8 @@ static int ocfs2_claim_suballoc_bits(struct ocfs2_super *osb,
 		 * by jumping straight to the most recently used
 		 * allocation group. This helps us mantain some
 		 * contiguousness across allocations. */
-		status = ocfs2_search_one_group(ac, bits_wanted, min_bits,
-						bit_off, num_bits,
+		status = ocfs2_search_one_group(ac, handle, bits_wanted,
+						min_bits, bit_off, num_bits,
 						hint_blkno, &bits_left);
 		if (!status) {
 			/* Be careful to update *bg_blkno here as the
@@ -1336,7 +1310,7 @@ static int ocfs2_claim_suballoc_bits(struct ocfs2_super *osb,
 	ac->ac_chain = victim;
 	ac->ac_allow_chain_relink = 1;
 
-	status = ocfs2_search_chain(ac, bits_wanted, min_bits, bit_off,
+	status = ocfs2_search_chain(ac, handle, bits_wanted, min_bits, bit_off,
 				    num_bits, bg_blkno, &bits_left);
 	if (!status)
 		goto set_hint;
@@ -1360,7 +1334,7 @@ static int ocfs2_claim_suballoc_bits(struct ocfs2_super *osb,
 			continue;
 
 		ac->ac_chain = i;
-		status = ocfs2_search_chain(ac, bits_wanted, min_bits,
+		status = ocfs2_search_chain(ac, handle, bits_wanted, min_bits,
 					    bit_off, num_bits, bg_blkno,
 					    &bits_left);
 		if (!status)
@@ -1388,7 +1362,7 @@ bail:
 }
 
 int ocfs2_claim_metadata(struct ocfs2_super *osb,
-			 struct ocfs2_journal_handle *handle,
+			 handle_t *handle,
 			 struct ocfs2_alloc_context *ac,
 			 u32 bits_wanted,
 			 u16 *suballoc_bit_start,
@@ -1401,10 +1375,10 @@ int ocfs2_claim_metadata(struct ocfs2_super *osb,
 	BUG_ON(!ac);
 	BUG_ON(ac->ac_bits_wanted < (ac->ac_bits_given + bits_wanted));
 	BUG_ON(ac->ac_which != OCFS2_AC_USE_META);
-	BUG_ON(ac->ac_handle != handle);
 
 	status = ocfs2_claim_suballoc_bits(osb,
 					   ac,
+					   handle,
 					   bits_wanted,
 					   1,
 					   suballoc_bit_start,
@@ -1425,7 +1399,7 @@ bail:
 }
 
 int ocfs2_claim_new_inode(struct ocfs2_super *osb,
-			  struct ocfs2_journal_handle *handle,
+			  handle_t *handle,
 			  struct ocfs2_alloc_context *ac,
 			  u16 *suballoc_bit,
 			  u64 *fe_blkno)
@@ -1440,10 +1414,10 @@ int ocfs2_claim_new_inode(struct ocfs2_super *osb,
 	BUG_ON(ac->ac_bits_given != 0);
 	BUG_ON(ac->ac_bits_wanted != 1);
 	BUG_ON(ac->ac_which != OCFS2_AC_USE_INODE);
-	BUG_ON(ac->ac_handle != handle);
 
 	status = ocfs2_claim_suballoc_bits(osb,
 					   ac,
+					   handle,
 					   1,
 					   1,
 					   suballoc_bit,
@@ -1528,7 +1502,7 @@ static inline void ocfs2_block_to_cluster_group(struct inode *inode,
  * of any size.
  */
 int ocfs2_claim_clusters(struct ocfs2_super *osb,
-			 struct ocfs2_journal_handle *handle,
+			 handle_t *handle,
 			 struct ocfs2_alloc_context *ac,
 			 u32 min_clusters,
 			 u32 *cluster_start,
@@ -1546,7 +1520,6 @@ int ocfs2_claim_clusters(struct ocfs2_super *osb,
 
 	BUG_ON(ac->ac_which != OCFS2_AC_USE_LOCAL
 	       && ac->ac_which != OCFS2_AC_USE_MAIN);
-	BUG_ON(ac->ac_handle != handle);
 
 	if (ac->ac_which == OCFS2_AC_USE_LOCAL) {
 		status = ocfs2_claim_local_alloc_bits(osb,
@@ -1572,6 +1545,7 @@ int ocfs2_claim_clusters(struct ocfs2_super *osb,
 
 		status = ocfs2_claim_suballoc_bits(osb,
 						   ac,
+						   handle,
 						   bits_wanted,
 						   min_clusters,
 						   &bg_bit_off,
@@ -1598,7 +1572,7 @@ bail:
 	return status;
 }
 
-static inline int ocfs2_block_group_clear_bits(struct ocfs2_journal_handle *handle,
+static inline int ocfs2_block_group_clear_bits(handle_t *handle,
 					       struct inode *alloc_inode,
 					       struct ocfs2_group_desc *bg,
 					       struct buffer_head *group_bh,
@@ -1653,7 +1627,7 @@ bail:
 /*
  * expects the suballoc inode to already be locked.
  */
-static int ocfs2_free_suballoc_bits(struct ocfs2_journal_handle *handle,
+static int ocfs2_free_suballoc_bits(handle_t *handle,
 				    struct inode *alloc_inode,
 				    struct buffer_head *alloc_bh,
 				    unsigned int start_bit,
@@ -1737,7 +1711,7 @@ static inline u64 ocfs2_which_suballoc_group(u64 block, unsigned int bit)
 	return group;
 }
 
-int ocfs2_free_dinode(struct ocfs2_journal_handle *handle,
+int ocfs2_free_dinode(handle_t *handle,
 		      struct inode *inode_alloc_inode,
 		      struct buffer_head *inode_alloc_bh,
 		      struct ocfs2_dinode *di)
@@ -1750,7 +1724,7 @@ int ocfs2_free_dinode(struct ocfs2_journal_handle *handle,
 					inode_alloc_bh, bit, bg_blkno, 1);
 }
 
-int ocfs2_free_extent_block(struct ocfs2_journal_handle *handle,
+int ocfs2_free_extent_block(handle_t *handle,
 			    struct inode *eb_alloc_inode,
 			    struct buffer_head *eb_alloc_bh,
 			    struct ocfs2_extent_block *eb)
@@ -1763,7 +1737,7 @@ int ocfs2_free_extent_block(struct ocfs2_journal_handle *handle,
 					bit, bg_blkno, 1);
 }
 
-int ocfs2_free_clusters(struct ocfs2_journal_handle *handle,
+int ocfs2_free_clusters(handle_t *handle,
 		       struct inode *bitmap_inode,
 		       struct buffer_head *bitmap_bh,
 		       u64 start_blk,
diff --git a/fs/ocfs2/suballoc.h b/fs/ocfs2/suballoc.h
index c787838d1052..1a3c94cb9250 100644
--- a/fs/ocfs2/suballoc.h
+++ b/fs/ocfs2/suballoc.h
@@ -43,7 +43,6 @@ struct ocfs2_alloc_context {
 #define OCFS2_AC_USE_INODE 3
 #define OCFS2_AC_USE_META  4
 	u32    ac_which;
-	struct ocfs2_journal_handle *ac_handle;
 
 	/* these are used by the chain search */
 	u16    ac_chain;
@@ -60,45 +59,42 @@ static inline int ocfs2_alloc_context_bits_left(struct ocfs2_alloc_context *ac)
 }
 
 int ocfs2_reserve_new_metadata(struct ocfs2_super *osb,
-			       struct ocfs2_journal_handle *handle,
 			       struct ocfs2_dinode *fe,
 			       struct ocfs2_alloc_context **ac);
 int ocfs2_reserve_new_inode(struct ocfs2_super *osb,
-			    struct ocfs2_journal_handle *handle,
 			    struct ocfs2_alloc_context **ac);
 int ocfs2_reserve_clusters(struct ocfs2_super *osb,
-			   struct ocfs2_journal_handle *handle,
 			   u32 bits_wanted,
 			   struct ocfs2_alloc_context **ac);
 
 int ocfs2_claim_metadata(struct ocfs2_super *osb,
-			 struct ocfs2_journal_handle *handle,
+			 handle_t *handle,
 			 struct ocfs2_alloc_context *ac,
 			 u32 bits_wanted,
 			 u16 *suballoc_bit_start,
 			 u32 *num_bits,
 			 u64 *blkno_start);
 int ocfs2_claim_new_inode(struct ocfs2_super *osb,
-			  struct ocfs2_journal_handle *handle,
+			  handle_t *handle,
 			  struct ocfs2_alloc_context *ac,
 			  u16 *suballoc_bit,
 			  u64 *fe_blkno);
 int ocfs2_claim_clusters(struct ocfs2_super *osb,
-			 struct ocfs2_journal_handle *handle,
+			 handle_t *handle,
 			 struct ocfs2_alloc_context *ac,
 			 u32 min_clusters,
 			 u32 *cluster_start,
 			 u32 *num_clusters);
 
-int ocfs2_free_dinode(struct ocfs2_journal_handle *handle,
+int ocfs2_free_dinode(handle_t *handle,
 		      struct inode *inode_alloc_inode,
 		      struct buffer_head *inode_alloc_bh,
 		      struct ocfs2_dinode *di);
-int ocfs2_free_extent_block(struct ocfs2_journal_handle *handle,
+int ocfs2_free_extent_block(handle_t *handle,
 			    struct inode *eb_alloc_inode,
 			    struct buffer_head *eb_alloc_bh,
 			    struct ocfs2_extent_block *eb);
-int ocfs2_free_clusters(struct ocfs2_journal_handle *handle,
+int ocfs2_free_clusters(handle_t *handle,
 			struct inode *bitmap_inode,
 			struct buffer_head *bitmap_bh,
 			u64 start_blk,
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 76b46ebbb10c..6e300a88a47e 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -68,9 +68,7 @@
 
 #include "buffer_head_io.h"
 
-static kmem_cache_t *ocfs2_inode_cachep = NULL;
-
-kmem_cache_t *ocfs2_lock_cache = NULL;
+static struct kmem_cache *ocfs2_inode_cachep = NULL;
 
 /* OCFS2 needs to schedule several differnt types of work which
  * require cluster locking, disk I/O, recovery waits, etc. Since these
@@ -141,6 +139,7 @@ enum {
 	Opt_hb_local,
 	Opt_data_ordered,
 	Opt_data_writeback,
+	Opt_atime_quantum,
 	Opt_err,
 };
 
@@ -154,6 +153,7 @@ static match_table_t tokens = {
 	{Opt_hb_local, OCFS2_HB_LOCAL},
 	{Opt_data_ordered, "data=ordered"},
 	{Opt_data_writeback, "data=writeback"},
+	{Opt_atime_quantum, "atime_quantum=%u"},
 	{Opt_err, NULL}
 };
 
@@ -303,7 +303,7 @@ static struct inode *ocfs2_alloc_inode(struct super_block *sb)
 {
 	struct ocfs2_inode_info *oi;
 
-	oi = kmem_cache_alloc(ocfs2_inode_cachep, SLAB_NOFS);
+	oi = kmem_cache_alloc(ocfs2_inode_cachep, GFP_NOFS);
 	if (!oi)
 		return NULL;
 
@@ -508,6 +508,27 @@ bail:
 	return status;
 }
 
+static int ocfs2_verify_heartbeat(struct ocfs2_super *osb)
+{
+	if (ocfs2_mount_local(osb)) {
+		if (osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) {
+			mlog(ML_ERROR, "Cannot heartbeat on a locally "
+			     "mounted device.\n");
+			return -EINVAL;
+		}
+	}
+
+	if (!(osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
+		if (!ocfs2_mount_local(osb) && !ocfs2_is_hard_readonly(osb)) {
+			mlog(ML_ERROR, "Heartbeat has to be started to mount "
+			     "a read-write clustered device.\n");
+			return -EINVAL;
+		}
+	}
+
+	return 0;
+}
+
 static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 {
 	struct dentry *root;
@@ -516,16 +537,24 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 	struct inode *inode = NULL;
 	struct ocfs2_super *osb = NULL;
 	struct buffer_head *bh = NULL;
+	char nodestr[8];
 
 	mlog_entry("%p, %p, %i", sb, data, silent);
 
-	/* for now we only have one cluster/node, make sure we see it
-	 * in the heartbeat universe */
-	if (!o2hb_check_local_node_heartbeating()) {
+	if (!ocfs2_parse_options(sb, data, &parsed_opt, 0)) {
 		status = -EINVAL;
 		goto read_super_error;
 	}
 
+	/* for now we only have one cluster/node, make sure we see it
+	 * in the heartbeat universe */
+	if (parsed_opt & OCFS2_MOUNT_HB_LOCAL) {
+		if (!o2hb_check_local_node_heartbeating()) {
+			status = -EINVAL;
+			goto read_super_error;
+		}
+	}
+
 	/* probe for superblock */
 	status = ocfs2_sb_probe(sb, &bh, &sector_size);
 	if (status < 0) {
@@ -541,11 +570,6 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 	}
 	brelse(bh);
 	bh = NULL;
-
-	if (!ocfs2_parse_options(sb, data, &parsed_opt, 0)) {
-		status = -EINVAL;
-		goto read_super_error;
-	}
 	osb->s_mount_opt = parsed_opt;
 
 	sb->s_magic = OCFS2_SUPER_MAGIC;
@@ -588,21 +612,16 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 	}
 
 	if (!ocfs2_is_hard_readonly(osb)) {
-		/* If this isn't a hard readonly mount, then we need
-		 * to make sure that heartbeat is in a valid state,
-		 * and that we mark ourselves soft readonly is -oro
-		 * was specified. */
-		if (!(osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
-			mlog(ML_ERROR, "No heartbeat for device (%s)\n",
-			     sb->s_id);
-			status = -EINVAL;
-			goto read_super_error;
-		}
-
 		if (sb->s_flags & MS_RDONLY)
 			ocfs2_set_ro_flag(osb, 0);
 	}
 
+	status = ocfs2_verify_heartbeat(osb);
+	if (status < 0) {
+		mlog_errno(status);
+		goto read_super_error;
+	}
+
 	osb->osb_debug_root = debugfs_create_dir(osb->uuid_str,
 						 ocfs2_debugfs_root);
 	if (!osb->osb_debug_root) {
@@ -635,9 +654,14 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 
 	ocfs2_complete_mount_recovery(osb);
 
-	printk(KERN_INFO "ocfs2: Mounting device (%s) on (node %d, slot %d) "
+	if (ocfs2_mount_local(osb))
+		snprintf(nodestr, sizeof(nodestr), "local");
+	else
+		snprintf(nodestr, sizeof(nodestr), "%d", osb->node_num);
+
+	printk(KERN_INFO "ocfs2: Mounting device (%s) on (node %s, slot %d) "
 	       "with %s data mode.\n",
-	       osb->dev_str, osb->node_num, osb->slot_num,
+	       osb->dev_str, nodestr, osb->slot_num,
 	       osb->s_mount_opt & OCFS2_MOUNT_DATA_WRITEBACK ? "writeback" :
 	       "ordered");
 
@@ -707,6 +731,7 @@ static int ocfs2_parse_options(struct super_block *sb,
 	while ((p = strsep(&options, ",")) != NULL) {
 		int token, option;
 		substring_t args[MAX_OPT_ARGS];
+		struct ocfs2_super * osb = OCFS2_SB(sb);
 
 		if (!*p)
 			continue;
@@ -747,6 +772,16 @@ static int ocfs2_parse_options(struct super_block *sb,
 		case Opt_data_writeback:
 			*mount_opt |= OCFS2_MOUNT_DATA_WRITEBACK;
 			break;
+		case Opt_atime_quantum:
+			if (match_int(&args[0], &option)) {
+				status = 0;
+				goto bail;
+			}
+			if (option >= 0)
+				osb->s_atime_quantum = option;
+			else
+				osb->s_atime_quantum = OCFS2_DEFAULT_ATIME_QUANTUM;
+			break;
 		default:
 			mlog(ML_ERROR,
 			     "Unrecognized mount option \"%s\" "
@@ -867,7 +902,7 @@ static int ocfs2_statfs(struct dentry *dentry, struct kstatfs *buf)
 		goto bail;
 	}
 
-	status = ocfs2_meta_lock(inode, NULL, &bh, 0);
+	status = ocfs2_meta_lock(inode, &bh, 0);
 	if (status < 0) {
 		mlog_errno(status);
 		goto bail;
@@ -903,7 +938,7 @@ bail:
 }
 
 static void ocfs2_inode_init_once(void *data,
-				  kmem_cache_t *cachep,
+				  struct kmem_cache *cachep,
 				  unsigned long flags)
 {
 	struct ocfs2_inode_info *oi = data;
@@ -914,9 +949,7 @@ static void ocfs2_inode_init_once(void *data,
 		oi->ip_open_count = 0;
 		spin_lock_init(&oi->ip_lock);
 		ocfs2_extent_map_init(&oi->vfs_inode);
-		INIT_LIST_HEAD(&oi->ip_handle_list);
 		INIT_LIST_HEAD(&oi->ip_io_markers);
-		oi->ip_handle = NULL;
 		oi->ip_created_trans = 0;
 		oi->ip_last_trans = 0;
 		oi->ip_dir_start_lookup = 0;
@@ -948,14 +981,6 @@ static int ocfs2_initialize_mem_caches(void)
 	if (!ocfs2_inode_cachep)
 		return -ENOMEM;
 
-	ocfs2_lock_cache = kmem_cache_create("ocfs2_lock",
-					     sizeof(struct ocfs2_journal_lock),
-					     0,
-					     SLAB_HWCACHE_ALIGN,
-					     NULL, NULL);
-	if (!ocfs2_lock_cache)
-		return -ENOMEM;
-
 	return 0;
 }
 
@@ -963,11 +988,8 @@ static void ocfs2_free_mem_caches(void)
 {
 	if (ocfs2_inode_cachep)
 		kmem_cache_destroy(ocfs2_inode_cachep);
-	if (ocfs2_lock_cache)
-		kmem_cache_destroy(ocfs2_lock_cache);
 
 	ocfs2_inode_cachep = NULL;
-	ocfs2_lock_cache = NULL;
 }
 
 static int ocfs2_get_sector(struct super_block *sb,
@@ -1001,7 +1023,11 @@ static int ocfs2_fill_local_node_info(struct ocfs2_super *osb)
 
 	/* XXX hold a ref on the node while mounte?  easy enough, if
 	 * desirable. */
-	osb->node_num = o2nm_this_node();
+	if (ocfs2_mount_local(osb))
+		osb->node_num = 0;
+	else
+		osb->node_num = o2nm_this_node();
+
 	if (osb->node_num == O2NM_MAX_NODES) {
 		mlog(ML_ERROR, "could not find this host's node number\n");
 		status = -ENOENT;
@@ -1086,6 +1112,9 @@ static int ocfs2_mount_volume(struct super_block *sb)
 		goto leave;
 	}
 
+	if (ocfs2_mount_local(osb))
+		goto leave;
+
 	/* This should be sent *after* we recovered our journal as it
 	 * will cause other nodes to unmark us as needing
 	 * recovery. However, we need to send it *before* dropping the
@@ -1116,6 +1145,7 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
 {
 	int tmp;
 	struct ocfs2_super *osb = NULL;
+	char nodestr[8];
 
 	mlog_entry("(0x%p)\n", sb);
 
@@ -1179,8 +1209,13 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
 
 	atomic_set(&osb->vol_state, VOLUME_DISMOUNTED);
 
-	printk(KERN_INFO "ocfs2: Unmounting device (%s) on (node %d)\n",
-	       osb->dev_str, osb->node_num);
+	if (ocfs2_mount_local(osb))
+		snprintf(nodestr, sizeof(nodestr), "local");
+	else
+		snprintf(nodestr, sizeof(nodestr), "%d", osb->node_num);
+
+	printk(KERN_INFO "ocfs2: Unmounting device (%s) on (node %s)\n",
+	       osb->dev_str, nodestr);
 
 	ocfs2_delete_osb(osb);
 	kfree(osb);
@@ -1196,7 +1231,7 @@ static int ocfs2_setup_osb_uuid(struct ocfs2_super *osb, const unsigned char *uu
 
 	BUG_ON(uuid_bytes != OCFS2_VOL_UUID_LEN);
 
-	osb->uuid_str = kcalloc(1, OCFS2_VOL_UUID_LEN * 2 + 1, GFP_KERNEL);
+	osb->uuid_str = kzalloc(OCFS2_VOL_UUID_LEN * 2 + 1, GFP_KERNEL);
 	if (osb->uuid_str == NULL)
 		return -ENOMEM;
 
@@ -1227,7 +1262,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
 
 	mlog_entry_void();
 
-	osb = kcalloc(1, sizeof(struct ocfs2_super), GFP_KERNEL);
+	osb = kzalloc(sizeof(struct ocfs2_super), GFP_KERNEL);
 	if (!osb) {
 		status = -ENOMEM;
 		mlog_errno(status);
@@ -1280,6 +1315,8 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	init_waitqueue_head(&osb->checkpoint_event);
 	atomic_set(&osb->needs_checkpoint, 0);
 
+	osb->s_atime_quantum = OCFS2_DEFAULT_ATIME_QUANTUM;
+
 	osb->node_num = O2NM_INVALID_NODE_NUM;
 	osb->slot_num = OCFS2_INVALID_SLOT;
 
@@ -1350,7 +1387,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	 */
 	/* initialize our journal structure */
 
-	journal = kcalloc(1, sizeof(struct ocfs2_journal), GFP_KERNEL);
+	journal = kzalloc(sizeof(struct ocfs2_journal), GFP_KERNEL);
 	if (!journal) {
 		mlog(ML_ERROR, "unable to alloc journal\n");
 		status = -ENOMEM;
@@ -1365,7 +1402,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	spin_lock_init(&journal->j_lock);
 	journal->j_trans_id = (unsigned long) 1;
 	INIT_LIST_HEAD(&journal->j_la_cleanups);
-	INIT_WORK(&journal->j_recovery_work, ocfs2_complete_recovery, osb);
+	INIT_WORK(&journal->j_recovery_work, ocfs2_complete_recovery);
 	journal->j_state = OCFS2_JOURNAL_FREE;
 
 	/* get some pseudo constants for clustersize bits */
@@ -1536,6 +1573,7 @@ static int ocfs2_check_volume(struct ocfs2_super *osb)
 {
 	int status = 0;
 	int dirty;
+	int local;
 	struct ocfs2_dinode *local_alloc = NULL; /* only used if we
 						  * recover
 						  * ourselves. */
@@ -1563,8 +1601,10 @@ static int ocfs2_check_volume(struct ocfs2_super *osb)
 		     "recovering volume.\n");
 	}
 
+	local = ocfs2_mount_local(osb);
+
 	/* will play back anything left in the journal. */
-	ocfs2_journal_load(osb->journal);
+	ocfs2_journal_load(osb->journal, local);
 
 	if (dirty) {
 		/* recover my local alloc if we didn't unmount cleanly. */
@@ -1674,7 +1714,7 @@ void __ocfs2_error(struct super_block *sb,
 	va_list args;
 
 	va_start(args, fmt);
-	vsprintf(error_buf, fmt, args);
+	vsnprintf(error_buf, sizeof(error_buf), fmt, args);
 	va_end(args);
 
 	/* Not using mlog here because we want to show the actual
@@ -1695,7 +1735,7 @@ void __ocfs2_abort(struct super_block* sb,
 	va_list args;
 
 	va_start(args, fmt);
-	vsprintf(error_buf, fmt, args);
+	vsnprintf(error_buf, sizeof(error_buf), fmt, args);
 	va_end(args);
 
 	printk(KERN_CRIT "OCFS2: abort (device %s): %s: %s\n",
diff --git a/fs/ocfs2/symlink.c b/fs/ocfs2/symlink.c
index c0f68aa6c175..957d6878b03e 100644
--- a/fs/ocfs2/symlink.c
+++ b/fs/ocfs2/symlink.c
@@ -126,6 +126,10 @@ static int ocfs2_readlink(struct dentry *dentry,
 		goto out;
 	}
 
+	/*
+	 * Without vfsmount we can't update atime now,
+	 * but we will update atime here ultimately.
+	 */
 	ret = vfs_readlink(dentry, buffer, buflen, link);
 
 	brelse(bh);
diff --git a/fs/ocfs2/uptodate.c b/fs/ocfs2/uptodate.c
index 9707ed7a3206..39814b900fc0 100644
--- a/fs/ocfs2/uptodate.c
+++ b/fs/ocfs2/uptodate.c
@@ -69,7 +69,7 @@ struct ocfs2_meta_cache_item {
 	sector_t	c_block;
 };
 
-static kmem_cache_t *ocfs2_uptodate_cachep = NULL;
+static struct kmem_cache *ocfs2_uptodate_cachep = NULL;
 
 void ocfs2_metadata_cache_init(struct inode *inode)
 {
diff --git a/fs/ocfs2/vote.c b/fs/ocfs2/vote.c
index 5b4dca79990b..0afd8b9af70f 100644
--- a/fs/ocfs2/vote.c
+++ b/fs/ocfs2/vote.c
@@ -479,7 +479,7 @@ static struct ocfs2_net_wait_ctxt *ocfs2_new_net_wait_ctxt(unsigned int response
 {
 	struct ocfs2_net_wait_ctxt *w;
 
-	w = kcalloc(1, sizeof(*w), GFP_NOFS);
+	w = kzalloc(sizeof(*w), GFP_NOFS);
 	if (!w) {
 		mlog_errno(-ENOMEM);
 		goto bail;
@@ -642,7 +642,7 @@ static struct ocfs2_vote_msg * ocfs2_new_vote_request(struct ocfs2_super *osb,
 
 	BUG_ON(!ocfs2_is_valid_vote_request(type));
 
-	request = kcalloc(1, sizeof(*request), GFP_NOFS);
+	request = kzalloc(sizeof(*request), GFP_NOFS);
 	if (!request) {
 		mlog_errno(-ENOMEM);
 	} else {
@@ -1000,6 +1000,9 @@ int ocfs2_register_net_handlers(struct ocfs2_super *osb)
 {
 	int status = 0;
 
+	if (ocfs2_mount_local(osb))
+		return 0;
+
 	status = o2net_register_handler(OCFS2_MESSAGE_TYPE_RESPONSE,
 					osb->net_key,
 					sizeof(struct ocfs2_response_msg),