summary refs log tree commit diff
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c87
1 files changed, 63 insertions, 24 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 30878defaeb6..e725005fafd0 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -754,6 +754,11 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
 	mutex_unlock(&ls->ls_waiters_mutex);
 }
 
+/* We clear the RESEND flag because we might be taking an lkb off the waiters
+   list as part of process_requestqueue (e.g. a lookup that has an optimized
+   request reply on the requestqueue) between dlm_recover_waiters_pre() which
+   set RESEND and dlm_recover_waiters_post() */
+
 static int _remove_from_waiters(struct dlm_lkb *lkb)
 {
 	int error = 0;
@@ -764,6 +769,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb)
 		goto out;
 	}
 	lkb->lkb_wait_type = 0;
+	lkb->lkb_flags &= ~DLM_IFL_RESEND;
 	list_del(&lkb->lkb_wait_reply);
 	unhold_lkb(lkb);
  out:
@@ -810,7 +816,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
 		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
 					    res_hashchain) {
 			if (!time_after_eq(jiffies, r->res_toss_time +
-					   dlm_config.toss_secs * HZ))
+					   dlm_config.ci_toss_secs * HZ))
 				continue;
 			found = 1;
 			break;
@@ -2144,12 +2150,24 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
 	if (lkb->lkb_astaddr)
 		ms->m_asts |= AST_COMP;
 
-	if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
-		memcpy(ms->m_extra, r->res_name, r->res_length);
+	/* compare with switch in create_message; send_remove() doesn't
+	   use send_args() */
 
-	else if (lkb->lkb_lvbptr)
+	switch (ms->m_type) {
+	case DLM_MSG_REQUEST:
+	case DLM_MSG_LOOKUP:
+		memcpy(ms->m_extra, r->res_name, r->res_length);
+		break;
+	case DLM_MSG_CONVERT:
+	case DLM_MSG_UNLOCK:
+	case DLM_MSG_REQUEST_REPLY:
+	case DLM_MSG_CONVERT_REPLY:
+	case DLM_MSG_GRANT:
+		if (!lkb->lkb_lvbptr)
+			break;
 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
-
+		break;
+	}
 }
 
 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
@@ -2418,8 +2436,12 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
 	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
 
-	if (receive_lvb(ls, lkb, ms))
-		return -ENOMEM;
+	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
+		/* lkb was just created so there won't be an lvb yet */
+		lkb->lkb_lvbptr = allocate_lvb(ls);
+		if (!lkb->lkb_lvbptr)
+			return -ENOMEM;
+	}
 
 	return 0;
 }
@@ -3002,7 +3024,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
 {
 	struct dlm_message *ms = (struct dlm_message *) hd;
 	struct dlm_ls *ls;
-	int error;
+	int error = 0;
 
 	if (!recovery)
 		dlm_message_in(ms);
@@ -3119,7 +3141,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
  out:
 	dlm_put_lockspace(ls);
 	dlm_astd_wake();
-	return 0;
+	return error;
 }
 
 
@@ -3132,6 +3154,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
 	if (middle_conversion(lkb)) {
 		hold_lkb(lkb);
 		ls->ls_stub_ms.m_result = -EINPROGRESS;
+		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
 		_remove_from_waiters(lkb);
 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
 
@@ -3205,6 +3228,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 		case DLM_MSG_UNLOCK:
 			hold_lkb(lkb);
 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
 			_remove_from_waiters(lkb);
 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
 			dlm_put_lkb(lkb);
@@ -3213,6 +3237,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
 		case DLM_MSG_CANCEL:
 			hold_lkb(lkb);
 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
 			_remove_from_waiters(lkb);
 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
 			dlm_put_lkb(lkb);
@@ -3571,6 +3596,14 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 	lock_rsb(r);
 
 	switch (error) {
+	case -EBADR:
+		/* There's a chance the new master received our lock before
+		   dlm_recover_master_reply(), this wouldn't happen if we did
+		   a barrier between recover_masters and recover_locks. */
+		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
+			  (unsigned long)r, r->res_name);
+		dlm_send_rcom_lock(r, lkb);
+		goto out;
 	case -EEXIST:
 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
 		/* fall through */
@@ -3585,7 +3618,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
 	/* an ack for dlm_recover_locks() which waits for replies from
 	   all the locks it sends to new masters */
 	dlm_recovered_lock(r);
-
+ out:
 	unlock_rsb(r);
 	put_rsb(r);
 	dlm_put_lkb(lkb);
@@ -3610,7 +3643,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
 	}
 
 	if (flags & DLM_LKF_VALBLK) {
-		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
 		if (!ua->lksb.sb_lvbptr) {
 			kfree(ua);
 			__put_lkb(ls, lkb);
@@ -3679,7 +3712,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
 
 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
-		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
 		if (!ua->lksb.sb_lvbptr) {
 			error = -ENOMEM;
 			goto out_put;
@@ -3745,12 +3778,10 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 		goto out_put;
 
 	spin_lock(&ua->proc->locks_spin);
-	list_del_init(&lkb->lkb_ownqueue);
+	/* dlm_user_add_ast() may have already taken lkb off the proc list */
+	if (!list_empty(&lkb->lkb_ownqueue))
+		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
 	spin_unlock(&ua->proc->locks_spin);
-
-	/* this removes the reference for the proc->locks list added by
-	   dlm_user_request */
-	unhold_lkb(lkb);
  out_put:
 	dlm_put_lkb(lkb);
  out:
@@ -3790,9 +3821,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
 	/* this lkb was removed from the WAITING queue */
 	if (lkb->lkb_grmode == DLM_LOCK_IV) {
 		spin_lock(&ua->proc->locks_spin);
-		list_del_init(&lkb->lkb_ownqueue);
+		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
 		spin_unlock(&ua->proc->locks_spin);
-		unhold_lkb(lkb);
 	}
  out_put:
 	dlm_put_lkb(lkb);
@@ -3853,11 +3883,6 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 	mutex_lock(&ls->ls_clear_proc_locks);
 
 	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
-		if (lkb->lkb_ast_type) {
-			list_del(&lkb->lkb_astqueue);
-			unhold_lkb(lkb);
-		}
-
 		list_del_init(&lkb->lkb_ownqueue);
 
 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
@@ -3874,6 +3899,20 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 
 		dlm_put_lkb(lkb);
 	}
+
+	/* in-progress unlocks */
+	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
+		list_del_init(&lkb->lkb_ownqueue);
+		lkb->lkb_flags |= DLM_IFL_DEAD;
+		dlm_put_lkb(lkb);
+	}
+
+	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+		list_del(&lkb->lkb_astqueue);
+		dlm_put_lkb(lkb);
+	}
+
 	mutex_unlock(&ls->ls_clear_proc_locks);
 	unlock_recovery(ls);
 }
+