summary refs log tree commit diff
path: root/fs/dlm
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/ast.c4
-rw-r--r--fs/dlm/config.c79
-rw-r--r--fs/dlm/config.h2
-rw-r--r--fs/dlm/dlm_internal.h46
-rw-r--r--fs/dlm/lockspace.c15
-rw-r--r--fs/dlm/lowcomms.c215
-rw-r--r--fs/dlm/lowcomms.h2
-rw-r--r--fs/dlm/main.c2
-rw-r--r--fs/dlm/member.c17
-rw-r--r--fs/dlm/netlink.c8
-rw-r--r--fs/dlm/rcom.c2
-rw-r--r--fs/dlm/recoverd.c27
-rw-r--r--fs/dlm/recoverd.h1
-rw-r--r--fs/dlm/user.c7
14 files changed, 293 insertions, 134 deletions
diff --git a/fs/dlm/ast.c b/fs/dlm/ast.c
index 63dc19c54d5a..27a6ba9aaeec 100644
--- a/fs/dlm/ast.c
+++ b/fs/dlm/ast.c
@@ -15,8 +15,8 @@
 #include "lock.h"
 #include "user.h"
 
-static uint64_t			dlm_cb_seq;
-static spinlock_t		dlm_cb_seq_spin;
+static uint64_t dlm_cb_seq;
+static DEFINE_SPINLOCK(dlm_cb_seq_spin);
 
 static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
 {
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 9ccf7346834a..a0387dd8b1f0 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -750,6 +750,7 @@ static ssize_t comm_local_write(struct dlm_comm *cm, const char *buf,
 static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
 {
 	struct sockaddr_storage *addr;
+	int rv;
 
 	if (len != sizeof(struct sockaddr_storage))
 		return -EINVAL;
@@ -762,6 +763,13 @@ static ssize_t comm_addr_write(struct dlm_comm *cm, const char *buf, size_t len)
 		return -ENOMEM;
 
 	memcpy(addr, buf, len);
+
+	rv = dlm_lowcomms_addr(cm->nodeid, addr, len);
+	if (rv) {
+		kfree(addr);
+		return rv;
+	}
+
 	cm->addr[cm->addr_count++] = addr;
 	return len;
 }
@@ -878,34 +886,7 @@ static void put_space(struct dlm_space *sp)
 	config_item_put(&sp->group.cg_item);
 }
 
-static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
-{
-	switch (x->ss_family) {
-	case AF_INET: {
-		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
-		struct sockaddr_in *siny = (struct sockaddr_in *)y;
-		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
-			return 0;
-		if (sinx->sin_port != siny->sin_port)
-			return 0;
-		break;
-	}
-	case AF_INET6: {
-		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
-		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
-		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
-			return 0;
-		if (sinx->sin6_port != siny->sin6_port)
-			return 0;
-		break;
-	}
-	default:
-		return 0;
-	}
-	return 1;
-}
-
-static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr)
+static struct dlm_comm *get_comm(int nodeid)
 {
 	struct config_item *i;
 	struct dlm_comm *cm = NULL;
@@ -919,19 +900,11 @@ static struct dlm_comm *get_comm(int nodeid, struct sockaddr_storage *addr)
 	list_for_each_entry(i, &comm_list->cg_children, ci_entry) {
 		cm = config_item_to_comm(i);
 
-		if (nodeid) {
-			if (cm->nodeid != nodeid)
-				continue;
-			found = 1;
-			config_item_get(i);
-			break;
-		} else {
-			if (!cm->addr_count || !addr_compare(cm->addr[0], addr))
-				continue;
-			found = 1;
-			config_item_get(i);
-			break;
-		}
+		if (cm->nodeid != nodeid)
+			continue;
+		found = 1;
+		config_item_get(i);
+		break;
 	}
 	mutex_unlock(&clusters_root.subsys.su_mutex);
 
@@ -995,7 +968,7 @@ int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
 
 int dlm_comm_seq(int nodeid, uint32_t *seq)
 {
-	struct dlm_comm *cm = get_comm(nodeid, NULL);
+	struct dlm_comm *cm = get_comm(nodeid);
 	if (!cm)
 		return -EEXIST;
 	*seq = cm->seq;
@@ -1003,28 +976,6 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
 	return 0;
 }
 
-int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr)
-{
-	struct dlm_comm *cm = get_comm(nodeid, NULL);
-	if (!cm)
-		return -EEXIST;
-	if (!cm->addr_count)
-		return -ENOENT;
-	memcpy(addr, cm->addr[0], sizeof(*addr));
-	put_comm(cm);
-	return 0;
-}
-
-int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
-{
-	struct dlm_comm *cm = get_comm(0, addr);
-	if (!cm)
-		return -EEXIST;
-	*nodeid = cm->nodeid;
-	put_comm(cm);
-	return 0;
-}
-
 int dlm_our_nodeid(void)
 {
 	return local_comm ? local_comm->nodeid : 0;
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index dbd35a08f3a5..f30697bc2780 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -46,8 +46,6 @@ void dlm_config_exit(void);
 int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
 		     int *count_out);
 int dlm_comm_seq(int nodeid, uint32_t *seq);
