summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--fs/ocfs2/dlm/dlmcommon.h6
-rw-r--r--fs/ocfs2/dlm/dlmlock.c14
-rw-r--r--fs/ocfs2/dlm/dlmmaster.c103
-rw-r--r--fs/ocfs2/dlm/dlmrecovery.c38
4 files changed, 142 insertions, 19 deletions
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 9c772583744a..a8aec9341347 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -658,6 +658,7 @@ void dlm_complete_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
 int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
 
@@ -762,6 +763,11 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+			  u8 nodenum, u8 *real_master);
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+			       struct dlm_lock_resource *res, u8 *real_master);
+
 
 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
 			       struct dlm_lock_resource *res,
diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c
index 671d4ff222cc..6fea28318d6d 100644
--- a/fs/ocfs2/dlm/dlmlock.c
+++ b/fs/ocfs2/dlm/dlmlock.c
@@ -141,13 +141,23 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm,
 					  res->lockname.len)) {
 			kick_thread = 1;
 			call_ast = 1;
+		} else {
+			mlog(0, "%s: returning DLM_NORMAL to "
+			     "node %u for reco lock\n", dlm->name,
+			     lock->ml.node);
 		}
 	} else {
 		/* for NOQUEUE request, unless we get the
 		 * lock right away, return DLM_NOTQUEUED */
-		if (flags & LKM_NOQUEUE)
+		if (flags & LKM_NOQUEUE) {
 			status = DLM_NOTQUEUED;
-		else {
+			if (dlm_is_recovery_lock(res->lockname.name,
+						 res->lockname.len)) {
+				mlog(0, "%s: returning NOTQUEUED to "
+				     "node %u for reco lock\n", dlm->name,
+				     lock->ml.node);
+			}
+		} else {
 			dlm_lock_get(lock);
 			list_add_tail(&lock->list, &res->blocked);
 			kick_thread = 1;
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 78ac3a00eb54..940be4c13b1f 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -239,6 +239,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
 				       struct dlm_lock_resource *res,
 				       u8 target);
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res);
 
 
 int dlm_is_host_down(int errno)
@@ -677,6 +679,7 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
 	struct dlm_node_iter iter;
 	unsigned int namelen;
 	int tries = 0;
+	int bit, wait_on_recovery = 0;
 
 	BUG_ON(!lockid);
 
@@ -762,6 +765,18 @@ lookup:
 		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
 		set_bit(dlm->node_num, mle->maybe_map);
 		list_add(&mle->list, &dlm->master_list);
+
+		/* still holding the dlm spinlock, check the recovery map
+		 * to see if there are any nodes that still need to be 
+		 * considered.  these will not appear in the mle nodemap
+		 * but they might own this lockres.  wait on them. */
+		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+		if (bit < O2NM_MAX_NODES) {
+			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+			     "recover before lock mastery can begin\n",
+			     dlm->name, namelen, (char *)lockid, bit);
+			wait_on_recovery = 1;
+		}
 	}
 
 	/* at this point there is either a DLM_MLE_BLOCK or a
@@ -779,6 +794,39 @@ lookup:
 	spin_unlock(&dlm->master_lock);
 	spin_unlock(&dlm->spinlock);
 
+	while (wait_on_recovery) {
+		/* any cluster changes that occurred after dropping the
+		 * dlm spinlock would be detectable be a change on the mle,
+		 * so we only need to clear out the recovery map once. */
+		if (dlm_is_recovery_lock(lockid, namelen)) {
+			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
+			     "must master $RECOVERY lock now\n", dlm->name);
+			if (!dlm_pre_master_reco_lockres(dlm, res))
+				wait_on_recovery = 0;
+			else {
+				mlog(0, "%s: waiting 500ms for heartbeat state "
+				    "change\n", dlm->name);
+				msleep(500);
+			}
+			continue;
+		} 
+
+		dlm_kick_recovery_thread(dlm);
+		msleep(100);
+		dlm_wait_for_recovery(dlm);
+
+		spin_lock(&dlm->spinlock);
+		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
+		if (bit < O2NM_MAX_NODES) {
+			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
+			     "recover before lock mastery can begin\n",
+			     dlm->name, namelen, (char *)lockid, bit);
+			wait_on_recovery = 1;
+		} else
+			wait_on_recovery = 0;
+		spin_unlock(&dlm->spinlock);
+	}
+
 	/* must wait for lock to be mastered elsewhere */
 	if (blocked)
 		goto wait;
@@ -1835,6 +1883,61 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
 	mlog(0, "finished with dlm_assert_master_worker\n");
 }
 
