summary refs log tree commit diff
path: root/fs/ocfs2/journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/journal.c')
-rw-r--r--fs/ocfs2/journal.c364
1 files changed, 300 insertions, 64 deletions
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index 99fe9d584f3c..57d7d25a2b9a 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -35,6 +35,7 @@
 #include "ocfs2.h"
 
 #include "alloc.h"
+#include "blockcheck.h"
 #include "dir.h"
 #include "dlmglue.h"
 #include "extent_map.h"
@@ -45,6 +46,7 @@
 #include "slot_map.h"
 #include "super.h"
 #include "sysfile.h"
+#include "quota.h"
 
 #include "buffer_head_io.h"
 
@@ -52,10 +54,10 @@ DEFINE_SPINLOCK(trans_inc_lock);
 
 static int ocfs2_force_read_journal(struct inode *inode);
 static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num);
+			      int node_num, int slot_num);
 static int __ocfs2_recovery_thread(void *arg);
 static int ocfs2_commit_cache(struct ocfs2_super *osb);
-static int ocfs2_wait_on_mount(struct ocfs2_super *osb);
+static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota);
 static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
 				      int dirty, int replayed);
 static int ocfs2_trylock_journal(struct ocfs2_super *osb,
@@ -64,6 +66,17 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
 				 int slot);
 static int ocfs2_commit_thread(void *arg);
 
+static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
+{
+	return __ocfs2_wait_on_mount(osb, 0);
+}
+
+static inline int ocfs2_wait_on_quotas(struct ocfs2_super *osb)
+{
+	return __ocfs2_wait_on_mount(osb, 1);
+}
+
+
 
 /*
  * The recovery_list is a simple linked list of node numbers to recover.
@@ -256,11 +269,9 @@ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
 	BUG_ON(osb->journal->j_state == OCFS2_JOURNAL_FREE);
 	BUG_ON(max_buffs <= 0);
 
-	/* JBD might support this, but our journalling code doesn't yet. */
-	if (journal_current_handle()) {
-		mlog(ML_ERROR, "Recursive transaction attempted!\n");
-		BUG();
-	}
+	/* Nested transaction? Just return the handle... */
+	if (journal_current_handle())
+		return jbd2_journal_start(journal, max_buffs);
 
 	down_read(&osb->journal->j_trans_barrier);
 
@@ -285,16 +296,18 @@ handle_t *ocfs2_start_trans(struct ocfs2_super *osb, int max_buffs)
 int ocfs2_commit_trans(struct ocfs2_super *osb,
 		       handle_t *handle)
 {
-	int ret;
+	int ret, nested;
 	struct ocfs2_journal *journal = osb->journal;
 
 	BUG_ON(!handle);
 
+	nested = handle->h_ref > 1;
 	ret = jbd2_journal_stop(handle);
 	if (ret < 0)
 		mlog_errno(ret);
 
-	up_read(&journal->j_trans_barrier);
+	if (!nested)
+		up_read(&journal->j_trans_barrier);
 
 	return ret;
 }
@@ -357,10 +370,137 @@ bail:
 	return status;
 }
 