-int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
-int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
 int dlm_our_nodeid(void);
 int dlm_our_addr(struct sockaddr_storage *addr, int num);
 
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 9d3e485f88c8..871c1abf6029 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -604,6 +604,7 @@ struct dlm_ls {
 	struct idr		ls_recover_idr;
 	spinlock_t		ls_recover_idr_lock;
 	wait_queue_head_t	ls_wait_general;
+	wait_queue_head_t	ls_recover_lock_wait;
 	struct mutex		ls_clear_proc_locks;
 
 	struct list_head	ls_root_list;	/* root resources */
@@ -616,15 +617,40 @@ struct dlm_ls {
 	char			ls_name[1];
 };
 
-#define LSFL_WORK		0
-#define LSFL_RUNNING		1
-#define LSFL_RECOVERY_STOP	2
-#define LSFL_RCOM_READY		3
-#define LSFL_RCOM_WAIT		4
-#define LSFL_UEVENT_WAIT	5
-#define LSFL_TIMEWARN		6
-#define LSFL_CB_DELAY		7
-#define LSFL_NODIR		8
+/*
+ * LSFL_RECOVER_STOP - dlm_ls_stop() sets this to tell dlm recovery routines
+ * that they should abort what they're doing so new recovery can be started.
+ *
+ * LSFL_RECOVER_DOWN - dlm_ls_stop() sets this to tell dlm_recoverd that it
+ * should do down_write() on the in_recovery rw_semaphore. (doing down_write
+ * within dlm_ls_stop causes complaints about the lock acquired/released
+ * in different contexts.)
+ *
+ * LSFL_RECOVER_LOCK - dlm_recoverd holds the in_recovery rw_semaphore.
+ * It sets this after it is done with down_write() on the in_recovery
+ * rw_semaphore and clears it after it has released the rw_semaphore.
+ *
+ * LSFL_RECOVER_WORK - dlm_ls_start() sets this to tell dlm_recoverd that it
+ * should begin recovery of the lockspace.
+ *
+ * LSFL_RUNNING - set when normal locking activity is enabled.
+ * dlm_ls_stop() clears this to tell dlm locking routines that they should
+ * quit what they are doing so recovery can run.  dlm_recoverd sets
+ * this after recovery is finished.
+ */
+
+#define LSFL_RECOVER_STOP	0
+#define LSFL_RECOVER_DOWN	1
+#define LSFL_RECOVER_LOCK	2
+#define LSFL_RECOVER_WORK	3
+#define LSFL_RUNNING		4
+
+#define LSFL_RCOM_READY		5
+#define LSFL_RCOM_WAIT		6
+#define LSFL_UEVENT_WAIT	7
+#define LSFL_TIMEWARN		8
+#define LSFL_CB_DELAY		9
+#define LSFL_NODIR		10
 
 /* much of this is just saving user space pointers associated with the
    lock that we pass back to the user lib with an ast */
@@ -667,7 +693,7 @@ static inline int dlm_locking_stopped(struct dlm_ls *ls)
 
 static inline int dlm_recovery_stopped(struct dlm_ls *ls)
 {
-	return test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+	return test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 }
 
 static inline int dlm_no_directory(struct dlm_ls *ls)
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index 952557d00ccd..2e99fb0c9737 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -582,8 +582,6 @@ static int new_lockspace(const char *name, const char *cluster,
 	INIT_LIST_HEAD(&ls->ls_root_list);
 	init_rwsem(&ls->ls_root_sem);
 
-	down_write(&ls->ls_in_recovery);
-
 	spin_lock(&lslist_lock);
 	ls->ls_create_count = 1;
 	list_add(&ls->ls_list, &lslist);
@@ -597,13 +595,24 @@ static int new_lockspace(const char *name, const char *cluster,
 		}
 	}
 
-	/* needs to find ls in lslist */
+	init_waitqueue_head(&ls->ls_recover_lock_wait);
+
+	/*
+	 * Once started, dlm_recoverd first looks for ls in lslist, then
+	 * initializes ls_in_recovery as locked in "down" mode.  We need
+	 * to wait for the wakeup from dlm_recoverd because in_recovery
+	 * has to start out in down mode.
+	 */
+
 	error = dlm_recoverd_start(ls);
 	if (error) {
 		log_error(ls, "can't start dlm_recoverd %d", error);
 		goto out_callback;
 	}
 
+	wait_event(ls->ls_recover_lock_wait,
+		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
+
 	ls->ls_kobj.kset = dlm_kset;
 	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
 				     "%s", ls->ls_name);
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 5c1b0e38c7a4..331ea4f94efd 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -140,6 +140,16 @@ struct writequeue_entry {
 	struct connection *con;
 };
 
+struct dlm_node_addr {
+	struct list_head list;
+	int nodeid;
+	int addr_count;
+	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
+};
+
+static LIST_HEAD(dlm_node_addrs);
+static DEFINE_SPINLOCK(dlm_node_addrs_spin);
+
 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
 static int dlm_local_count;
 static int dlm_allow_conn;
@@ -264,31 +274,146 @@ static struct connection *assoc2con(int assoc_id)
 	return NULL;
 }
 
