summary refs log tree commit diff
path: root/fs/ocfs2/super.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/super.c')
-rw-r--r--fs/ocfs2/super.c208
1 files changed, 122 insertions, 86 deletions
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index bec75aff3d9f..df63ba20ae90 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -40,8 +40,7 @@
 #include <linux/crc32.h>
 #include <linux/debugfs.h>
 #include <linux/mount.h>
-
-#include <cluster/nodemanager.h>
+#include <linux/seq_file.h>
 
 #define MLOG_MASK_PREFIX ML_SUPER
 #include <cluster/masklog.h>
@@ -88,6 +87,7 @@ struct mount_options
 	unsigned int	atime_quantum;
 	signed short	slot;
 	unsigned int	localalloc_opt;
+	char		cluster_stack[OCFS2_STACK_LABEL_LEN + 1];
 };
 
 static int ocfs2_parse_options(struct super_block *sb, char *options,
@@ -109,7 +109,6 @@ static int ocfs2_sync_fs(struct super_block *sb, int wait);
 static int ocfs2_init_global_system_inodes(struct ocfs2_super *osb);
 static int ocfs2_init_local_system_inodes(struct ocfs2_super *osb);
 static void ocfs2_release_system_inodes(struct ocfs2_super *osb);
-static int ocfs2_fill_local_node_info(struct ocfs2_super *osb);
 static int ocfs2_check_volume(struct ocfs2_super *osb);
 static int ocfs2_verify_volume(struct ocfs2_dinode *di,
 			       struct buffer_head *bh,
@@ -154,6 +153,7 @@ enum {
 	Opt_commit,
 	Opt_localalloc,
 	Opt_localflocks,
+	Opt_stack,
 	Opt_err,
 };
 
@@ -172,6 +172,7 @@ static match_table_t tokens = {
 	{Opt_commit, "commit=%u"},
 	{Opt_localalloc, "localalloc=%d"},
 	{Opt_localflocks, "localflocks"},
+	{Opt_stack, "cluster_stack=%s"},
 	{Opt_err, NULL}
 };
 
@@ -551,8 +552,17 @@ static int ocfs2_verify_heartbeat(struct ocfs2_super *osb)
 		}
 	}
 