-int ocfs2_journal_access(handle_t *handle,
-			 struct inode *inode,
-			 struct buffer_head *bh,
-			 int type)
+struct ocfs2_triggers {
+	struct jbd2_buffer_trigger_type	ot_triggers;
+	int				ot_offset;
+};
+
+static inline struct ocfs2_triggers *to_ocfs2_trigger(struct jbd2_buffer_trigger_type *triggers)
+{
+	return container_of(triggers, struct ocfs2_triggers, ot_triggers);
+}
+
+static void ocfs2_commit_trigger(struct jbd2_buffer_trigger_type *triggers,
+				 struct buffer_head *bh,
+				 void *data, size_t size)
+{
+	struct ocfs2_triggers *ot = to_ocfs2_trigger(triggers);
+
+	/*
+	 * We aren't guaranteed to have the superblock here, so we
+	 * must unconditionally compute the ecc data.
+	 * __ocfs2_journal_access() will only set the triggers if
+	 * metaecc is enabled.
+	 */
+	ocfs2_block_check_compute(data, size, data + ot->ot_offset);
+}
+
+/*
+ * Quota blocks have their own trigger because the struct ocfs2_block_check
+ * offset depends on the blocksize.
+ */
+static void ocfs2_dq_commit_trigger(struct jbd2_buffer_trigger_type *triggers,
+				 struct buffer_head *bh,
+				 void *data, size_t size)
+{
+	struct ocfs2_disk_dqtrailer *dqt =
+		ocfs2_block_dqtrailer(size, data);
+
+	/*
+	 * We aren't guaranteed to have the superblock here, so we
+	 * must unconditionally compute the ecc data.
+	 * __ocfs2_journal_access() will only set the triggers if
+	 * metaecc is enabled.
+	 */
+	ocfs2_block_check_compute(data, size, &dqt->dq_check);
+}
+
+/*
+ * Directory blocks also have their own trigger because the
+ * struct ocfs2_block_check offset depends on the blocksize.
+ */
+static void ocfs2_db_commit_trigger(struct jbd2_buffer_trigger_type *triggers,
+				 struct buffer_head *bh,
+				 void *data, size_t size)
+{
+	struct ocfs2_dir_block_trailer *trailer =
+		ocfs2_dir_trailer_from_size(size, data);
+
+	/*
+	 * We aren't guaranteed to have the superblock here, so we
+	 * must unconditionally compute the ecc data.
+	 * __ocfs2_journal_access() will only set the triggers if
+	 * metaecc is enabled.
+	 */
+	ocfs2_block_check_compute(data, size, &trailer->db_check);
+}
+
+static void ocfs2_abort_trigger(struct jbd2_buffer_trigger_type *triggers,
+				struct buffer_head *bh)
+{
+	mlog(ML_ERROR,
+	     "ocfs2_abort_trigger called by JBD2.  bh = 0x%lx, "
+	     "bh->b_blocknr = %llu\n",
+	     (unsigned long)bh,
+	     (unsigned long long)bh->b_blocknr);
+
+	/* We aren't guaranteed to have the superblock here - but if we
+	 * don't, it'll just crash. */
+	ocfs2_error(bh->b_assoc_map->host->i_sb,
+		    "JBD2 has aborted our journal, ocfs2 cannot continue\n");
+}
+
+static struct ocfs2_triggers di_triggers = {
+	.ot_triggers = {
+		.t_commit = ocfs2_commit_trigger,
+		.t_abort = ocfs2_abort_trigger,
+	},
+	.ot_offset	= offsetof(struct ocfs2_dinode, i_check),
+};
+
+static struct ocfs2_triggers eb_triggers = {
+	.ot_triggers = {
+		.t_commit = ocfs2_commit_trigger,
+		.t_abort = ocfs2_abort_trigger,
+	},
+	.ot_offset	= offsetof(struct ocfs2_extent_block, h_check),
+};
+
+static struct ocfs2_triggers gd_triggers = {
+	.ot_triggers = {
+		.t_commit = ocfs2_commit_trigger,
+		.t_abort = ocfs2_abort_trigger,
+	},
+	.ot_offset	= offsetof(struct ocfs2_group_desc, bg_check),
+};
+
+static struct ocfs2_triggers db_triggers = {
+	.ot_triggers = {
+		.t_commit = ocfs2_db_commit_trigger,
+		.t_abort = ocfs2_abort_trigger,
+	},
+};
+
+static struct ocfs2_triggers xb_triggers = {
+	.ot_triggers = {
+		.t_commit = ocfs2_commit_trigger,
+		.t_abort = ocfs2_abort_trigger,
+	},
+	.ot_offset	= offsetof(struct ocfs2_xattr_block, xb_check),
+};
+
+static struct ocfs2_triggers dq_triggers = {
+	.ot_triggers = {
+		.t_commit = ocfs2_dq_commit_trigger,
+		.t_abort = ocfs2_abort_trigger,
+	},
+};
+
+static int __ocfs2_journal_access(handle_t *handle,
+				  struct inode *inode,
+				  struct buffer_head *bh,
+				  struct ocfs2_triggers *triggers,
+				  int type)
 {
 	int status;
 
@@ -406,6 +546,8 @@ int ocfs2_journal_access(handle_t *handle,
 		status = -EINVAL;
 		mlog(ML_ERROR, "Uknown access type!\n");
 	}
+	if (!status && ocfs2_meta_ecc(OCFS2_SB(inode->i_sb)) && triggers)
+		jbd2_journal_set_triggers(bh, &triggers->ot_triggers);
 	mutex_unlock(&OCFS2_I(inode)->ip_io_mutex);
 
 	if (status < 0)
@@ -416,6 +558,54 @@ int ocfs2_journal_access(handle_t *handle,
 	return status;
 }
 
+int ocfs2_journal_access_di(handle_t *handle, struct inode *inode,
+			       struct buffer_head *bh, int type)
+{
+	return __ocfs2_journal_access(handle, inode, bh, &di_triggers,
+				      type);
+}
+
+int ocfs2_journal_access_eb(handle_t *handle, struct inode *inode,
+			    struct buffer_head *bh, int type)
+{
+	return __ocfs2_journal_access(handle, inode, bh, &eb_triggers,
+				      type);
+}
+
+int ocfs2_journal_access_gd(handle_t *handle, struct inode *inode,
+			    struct buffer_head *bh, int type)
+{
+	return __ocfs2_journal_access(handle, inode, bh, &gd_triggers,
+				      type);
+}
+
+int ocfs2_journal_access_db(handle_t *handle, struct inode *inode,
+			    struct buffer_head *bh, int type)
+{
+	return __ocfs2_journal_access(handle, inode, bh, &db_triggers,
+				      type);
+}
+
+int ocfs2_journal_access_xb(handle_t *handle, struct inode *inode,
+			    struct buffer_head *bh, int type)
+{
+	return __ocfs2_journal_access(handle, inode, bh, &xb_triggers,
+				      type);
+}
+
+int ocfs2_journal_access_dq(handle_t *handle, struct inode *inode,
+			    struct buffer_head *bh, int type)
+{
+	return __ocfs2_journal_access(handle, inode, bh, &dq_triggers,
+				      type);
+}
+
+int ocfs2_journal_access(handle_t *handle, struct inode *inode,
+			 struct buffer_head *bh, int type)
+{
+	return __ocfs2_journal_access(handle, inode, bh, NULL, type);
+}
+
 int ocfs2_journal_dirty(handle_t *handle,
 			struct buffer_head *bh)
 {
@@ -434,20 +624,6 @@ int ocfs2_journal_dirty(handle_t *handle,
 	return status;
 }
 
-#ifdef CONFIG_OCFS2_COMPAT_JBD
-int ocfs2_journal_dirty_data(handle_t *handle,
-			     struct buffer_head *bh)
-{
-	int err = journal_dirty_data(handle, bh);
-	if (err)
-		mlog_errno(err);
-	/* TODO: When we can handle it, abort the handle and go RO on
-	 * error here. */
-
-	return err;
-}
-#endif
-
 #define OCFS2_DEFAULT_COMMIT_INTERVAL	(HZ * JBD2_DEFAULT_MAX_COMMIT_AGE)
 
 void ocfs2_set_journal_params(struct ocfs2_super *osb)
@@ -587,17 +763,11 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
 	mlog_entry_void();
 
 	fe = (struct ocfs2_dinode *)bh->b_data;
-	if (!OCFS2_IS_VALID_DINODE(fe)) {
-		/* This is called from startup/shutdown which will
-		 * handle the errors in a specific manner, so no need
-		 * to call ocfs2_error() here. */
-		mlog(ML_ERROR, "Journal dinode %llu  has invalid "
-		     "signature: %.*s",
-		     (unsigned long long)le64_to_cpu(fe->i_blkno), 7,
-		     fe->i_signature);
-		status = -EIO;
-		goto out;
-	}
+
+	/* The journal bh on the osb always comes from ocfs2_journal_init()
+	 * and was validated there inside ocfs2_inode_lock_full().  It's a
+	 * code bug if we mess it up. */
+	BUG_ON(!OCFS2_IS_VALID_DINODE(fe));
 
 	flags = le32_to_cpu(fe->id1.journal1.ij_flags);
 	if (dirty)
@@ -609,11 +779,11 @@ static int ocfs2_journal_toggle_dirty(struct ocfs2_super *osb,
 	if (replayed)
 		ocfs2_bump_recovery_generation(fe);
 
+	ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
 	status = ocfs2_write_block(osb, bh, journal->j_inode);
 	if (status < 0)
 		mlog_errno(status);
 
-out:
 	mlog_exit(status);
 	return status;
 }
@@ -878,6 +1048,7 @@ struct ocfs2_la_recovery_item {
 	int			lri_slot;
 	struct ocfs2_dinode	*lri_la_dinode;
 	struct ocfs2_dinode	*lri_tl_dinode;
+	struct ocfs2_quota_recovery *lri_qrec;
 };
 
 /* Does the second half of the recovery process. By this point, the
@@ -898,6 +1069,7 @@ void ocfs2_complete_recovery(struct work_struct *work)
 	struct ocfs2_super *osb = journal->j_osb;
 	struct ocfs2_dinode *la_dinode, *tl_dinode;
 	struct ocfs2_la_recovery_item *item, *n;
+	struct ocfs2_quota_recovery *qrec;
 	LIST_HEAD(tmp_la_list);
 
 	mlog_entry_void();
@@ -913,6 +1085,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
 
 		mlog(0, "Complete recovery for slot %d\n", item->lri_slot);
 
+		ocfs2_wait_on_quotas(osb);
+
 		la_dinode = item->lri_la_dinode;
 		if (la_dinode) {
 			mlog(0, "Clean up local alloc %llu\n",
@@ -943,6 +1117,16 @@ void ocfs2_complete_recovery(struct work_struct *work)
 		if (ret < 0)
 			mlog_errno(ret);
 
+		qrec = item->lri_qrec;
+		if (qrec) {
+			mlog(0, "Recovering quota files");
+			ret = ocfs2_finish_quota_recovery(osb, qrec,
+							  item->lri_slot);
+			if (ret < 0)
+				mlog_errno(ret);
+			/* Recovery info is already freed now */
+		}
+
 		kfree(item);
 	}
 