-static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
+static struct dlm_node_addr *find_node_addr(int nodeid)
+{
+	struct dlm_node_addr *na;
+
+	list_for_each_entry(na, &dlm_node_addrs, list) {
+		if (na->nodeid == nodeid)
+			return na;
+	}
+	return NULL;
+}
+
+static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
+{
+	switch (x->ss_family) {
+	case AF_INET: {
+		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
+		struct sockaddr_in *siny = (struct sockaddr_in *)y;
+		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
+			return 0;
+		if (sinx->sin_port != siny->sin_port)
+			return 0;
+		break;
+	}
+	case AF_INET6: {
+		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
+		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
+		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
+			return 0;
+		if (sinx->sin6_port != siny->sin6_port)
+			return 0;
+		break;
+	}
+	default:
+		return 0;
+	}
+	return 1;
+}
+
+static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
+			  struct sockaddr *sa_out)
 {
-	struct sockaddr_storage addr;
-	int error;
+	struct sockaddr_storage sas;
+	struct dlm_node_addr *na;
 
 	if (!dlm_local_count)
 		return -1;
 
-	error = dlm_nodeid_to_addr(nodeid, &addr);
-	if (error)
-		return error;
+	spin_lock(&dlm_node_addrs_spin);
+	na = find_node_addr(nodeid);
+	if (na && na->addr_count)
+		memcpy(&sas, na->addr[0], sizeof(struct sockaddr_storage));
+	spin_unlock(&dlm_node_addrs_spin);
+
+	if (!na)
+		return -EEXIST;
+
+	if (!na->addr_count)
+		return -ENOENT;
+
+	if (sas_out)
+		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
+
+	if (!sa_out)
+		return 0;
 
 	if (dlm_local_addr[0]->ss_family == AF_INET) {
-		struct sockaddr_in *in4  = (struct sockaddr_in *) &addr;
-		struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
+		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
+		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 	} else {
-		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &addr;
-		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
+		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
+		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
 		ret6->sin6_addr = in6->sin6_addr;
 	}
 
 	return 0;
 }
 