+	if (ocfs2_userspace_stack(osb)) {
+		if (osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL) {
+			mlog(ML_ERROR, "Userspace stack expected, but "
+			     "o2cb heartbeat arguments passed to mount\n");
+			return -EINVAL;
+		}
+	}
+
 	if (!(osb->s_mount_opt & OCFS2_MOUNT_HB_LOCAL)) {
-		if (!ocfs2_mount_local(osb) && !ocfs2_is_hard_readonly(osb)) {
+		if (!ocfs2_mount_local(osb) && !ocfs2_is_hard_readonly(osb) &&
+		    !ocfs2_userspace_stack(osb)) {
 			mlog(ML_ERROR, "Heartbeat has to be started to mount "
 			     "a read-write clustered device.\n");
 			return -EINVAL;
@@ -562,6 +572,35 @@ static int ocfs2_verify_heartbeat(struct ocfs2_super *osb)
 	return 0;
 }
 
+/*
+ * If we're using a userspace stack, mount should have passed
+ * a name that matches the disk.  If not, mount should not
+ * have passed a stack.
+ */
+static int ocfs2_verify_userspace_stack(struct ocfs2_super *osb,
+					struct mount_options *mopt)
+{
+	if (!ocfs2_userspace_stack(osb) && mopt->cluster_stack[0]) {
+		mlog(ML_ERROR,
+		     "cluster stack passed to mount, but this filesystem "
+		     "does not support it\n");
+		return -EINVAL;
+	}
+
+	if (ocfs2_userspace_stack(osb) &&
+	    strncmp(osb->osb_cluster_stack, mopt->cluster_stack,
+		    OCFS2_STACK_LABEL_LEN)) {
+		mlog(ML_ERROR,
+		     "cluster stack passed to mount (\"%s\") does not "
+		     "match the filesystem (\"%s\")\n",
+		     mopt->cluster_stack,
+		     osb->osb_cluster_stack);
+		return -EINVAL;
+	}
+
+	return 0;
+}
+
 static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 {
 	struct dentry *root;
@@ -579,15 +618,6 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 		goto read_super_error;
 	}
 
-	/* for now we only have one cluster/node, make sure we see it
-	 * in the heartbeat universe */
-	if (parsed_options.mount_opt & OCFS2_MOUNT_HB_LOCAL) {
-		if (!o2hb_check_local_node_heartbeating()) {
-			status = -EINVAL;
-			goto read_super_error;
-		}
-	}
-
 	/* probe for superblock */
 	status = ocfs2_sb_probe(sb, &bh, &sector_size);
 	if (status < 0) {
@@ -609,6 +639,10 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 	osb->osb_commit_interval = parsed_options.commit_interval;
 	osb->local_alloc_size = parsed_options.localalloc_opt;
 
+	status = ocfs2_verify_userspace_stack(osb, &parsed_options);
+	if (status)
+		goto read_super_error;
+
 	sb->s_magic = OCFS2_SUPER_MAGIC;
 
 	/* Hard readonly mode only if: bdev_read_only, MS_RDONLY,
@@ -694,7 +728,7 @@ static int ocfs2_fill_super(struct super_block *sb, void *data, int silent)
 	if (ocfs2_mount_local(osb))
 		snprintf(nodestr, sizeof(nodestr), "local");
 	else
-		snprintf(nodestr, sizeof(nodestr), "%d", osb->node_num);
+		snprintf(nodestr, sizeof(nodestr), "%u", osb->node_num);
 
 	printk(KERN_INFO "ocfs2: Mounting device (%s) on (node %s, slot %d) "
 	       "with %s data mode.\n",
@@ -763,6 +797,7 @@ static int ocfs2_parse_options(struct super_block *sb,
 	mopt->atime_quantum = OCFS2_DEFAULT_ATIME_QUANTUM;
 	mopt->slot = OCFS2_INVALID_SLOT;
 	mopt->localalloc_opt = OCFS2_DEFAULT_LOCAL_ALLOC_SIZE;
+	mopt->cluster_stack[0] = '\0';
 
 	if (!options) {
 		status = 1;
@@ -864,6 +899,25 @@ static int ocfs2_parse_options(struct super_block *sb,
 			if (!is_remount)
 				mopt->mount_opt |= OCFS2_MOUNT_LOCALFLOCKS;
 			break;
+		case Opt_stack:
+			/* Check both that the option we were passed
+			 * is of the right length and that it is a proper
+			 * string of the right length.
+			 */
+			if (((args[0].to - args[0].from) !=
+			     OCFS2_STACK_LABEL_LEN) ||
+			    (strnlen(args[0].from,
+				     OCFS2_STACK_LABEL_LEN) !=
+			     OCFS2_STACK_LABEL_LEN)) {
+				mlog(ML_ERROR,
+				     "Invalid cluster_stack option\n");
+				status = 0;
+				goto bail;
+			}
+			memcpy(mopt->cluster_stack, args[0].from,
+			       OCFS2_STACK_LABEL_LEN);
+			mopt->cluster_stack[OCFS2_STACK_LABEL_LEN] = '\0';
+			break;
 		default:
 			mlog(ML_ERROR,
 			     "Unrecognized mount option \"%s\" "
@@ -922,6 +976,10 @@ static int ocfs2_show_options(struct seq_file *s, struct vfsmount *mnt)
 	if (opts & OCFS2_MOUNT_LOCALFLOCKS)
 		seq_printf(s, ",localflocks,");
 
+	if (osb->osb_cluster_stack[0])
+		seq_printf(s, ",cluster_stack=%.*s", OCFS2_STACK_LABEL_LEN,
+			   osb->osb_cluster_stack);
+
 	return 0;
 }
 
@@ -957,6 +1015,8 @@ static int __init ocfs2_init(void)
 		mlog(ML_ERROR, "Unable to create ocfs2 debugfs root.\n");
 	}
 
+	ocfs2_set_locking_protocol();
+
 leave:
 	if (status < 0) {
 		ocfs2_free_mem_caches();
@@ -1132,31 +1192,6 @@ static int ocfs2_get_sector(struct super_block *sb,
 	return 0;
 }
 
-/* ocfs2 1.0 only allows one cluster and node identity per kernel image. */
-static int ocfs2_fill_local_node_info(struct ocfs2_super *osb)
-{
-	int status;
-
-	/* XXX hold a ref on the node while mounte?  easy enough, if
-	 * desirable. */
-	if (ocfs2_mount_local(osb))
-		osb->node_num = 0;
-	else
-		osb->node_num = o2nm_this_node();
-
-	if (osb->node_num == O2NM_MAX_NODES) {
-		mlog(ML_ERROR, "could not find this host's node number\n");
-		status = -ENOENT;
-		goto bail;
-	}
-
-	mlog(0, "I am node %d\n", osb->node_num);
-
-	status = 0;
-bail:
-	return status;
-}
-
 static int ocfs2_mount_volume(struct super_block *sb)
 {
 	int status = 0;
@@ -1168,12 +1203,6 @@ static int ocfs2_mount_volume(struct super_block *sb)
 	if (ocfs2_is_hard_readonly(osb))
 		goto leave;
 
-	status = ocfs2_fill_local_node_info(osb);
-	if (status < 0) {
-		mlog_errno(status);
-		goto leave;
-	}
-
 	status = ocfs2_dlm_init(osb);
 	if (status < 0) {
 		mlog_errno(status);
@@ -1224,18 +1253,9 @@ leave:
 	return status;
 }
 
-/* we can't grab the goofy sem lock from inside wait_event, so we use
- * memory barriers to make sure that we'll see the null task before
- * being woken up */
-static int ocfs2_recovery_thread_running(struct ocfs2_super *osb)
-{
-	mb();
-	return osb->recovery_thread_task != NULL;
-}
-
 static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
 {
-	int tmp;
+	int tmp, hangup_needed = 0;
 	struct ocfs2_super *osb = NULL;
 	char nodestr[8];
 
@@ -1249,25 +1269,16 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
 
 	ocfs2_truncate_log_shutdown(osb);
 
-	/* disable any new recovery threads and wait for any currently
-	 * running ones to exit. Do this before setting the vol_state. */
-	mutex_lock(&osb->recovery_lock);
-	osb->disable_recovery = 1;
-	mutex_unlock(&osb->recovery_lock);
-	wait_event(osb->recovery_event, !ocfs2_recovery_thread_running(osb));
-
-	/* At this point, we know that no more recovery threads can be
-	 * launched, so wait for any recovery completion work to
-	 * complete. */
-	flush_workqueue(ocfs2_wq);
+	/* This will disable recovery and flush any recovery work. */
+	ocfs2_recovery_exit(osb);
 
 	ocfs2_journal_shutdown(osb);
 
 	ocfs2_sync_blockdev(sb);
 
-	/* No dlm means we've failed during mount, so skip all the
-	 * steps which depended on that to complete. */
-	if (osb->dlm) {
+	/* No cluster connection means we've failed during mount, so skip
+	 * all the steps which depended on that to complete. */
+	if (osb->cconn) {
 		tmp = ocfs2_super_lock(osb, 1);
 		if (tmp < 0) {
 			mlog_errno(tmp);
@@ -1278,25 +1289,34 @@ static void ocfs2_dismount_volume(struct super_block *sb, int mnt_err)
 	if (osb->slot_num != OCFS2_INVALID_SLOT)
 		ocfs2_put_slot(osb);
 
-	if (osb->dlm)
+	if (osb->cconn)
 		ocfs2_super_unlock(osb, 1);
 
 	ocfs2_release_system_inodes(osb);
 
-	if (osb->dlm)
-		ocfs2_dlm_shutdown(osb);
+	/*
+	 * If we're dismounting due to mount error, mount.ocfs2 will clean
+	 * up heartbeat.  If we're a local mount, there is no heartbeat.
+	 * If we failed before we got a uuid_str yet, we can't stop
+	 * heartbeat.  Otherwise, do it.
+	 */
+	if (!mnt_err && !ocfs2_mount_local(osb) && osb->uuid_str)
+		hangup_needed = 1;
+
+	if (osb->cconn)
+		ocfs2_dlm_shutdown(osb, hangup_needed);
 
 	debugfs_remove(osb->osb_debug_root);
 
-	if (!mnt_err)
-		ocfs2_stop_heartbeat(osb);
+	if (hangup_needed)
+		ocfs2_cluster_hangup(osb->uuid_str, strlen(osb->uuid_str));
 
 	atomic_set(&osb->vol_state, VOLUME_DISMOUNTED);
 
 	if (ocfs2_mount_local(osb))
 		snprintf(nodestr, sizeof(nodestr), "local");
 	else
-		snprintf(nodestr, sizeof(nodestr), "%d", osb->node_num);
+		snprintf(nodestr, sizeof(nodestr), "%u", osb->node_num);
 
 	printk(KERN_INFO "ocfs2: Unmounting device (%s) on (node %s)\n",
 	       osb->dev_str, nodestr);
@@ -1355,7 +1375,6 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	sb->s_fs_info = osb;
 	sb->s_op = &ocfs2_sops;
 	sb->s_export_op = &ocfs2_export_ops;
-	osb->osb_locking_proto = ocfs2_locking_protocol;
 	sb->s_time_gran = 1;
 	sb->s_flags |= MS_NOATIME;
 	/* this is needed to support O_LARGEFILE */
@@ -1368,7 +1387,6 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	osb->s_sectsize_bits = blksize_bits(sector_size);
 	BUG_ON(!osb->s_sectsize_bits);
 
-	init_waitqueue_head(&osb->recovery_event);
 	spin_lock_init(&osb->dc_task_lock);
 	init_waitqueue_head(&osb->dc_event);
 	osb->dc_work_sequence = 0;
@@ -1376,6 +1394,7 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	INIT_LIST_HEAD(&osb->blocked_lock_list);
 	osb->blocked_lock_count = 0;
 	spin_lock_init(&osb->osb_lock);
+	ocfs2_init_inode_steal_slot(osb);
 
 	atomic_set(&osb->alloc_stats.moves, 0);
 	atomic_set(&osb->alloc_stats.local_data, 0);
@@ -1388,24 +1407,23 @@ static int ocfs2_initialize_super(struct super_block *sb,
 	snprintf(osb->dev_str, sizeof(osb->dev_str), "%u,%u",
 		 MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));
 
-	mutex_init(&osb->recovery_lock);
-
-	osb->disable_recovery = 0;
-	osb->recovery_thread_task = NULL;
+	status = ocfs2_recovery_init(osb);
+	if (status) {
+		mlog(ML_ERROR, "Unable to initialize recovery state\n");
+		mlog_errno(status);
+		goto bail;
+	}
 
 	init_waitqueue_head(&osb->checkpoint_event);
 	atomic_set(&osb->needs_checkpoint, 0);
 
 	osb->s_atime_quantum = OCFS2_DEFAULT_ATIME_QUANTUM;
 
-	osb->node_num = O2NM_INVALID_NODE_NUM;
 	osb->slot_num = OCFS2_INVALID_SLOT;
 
 	osb->local_alloc_state = OCFS2_LA_UNUSED;
 	osb->local_alloc_bh = NULL;
 
-	ocfs2_setup_hb_callbacks(osb);
-
 	init_waitqueue_head(&osb->osb_mount_event);
 
 	osb->vol_label = kmalloc(OCFS2_MAX_VOL_LABEL_LEN, GFP_KERNEL);
@@ -1455,6 +1473,25 @@ static int ocfs2_initialize_super(struct super_block *sb,
 		goto bail;
 	}
 
+	if (ocfs2_userspace_stack(osb)) {
+		memcpy(osb->osb_cluster_stack,
+		       OCFS2_RAW_SB(di)->s_cluster_info.ci_stack,
+		       OCFS2_STACK_LABEL_LEN);
+		osb->osb_cluster_stack[OCFS2_STACK_LABEL_LEN] = '\0';
+		if (strlen(osb->osb_cluster_stack) != OCFS2_STACK_LABEL_LEN) {
+			mlog(ML_ERROR,
+			     "couldn't mount because of an invalid "
+			     "cluster stack label (%s) \n",
+			     osb->osb_cluster_stack);
+			status = -EINVAL;
+			goto bail;
+		}
+	} else {
+		/* The empty string is identical with classic tools that
+		 * don't know about s_cluster_info. */
+		osb->osb_cluster_stack[0] = '\0';
+	}
+
 	get_random_bytes(&osb->s_next_generation, sizeof(u32));
 
 	/* FIXME
@@ -1724,8 +1761,7 @@ static void ocfs2_delete_osb(struct ocfs2_super *osb)
 
 	/* This function assumes that the caller has the main osb resource */
 
-	if (osb->slot_info)
-		ocfs2_free_slot_info(osb->slot_info);
+	ocfs2_free_slot_info(osb);
 
 	kfree(osb->osb_orphan_wipes);
 	/* FIXME