@@ -956,7 +1140,8 @@ void ocfs2_complete_recovery(struct work_struct *work)
 static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
 					    int slot_num,
 					    struct ocfs2_dinode *la_dinode,
-					    struct ocfs2_dinode *tl_dinode)
+					    struct ocfs2_dinode *tl_dinode,
+					    struct ocfs2_quota_recovery *qrec)
 {
 	struct ocfs2_la_recovery_item *item;
 
@@ -971,6 +1156,9 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
 		if (tl_dinode)
 			kfree(tl_dinode);
 
+		if (qrec)
+			ocfs2_free_quota_recovery(qrec);
+
 		mlog_errno(-ENOMEM);
 		return;
 	}
@@ -979,6 +1167,7 @@ static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
 	item->lri_la_dinode = la_dinode;
 	item->lri_slot = slot_num;
 	item->lri_tl_dinode = tl_dinode;
+	item->lri_qrec = qrec;
 
 	spin_lock(&journal->j_lock);
 	list_add_tail(&item->lri_list, &journal->j_la_cleanups);
@@ -998,6 +1187,7 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
 		ocfs2_queue_recovery_completion(journal,
 						osb->slot_num,
 						osb->local_alloc_copy,
+						NULL,
 						NULL);
 		ocfs2_schedule_truncate_log_flush(osb, 0);
 