+/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
+ * We cannot wait for node recovery to complete to begin mastering this
+ * lockres because this lockres is used to kick off recovery! ;-)
+ * So, do a pre-check on all living nodes to see if any of those nodes
+ * think that $RECOVERY is currently mastered by a dead node.  If so,
+ * we wait a short time to allow that node to get notified by its own
+ * heartbeat stack, then check again.  All $RECOVERY lock resources
+ * mastered by dead nodes are purged when the hearbeat callback is 
+ * fired, so we can know for sure that it is safe to continue once
+ * the node returns a live node or no node.  */
+static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
+				       struct dlm_lock_resource *res)
+{
+	struct dlm_node_iter iter;
+	int nodenum;
+	int ret = 0;
+	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
+
+	spin_lock(&dlm->spinlock);
+	dlm_node_iter_init(dlm->domain_map, &iter);
+	spin_unlock(&dlm->spinlock);
+
+	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
+		/* do not send to self */
+		if (nodenum == dlm->node_num)
+			continue;
+		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
+		if (ret < 0) {
+			mlog_errno(ret);
+			if (!dlm_is_host_down(ret))
+				BUG();
+			/* host is down, so answer for that node would be
+			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
+		}
+
+		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
+			/* check to see if this master is in the recovery map */
+			spin_lock(&dlm->spinlock);
+			if (test_bit(master, dlm->recovery_map)) {
+				mlog(ML_NOTICE, "%s: node %u has not seen "
+				     "node %u go down yet, and thinks the "
+				     "dead node is mastering the recovery "
+				     "lock.  must wait.\n", dlm->name,
+				     nodenum, master);
+				ret = -EAGAIN;
+			}
+			spin_unlock(&dlm->spinlock);
+			mlog(0, "%s: reco lock master is %u\n", dlm->name, 
+			     master);
+			break;
+		}
+	}
+	return ret;
+}
+
 
 /*
  * DLM_MIGRATE_LOCKRES
diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 1e232000f3f7..36610bdf1231 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -58,7 +58,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
 static int dlm_recovery_thread(void *data);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 static int dlm_do_recovery(struct dlm_ctxt *dlm);
 
 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
@@ -78,15 +78,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
 				    u8 send_to,
 				    struct dlm_lock_resource *res,
 				    int total_locks);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-				      struct dlm_lock_resource *res,
-				      u8 *real_master);
 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
 				     struct dlm_lock_resource *res,
 				     struct dlm_migratable_lockres *mres);
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-				 struct dlm_lock_resource *res,
-				 u8 nodenum, u8 *real_master);
 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
 				 u8 dead_node, u8 send_to);
@@ -165,7 +159,7 @@ void dlm_dispatch_work(void *data)
  * RECOVERY THREAD
  */
 
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
 {
 	/* wake the recovery thread
 	 * this will wake the reco thread in one of three places
@@ -1316,9 +1310,8 @@ leave:
 
 
 
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-				      struct dlm_lock_resource *res,
-				      u8 *real_master)
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+			       struct dlm_lock_resource *res, u8 *real_master)
 {
 	struct dlm_node_iter iter;
 	int nodenum;
@@ -1360,8 +1353,10 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
 		ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
 		if (ret < 0) {
 			mlog_errno(ret);
-			BUG();
-			/* TODO: need to figure a way to restart this */
+			if (!dlm_is_host_down(ret))
+				BUG();
+			/* host is down, so answer for that node would be
+			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
 		}
 		if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
 			mlog(0, "lock master is %u\n", *real_master);
@@ -1372,9 +1367,8 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
 }
 
 
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-				 struct dlm_lock_resource *res,
-				 u8 nodenum, u8 *real_master)
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+			  u8 nodenum, u8 *real_master)
 {
 	int ret = -EINVAL;
 	struct dlm_master_requery req;
@@ -1739,6 +1733,13 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
 				} else
 					continue;
 
+				if (!list_empty(&res->recovering)) {
+					mlog(0, "%s:%.*s: lockres was "
+					     "marked RECOVERING, owner=%u\n",
+					     dlm->name, res->lockname.len,
+					     res->lockname.name, res->owner);
+					list_del_init(&res->recovering);
+				}
 				spin_lock(&res->spinlock);
 				dlm_change_lockres_owner(dlm, res, new_master);
 				res->state &= ~DLM_LOCK_RES_RECOVERING;
@@ -2258,7 +2259,10 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
 			mlog(0, "%u not in domain/live_nodes map "
 			     "so setting it in reco map manually\n",
 			     br->dead_node);
-		set_bit(br->dead_node, dlm->recovery_map);
+		/* force the recovery cleanup in __dlm_hb_node_down
+		 * both of these will be cleared in a moment */
+		set_bit(br->dead_node, dlm->domain_map);
+		set_bit(br->dead_node, dlm->live_nodes_map);
 		__dlm_hb_node_down(dlm, br->dead_node);
 	}
 	spin_unlock(&dlm->spinlock);