+static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
+{
+	struct dlm_node_addr *na;
+	int rv = -EEXIST;
+
+	spin_lock(&dlm_node_addrs_spin);
+	list_for_each_entry(na, &dlm_node_addrs, list) {
+		if (!na->addr_count)
+			continue;
+
+		if (!addr_compare(na->addr[0], addr))
+			continue;
+
+		*nodeid = na->nodeid;
+		rv = 0;
+		break;
+	}
+	spin_unlock(&dlm_node_addrs_spin);
+	return rv;
+}
+
+int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
+{
+	struct sockaddr_storage *new_addr;
+	struct dlm_node_addr *new_node, *na;
+
+	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
+	if (!new_node)
+		return -ENOMEM;
+
+	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
+	if (!new_addr) {
+		kfree(new_node);
+		return -ENOMEM;
+	}
+
+	memcpy(new_addr, addr, len);
+
+	spin_lock(&dlm_node_addrs_spin);
+	na = find_node_addr(nodeid);
+	if (!na) {
+		new_node->nodeid = nodeid;
+		new_node->addr[0] = new_addr;
+		new_node->addr_count = 1;
+		list_add(&new_node->list, &dlm_node_addrs);
+		spin_unlock(&dlm_node_addrs_spin);
+		return 0;
+	}
+
+	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
+		spin_unlock(&dlm_node_addrs_spin);
+		kfree(new_addr);
+		kfree(new_node);
+		return -ENOSPC;
+	}
+
+	na->addr[na->addr_count++] = new_addr;
+	spin_unlock(&dlm_node_addrs_spin);
+	kfree(new_node);
+	return 0;
+}
+
 /* Data available on socket or listen socket received a connect */
 static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
@@ -348,7 +473,7 @@ int dlm_lowcomms_connect_node(int nodeid)
 }
 
 /* Make a socket active */
-static int add_sock(struct socket *sock, struct connection *con)
+static void add_sock(struct socket *sock, struct connection *con)
 {
 	con->sock = sock;
 
@@ -358,7 +483,6 @@ static int add_sock(struct socket *sock, struct connection *con)
 	con->sock->sk->sk_state_change = lowcomms_state_change;
 	con->sock->sk->sk_user_data = con;
 	con->sock->sk->sk_allocation = GFP_NOFS;
-	return 0;
 }
 
 /* Add the port number to an IPv6 or 4 sockaddr and return the address
@@ -510,7 +634,7 @@ static void process_sctp_notification(struct connection *con,
 				return;
 			}
 			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
-			if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
+			if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
 				unsigned char *b=(unsigned char *)&prim.ssp_addr;
 				log_print("reject connect from unknown addr");
 				print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
@@ -747,7 +871,7 @@ static int tcp_accept_from_sock(struct connection *con)
 
 	/* Get the new node's NODEID */
 	make_sockaddr(&peeraddr, 0, &len);
-	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
+	if (addr_to_nodeid(&peeraddr, &nodeid)) {
 		unsigned char *b=(unsigned char *)&peeraddr;
 		log_print("connect from non cluster node");
 		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
@@ -862,7 +986,7 @@ static void sctp_init_assoc(struct connection *con)
 	if (con->retries++ > MAX_CONNECT_RETRIES)
 		return;
 
-	if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
+	if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr)) {
 		log_print("no address for nodeid %d", con->nodeid);
 		return;
 	}