@@ -1006,11 +1196,26 @@ void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
 	}
 }
 
+void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
+{
+	if (osb->quota_rec) {
+		ocfs2_queue_recovery_completion(osb->journal,
+						osb->slot_num,
+						NULL,
+						NULL,
+						osb->quota_rec);
+		osb->quota_rec = NULL;
+	}
+}
+
 static int __ocfs2_recovery_thread(void *arg)
 {
-	int status, node_num;
+	int status, node_num, slot_num;
 	struct ocfs2_super *osb = arg;
 	struct ocfs2_recovery_map *rm = osb->recovery_map;
+	int *rm_quota = NULL;
+	int rm_quota_used = 0, i;
+	struct ocfs2_quota_recovery *qrec;
 
 	mlog_entry_void();
 
@@ -1019,6 +1224,11 @@ static int __ocfs2_recovery_thread(void *arg)
 		goto bail;
 	}
 
+	rm_quota = kzalloc(osb->max_slots * sizeof(int), GFP_NOFS);
+	if (!rm_quota) {
+		status = -ENOMEM;
+		goto bail;
+	}
 restart:
 	status = ocfs2_super_lock(osb, 1);
 	if (status < 0) {
@@ -1032,8 +1242,28 @@ restart:
 		 * clear it until ocfs2_recover_node() has succeeded. */
 		node_num = rm->rm_entries[0];
 		spin_unlock(&osb->osb_lock);
-
-		status = ocfs2_recover_node(osb, node_num);
+		mlog(0, "checking node %d\n", node_num);
+		slot_num = ocfs2_node_num_to_slot(osb, node_num);
+		if (slot_num == -ENOENT) {
+			status = 0;
+			mlog(0, "no slot for this node, so no recovery"
+			     "required.\n");
+			goto skip_recovery;
+		}
+		mlog(0, "node %d was using slot %d\n", node_num, slot_num);
+
+		/* It is a bit subtle with quota recovery. We cannot do it
+		 * immediately because we have to obtain cluster locks from
+		 * quota files and we also don't want to just skip it because
+		 * then quota usage would be out of sync until some node takes
+		 * the slot. So we remember which nodes need quota recovery
+		 * and when everything else is done, we recover quotas. */
+		for (i = 0; i < rm_quota_used && rm_quota[i] != slot_num; i++);
+		if (i == rm_quota_used)
+			rm_quota[rm_quota_used++] = slot_num;
+
+		status = ocfs2_recover_node(osb, node_num, slot_num);
+skip_recovery:
 		if (!status) {
 			ocfs2_recovery_map_clear(osb, node_num);
 		} else {
@@ -1055,13 +1285,27 @@ restart:
 	if (status < 0)
 		mlog_errno(status);
 
+	/* Now it is right time to recover quotas... We have to do this under
+	 * superblock lock so that noone can start using the slot (and crash)
+	 * before we recover it */
+	for (i = 0; i < rm_quota_used; i++) {
+		qrec = ocfs2_begin_quota_recovery(osb, rm_quota[i]);
+		if (IS_ERR(qrec)) {
+			status = PTR_ERR(qrec);
+			mlog_errno(status);
+			continue;
+		}
+		ocfs2_queue_recovery_completion(osb->journal, rm_quota[i],
+						NULL, NULL, qrec);
+	}
+
 	ocfs2_super_unlock(osb, 1);
 
 	/* We always run recovery on our own orphan dir - the dead
 	 * node(s) may have disallowd a previos inode delete. Re-processing
 	 * is therefore required. */
 	ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
-					NULL);
+					NULL, NULL);
 
 bail:
 	mutex_lock(&osb->recovery_lock);
@@ -1076,6 +1320,9 @@ bail:
 
 	mutex_unlock(&osb->recovery_lock);
 
+	if (rm_quota)
+		kfree(rm_quota);
+
 	mlog_exit(status);
 	/* no one is callint kthread_stop() for us so the kthread() api
 	 * requires that we call do_exit().  And it isn't exported, but
@@ -1135,8 +1382,7 @@ static int ocfs2_read_journal_inode(struct ocfs2_super *osb,
 	}
 	SET_INODE_JOURNAL(inode);
 
-	status = ocfs2_read_blocks(inode, OCFS2_I(inode)->ip_blkno, 1, bh,
-				   OCFS2_BH_IGNORE_CACHE);
+	status = ocfs2_read_inode_block_full(inode, bh, OCFS2_BH_IGNORE_CACHE);
 	if (status < 0) {
 		mlog_errno(status);
 		goto bail;
@@ -1268,6 +1514,7 @@ static int ocfs2_replay_journal(struct ocfs2_super *osb,
 	osb->slot_recovery_generations[slot_num] =
 					ocfs2_get_recovery_generation(fe);
 
+	ocfs2_compute_meta_ecc(osb->sb, bh->b_data, &fe->i_check);
 	status = ocfs2_write_block(osb, bh, inode);
 	if (status < 0)
 		mlog_errno(status);
@@ -1304,31 +1551,19 @@ done:
  * far less concerning.
  */
 static int ocfs2_recover_node(struct ocfs2_super *osb,
-			      int node_num)
+			      int node_num, int slot_num)
 {
 	int status = 0;
-	int slot_num;
 	struct ocfs2_dinode *la_copy = NULL;
 	struct ocfs2_dinode *tl_copy = NULL;
 
-	mlog_entry("(node_num=%d, osb->node_num = %d)\n",
-		   node_num, osb->node_num);
-
-	mlog(0, "checking node %d\n", node_num);
+	mlog_entry("(node_num=%d, slot_num=%d, osb->node_num = %d)\n",
+		   node_num, slot_num, osb->node_num);
 
 	/* Should not ever be called to recover ourselves -- in that
 	 * case we should've called ocfs2_journal_load instead. */
 	BUG_ON(osb->node_num == node_num);
 
-	slot_num = ocfs2_node_num_to_slot(osb, node_num);
-	if (slot_num == -ENOENT) {
-		status = 0;
-		mlog(0, "no slot for this node, so no recovery required.\n");
-		goto done;
-	}
-
-	mlog(0, "node %d was using slot %d\n", node_num, slot_num);
-
 	status = ocfs2_replay_journal(osb, node_num, slot_num);
 	if (status < 0) {
 		if (status == -EBUSY) {
@@ -1364,7 +1599,7 @@ static int ocfs2_recover_node(struct ocfs2_super *osb,
 
 	/* This will kfree the memory pointed to by la_copy and tl_copy */
 	ocfs2_queue_recovery_completion(osb->journal, slot_num, la_copy,
-					tl_copy);
+					tl_copy, NULL);
 
 	status = 0;
 done:
@@ -1659,13 +1894,14 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
 	return ret;
 }
 
-static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
+static int __ocfs2_wait_on_mount(struct ocfs2_super *osb, int quota)
 {
 	/* This check is good because ocfs2 will wait on our recovery
 	 * thread before changing it to something other than MOUNTED
 	 * or DISABLED. */
 	wait_event(osb->osb_mount_event,
-		   atomic_read(&osb->vol_state) == VOLUME_MOUNTED ||
+		  (!quota && atomic_read(&osb->vol_state) == VOLUME_MOUNTED) ||
+		   atomic_read(&osb->vol_state) == VOLUME_MOUNTED_QUOTAS ||
 		   atomic_read(&osb->vol_state) == VOLUME_DISABLED);
 
 	/* If there's an error on mount, then we may never get to the