@@ -928,11 +1052,11 @@ static void sctp_init_assoc(struct connection *con)
 /* Connect a new socket to its peer */
 static void tcp_connect_to_sock(struct connection *con)
 {
-	int result = -EHOSTUNREACH;
 	struct sockaddr_storage saddr, src_addr;
 	int addr_len;
 	struct socket *sock = NULL;
 	int one = 1;
+	int result;
 
 	if (con->nodeid == 0) {
 		log_print("attempt to connect sock 0 foiled");
@@ -944,10 +1068,8 @@ static void tcp_connect_to_sock(struct connection *con)
 		goto out;
 
 	/* Some odd races can cause double-connects, ignore them */
-	if (con->sock) {
-		result = 0;
+	if (con->sock)
 		goto out;
-	}
 
 	/* Create a socket to communicate with */
 	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
@@ -956,8 +1078,11 @@ static void tcp_connect_to_sock(struct connection *con)
 		goto out_err;
 
 	memset(&saddr, 0, sizeof(saddr));
-	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
+	result = nodeid_to_addr(con->nodeid, &saddr, NULL);
+	if (result < 0) {
+		log_print("no address for nodeid %d", con->nodeid);
 		goto out_err;
+	}
 
 	sock->sk->sk_user_data = con;
 	con->rx_action = receive_from_sock;
@@ -983,8 +1108,7 @@ static void tcp_connect_to_sock(struct connection *con)
 	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
 			  sizeof(one));
 
-	result =
-		sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
+	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
 				   O_NONBLOCK);
 	if (result == -EINPROGRESS)
 		result = 0;
@@ -1002,11 +1126,17 @@ out_err:
 	 * Some errors are fatal and this list might need adjusting. For other
 	 * errors we try again until the max number of retries is reached.
 	 */
-	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
-	    result != -ENETDOWN && result != -EINVAL
-	    && result != -EPROTONOSUPPORT) {
+	if (result != -EHOSTUNREACH &&
+	    result != -ENETUNREACH &&
+	    result != -ENETDOWN && 
+	    result != -EINVAL &&
+	    result != -EPROTONOSUPPORT) {
+		log_print("connect %d try %d error %d", con->nodeid,
+			  con->retries, result);
+		mutex_unlock(&con->sock_mutex);
+		msleep(1000);
 		lowcomms_connect_sock(con);
-		result = 0;
+		return;
 	}
 out:
 	mutex_unlock(&con->sock_mutex);
@@ -1044,10 +1174,8 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
 	if (result < 0) {
 		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
 	}
-	sock->sk->sk_user_data = con;
 	con->rx_action = tcp_accept_from_sock;
 	con->connect_action = tcp_connect_to_sock;
-	con->sock = sock;
 
 	/* Bind to our port */
 	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
@@ -1358,8 +1486,7 @@ static void send_to_sock(struct connection *con)
 				}
 				cond_resched();
 				goto out;
-			}
-			if (ret <= 0)
+			} else if (ret < 0)
 				goto send_error;
 		}
 
@@ -1376,7 +1503,6 @@ static void send_to_sock(struct connection *con)
 		if (e->len == 0 && e->users == 0) {
 			list_del(&e->list);
 			free_entry(e);
-			continue;
 		}
 	}
 	spin_unlock(&con->writequeue_lock);
@@ -1394,7 +1520,6 @@ out_connect:
 	mutex_unlock(&con->sock_mutex);
 	if (!test_bit(CF_INIT_PENDING, &con->flags))
 		lowcomms_connect_sock(con);
-	return;
 }
 
 static void clean_one_writequeue(struct connection *con)
@@ -1414,6 +1539,7 @@ static void clean_one_writequeue(struct connection *con)
 int dlm_lowcomms_close(int nodeid)
 {
 	struct connection *con;
+	struct dlm_node_addr *na;
 
 	log_print("closing connection to node %d", nodeid);
 	con = nodeid2con(nodeid, 0);
@@ -1428,6 +1554,17 @@ int dlm_lowcomms_close(int nodeid)
 		clean_one_writequeue(con);
 		close_connection(con, true);
 	}
+
+	spin_lock(&dlm_node_addrs_spin);
+	na = find_node_addr(nodeid);
+	if (na) {
+		list_del(&na->list);
+		while (na->addr_count--)
+			kfree(na->addr[na->addr_count]);
+		kfree(na);
+	}
+	spin_unlock(&dlm_node_addrs_spin);
+
 	return 0;
 }
 
@@ -1577,3 +1714,17 @@ fail_destroy:
 fail:
 	return error;
 }
+
+void dlm_lowcomms_exit(void)
+{
+	struct dlm_node_addr *na, *safe;
+
+	spin_lock(&dlm_node_addrs_spin);
+	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
+		list_del(&na->list);
+		while (na->addr_count--)
+			kfree(na->addr[na->addr_count]);
+		kfree(na);
+	}
+	spin_unlock(&dlm_node_addrs_spin);
+}
diff --git a/fs/dlm/lowcomms.h b/fs/dlm/lowcomms.h
index 1311e6426287..67462e54fc2f 100644
--- a/fs/dlm/lowcomms.h
+++ b/fs/dlm/lowcomms.h
@@ -16,10 +16,12 @@
 
 int dlm_lowcomms_start(void);
 void dlm_lowcomms_stop(void);
+void dlm_lowcomms_exit(void);
 int dlm_lowcomms_close(int nodeid);
 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc);
 void dlm_lowcomms_commit_buffer(void *mh);
 int dlm_lowcomms_connect_node(int nodeid);
+int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);
 
 #endif				/* __LOWCOMMS_DOT_H__ */
 
diff --git a/fs/dlm/main.c b/fs/dlm/main.c
index 5a59efa0bb46..079c0bd71ab7 100644
--- a/fs/dlm/main.c
+++ b/fs/dlm/main.c
@@ -17,6 +17,7 @@
 #include "user.h"
 #include "memory.h"
 #include "config.h"
+#include "lowcomms.h"
 
 static int __init init_dlm(void)
 {
@@ -78,6 +79,7 @@ static void __exit exit_dlm(void)
 	dlm_config_exit();
 	dlm_memory_exit();
 	dlm_lockspace_exit();
+	dlm_lowcomms_exit();
 	dlm_unregister_debugfs();
 }
 
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index 862640a36d5c..476557b54921 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -616,13 +616,13 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	down_write(&ls->ls_recv_active);
 
 	/*
-	 * Abort any recovery that's in progress (see RECOVERY_STOP,
+	 * Abort any recovery that's in progress (see RECOVER_STOP,
 	 * dlm_recovery_stopped()) and tell any other threads running in the
 	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
 	 */
 
 	spin_lock(&ls->ls_recover_lock);
-	set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+	set_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
 	ls->ls_recover_seq++;
 	spin_unlock(&ls->ls_recover_lock);
@@ -642,12 +642,16 @@ int dlm_ls_stop(struct dlm_ls *ls)
 	 *    when recovery is complete.
 	 */
 
-	if (new)
-		down_write(&ls->ls_in_recovery);
+	if (new) {
+		set_bit(LSFL_RECOVER_DOWN, &ls->ls_flags);
+		wake_up_process(ls->ls_recoverd_task);
+		wait_event(ls->ls_recover_lock_wait,
+			   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
+	}
 
 	/*
 	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
-	 * running) has noticed RECOVERY_STOP above and quit processing the
+	 * running) has noticed RECOVER_STOP above and quit processing the
 	 * previous recovery.
 	 */
 
@@ -709,7 +713,8 @@ int dlm_ls_start(struct dlm_ls *ls)
 		kfree(rv_old);
 	}
 
-	dlm_recoverd_kick(ls);
+	set_bit(LSFL_RECOVER_WORK, &ls->ls_flags);
+	wake_up_process(ls->ls_recoverd_task);
 	return 0;
 
  fail:
diff --git a/fs/dlm/netlink.c b/fs/dlm/netlink.c
index ef17e0169da1..60a327863b11 100644
--- a/fs/dlm/netlink.c
+++ b/fs/dlm/netlink.c
@@ -14,7 +14,7 @@
 #include "dlm_internal.h"
 
 static uint32_t dlm_nl_seqnum;
-static uint32_t listener_nlpid;
+static uint32_t listener_nlportid;
 
 static struct genl_family family = {
 	.id		= GENL_ID_GENERATE,
@@ -64,13 +64,13 @@ static int send_data(struct sk_buff *skb)
 		return rv;
 	}
 
-	return genlmsg_unicast(&init_net, skb, listener_nlpid);
+	return genlmsg_unicast(&init_net, skb, listener_nlportid);
 }
 
 static int user_cmd(struct sk_buff *skb, struct genl_info *info)
 {
-	listener_nlpid = info->snd_pid;
-	printk("user_cmd nlpid %u\n", listener_nlpid);
+	listener_nlportid = info->snd_portid;
+	printk("user_cmd nlpid %u\n", listener_nlportid);
 	return 0;
 }
 
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 87f1a56eab32..9d61947d473a 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -581,7 +581,7 @@ void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 
 	spin_lock(&ls->ls_recover_lock);
 	status = ls->ls_recover_status;
-	stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+	stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	seq = ls->ls_recover_seq;
 	spin_unlock(&ls->ls_recover_lock);
 
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 88ce65ff021e..32f9f8926ec3 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -41,6 +41,7 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq)
 		set_bit(LSFL_RUNNING, &ls->ls_flags);
 		/* unblocks processes waiting to enter the dlm */
 		up_write(&ls->ls_in_recovery);
+		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
 		error = 0;
 	}
 	spin_unlock(&ls->ls_recover_lock);
@@ -262,7 +263,7 @@ static void do_ls_recovery(struct dlm_ls *ls)
 	rv = ls->ls_recover_args;
 	ls->ls_recover_args = NULL;
 	if (rv && ls->ls_recover_seq == rv->seq)
-		clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
+		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
 	spin_unlock(&ls->ls_recover_lock);
 
 	if (rv) {
@@ -282,26 +283,34 @@ static int dlm_recoverd(void *arg)
 		return -1;
 	}
 
+	down_write(&ls->ls_in_recovery);
+	set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
+	wake_up(&ls->ls_recover_lock_wait);
+
 	while (!kthread_should_stop()) {
 		set_current_state(TASK_INTERRUPTIBLE);
-		if (!test_bit(LSFL_WORK, &ls->ls_flags))
+		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
+		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags))
 			schedule();
 		set_current_state(TASK_RUNNING);
 
-		if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags))
+		if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
+			down_write(&ls->ls_in_recovery);
+			set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
+			wake_up(&ls->ls_recover_lock_wait);
+		}
+
+		if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
 			do_ls_recovery(ls);
 	}
 
+	if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
+		up_write(&ls->ls_in_recovery);
+
 	dlm_put_lockspace(ls);
 	return 0;
 }
 
-void dlm_recoverd_kick(struct dlm_ls *ls)
-{
-	set_bit(LSFL_WORK, &ls->ls_flags);
-	wake_up_process(ls->ls_recoverd_task);
-}
-
 int dlm_recoverd_start(struct dlm_ls *ls)
 {
 	struct task_struct *p;
diff --git a/fs/dlm/recoverd.h b/fs/dlm/recoverd.h
index 866657c5d69d..8856079733fa 100644
--- a/fs/dlm/recoverd.h
+++ b/fs/dlm/recoverd.h
@@ -14,7 +14,6 @@
 #ifndef __RECOVERD_DOT_H__
 #define __RECOVERD_DOT_H__
 
-void dlm_recoverd_kick(struct dlm_ls *ls);
 void dlm_recoverd_stop(struct dlm_ls *ls);
 int dlm_recoverd_start(struct dlm_ls *ls);
 void dlm_recoverd_suspend(struct dlm_ls *ls);
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index eb4ed9ba3098..7ff49852b0cb 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -503,6 +503,13 @@ static ssize_t device_write(struct file *file, const char __user *buf,
 #endif
 		return -EINVAL;
 
+#ifdef CONFIG_COMPAT
+	if (count > sizeof(struct dlm_write_request32) + DLM_RESNAME_MAXLEN)
+#else
+	if (count > sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN)
+#endif
+		return -EINVAL;
+
 	kbuf = kzalloc(count + 1, GFP_NOFS);
 	if (!kbuf)
 		return -ENOMEM;