summary refs log tree commit diff
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c336
-rw-r--r--net/tipc/bcast.h2
-rw-r--r--net/tipc/bearer.c5
-rw-r--r--net/tipc/config.c21
-rw-r--r--net/tipc/core.c10
-rw-r--r--net/tipc/core.h42
-rw-r--r--net/tipc/discover.c79
-rw-r--r--net/tipc/link.c299
-rw-r--r--net/tipc/log.c2
-rw-r--r--net/tipc/msg.c2
-rw-r--r--net/tipc/msg.h15
-rw-r--r--net/tipc/name_distr.c8
-rw-r--r--net/tipc/name_table.c48
-rw-r--r--net/tipc/name_table.h2
-rw-r--r--net/tipc/net.c11
-rw-r--r--net/tipc/node.c84
-rw-r--r--net/tipc/node.h37
-rw-r--r--net/tipc/port.c72
-rw-r--r--net/tipc/port.h42
-rw-r--r--net/tipc/socket.c11
-rw-r--r--net/tipc/subscr.c2
21 files changed, 532 insertions, 598 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 8eb87b11d100..e00441a2092f 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void)
 	return bcl->fsm_msg_cnt;
 }
 
-/**
- * bclink_set_gap - set gap according to contents of current deferred pkt queue
- *
- * Called with 'node' locked, bc_lock unlocked
- */
-
-static void bclink_set_gap(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf = n_ptr->bclink.deferred_head;
-
-	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
-		mod(n_ptr->bclink.last_in);
-	if (unlikely(buf != NULL))
-		n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
-}
-
-/**
- * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
- *
- * This mechanism endeavours to prevent all nodes in network from trying
- * to ACK or NACK at the same time.
- *
- * Note: TIPC uses a different trigger to distribute ACKs than it does to
- *       distribute NACKs, but tries to use the same spacing (divide by 16).
- */
-
-static int bclink_ack_allowed(u32 n)
+static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
 {
-	return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
+	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
+						seqno : node->bclink.last_sent;
 }
 
 
-/**
+/*
  * tipc_bclink_retransmit_to - get most recent node to request retransmission
  *
  * Called with bc_lock locked
@@ -281,7 +256,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 		if (bcbuf_acks(crs) == 0) {
 			bcl->first_out = next;
 			bcl->out_queue_size--;
-			buf_discard(crs);
+			kfree_skb(crs);
 			released = 1;
 		}
 		crs = next;
@@ -300,140 +275,94 @@ exit:
 	spin_unlock_bh(&bc_lock);
 }
 
-/**
- * bclink_send_ack - unicast an ACK msg
+/*
+ * tipc_bclink_update_link_state - update broadcast link state
  *
  * tipc_net_lock and node lock set
  */
 
-static void bclink_send_ack(struct tipc_node *n_ptr)
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 {
-	struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
+	struct sk_buff *buf;
 
-	if (l_ptr != NULL)
-		tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-}
+	/* Ignore "stale" link state info */
 
-/**
- * bclink_send_nack- broadcast a NACK msg
- *
- * tipc_net_lock and node lock set
- */
+	if (less_eq(last_sent, n_ptr->bclink.last_in))
+		return;
 
-static void bclink_send_nack(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
+	/* Update link synchronization state; quit if in sync */
+
+	bclink_update_last_sent(n_ptr, last_sent);
+
+	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+		return;
+
+	/* Update out-of-sync state; quit if loss is still unconfirmed */
+
+	if ((++n_ptr->bclink.oos_state) == 1) {
+		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
+			return;
+		n_ptr->bclink.oos_state++;
+	}
 
-	if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
+	/* Don't NACK if one has been recently sent (or seen) */
+
+	if (n_ptr->bclink.oos_state & 0x1)
 		return;
 
+	/* Send NACK */
+
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
-		msg = buf_msg(buf);
+		struct tipc_msg *msg = buf_msg(buf);
+
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
-			 INT_H_SIZE, n_ptr->addr);
+			      INT_H_SIZE, n_ptr->addr);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
-		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
-		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
-		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
-		msg_set_bcast_tag(msg, tipc_own_tag);
+		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
+				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
+				 : n_ptr->bclink.last_sent);
 
+		spin_lock_bh(&bc_lock);
 		tipc_bearer_send(&bcbearer->bearer, buf, NULL);
 		bcl->stats.sent_nacks++;
-		buf_discard(buf);
-
-		/*
-		 * Ensure we doesn't send another NACK msg to the node
-		 * until 16 more deferred messages arrive from it
-		 * (i.e. helps prevent all nodes from NACK'ing at same time)
-		 */
+		spin_unlock_bh(&bc_lock);
+		kfree_skb(buf);
 
-		n_ptr->bclink.nack_sync = tipc_own_tag;
+		n_ptr->bclink.oos_state++;
 	}
 }
 
-/**
- * tipc_bclink_check_gap - send a NACK if a sequence gap exists
+/*
+ * bclink_peek_nack - monitor retransmission requests sent by other nodes
  *
- * tipc_net_lock and node lock set
- */
-
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
-{
-	if (!n_ptr->bclink.supported ||
-	    less_eq(last_sent, mod(n_ptr->bclink.last_in)))
-		return;
-
-	bclink_set_gap(n_ptr);
-	if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
-		n_ptr->bclink.gap_to = last_sent;
-	bclink_send_nack(n_ptr);
-}
-
-/**
- * tipc_bclink_peek_nack - process a NACK msg meant for another node
+ * Delay any upcoming NACK by this node if another node has already
+ * requested the first message this node is going to ask for.
  *
  * Only tipc_net_lock set.
  */
 
-static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
+static void bclink_peek_nack(struct tipc_msg *msg)
 {
-	struct tipc_node *n_ptr = tipc_node_find(dest);
-	u32 my_after, my_to;
+	struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
 
-	if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
+	if (unlikely(!n_ptr))
 		return;
+
 	tipc_node_lock(n_ptr);
-	/*
-	 * Modify gap to suppress unnecessary NACKs from this node
-	 */
-	my_after = n_ptr->bclink.gap_after;
-	my_to = n_ptr->bclink.gap_to;
-
-	if (less_eq(gap_after, my_after)) {
-		if (less(my_after, gap_to) && less(gap_to, my_to))
-			n_ptr->bclink.gap_after = gap_to;
-		else if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
-	} else if (less_eq(gap_after, my_to)) {
-		if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = gap_after;
-	} else {
-		/*
-		 * Expand gap if missing bufs not in deferred queue:
-		 */
-		struct sk_buff *buf = n_ptr->bclink.deferred_head;
-		u32 prev = n_ptr->bclink.gap_to;
 
-		for (; buf; buf = buf->next) {
-			u32 seqno = buf_seqno(buf);
+	if (n_ptr->bclink.supported &&
+	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
+	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
+		n_ptr->bclink.oos_state = 2;
 
-			if (mod(seqno - prev) != 1) {
-				buf = NULL;
-				break;
-			}
-			if (seqno == gap_after)
-				break;
-			prev = seqno;
-		}
-		if (buf == NULL)
-			n_ptr->bclink.gap_to = gap_after;
-	}
-	/*
-	 * Some nodes may send a complementary NACK now:
-	 */
-	if (bclink_ack_allowed(sender_tag + 1)) {
-		if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
-			bclink_send_nack(n_ptr);
-			bclink_set_gap(n_ptr);
-		}
-	}
 	tipc_node_unlock(n_ptr);
 }
 
-/**
+/*
  * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
  */
 
@@ -445,7 +374,7 @@ int tipc_bclink_send_msg(struct sk_buff *buf)
 
 	if (!bclink->bcast_nodes.count) {
 		res = msg_data_sz(buf_msg(buf));
-		buf_discard(buf);
+		kfree_skb(buf);
 		goto exit;
 	}
 
@@ -460,7 +389,33 @@ exit:
 	return res;
 }
 
-/**
+/*
+ * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
+ *
+ * Called with both sending node's lock and bc_lock taken.
+ */
+
+static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
+{
+	bclink_update_last_sent(node, seqno);
+	node->bclink.last_in = seqno;
+	node->bclink.oos_state = 0;
+	bcl->stats.recv_info++;
+
+	/*
+	 * Unicast an ACK periodically, ensuring that
+	 * all nodes in the cluster don't ACK at the same time
+	 */
+
+	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
+		tipc_link_send_proto_msg(
+			node->active_links[node->addr & 1],
+			STATE_MSG, 0, 0, 0, 0, 0);
+		bcl->stats.sent_acks++;
+	}
+}
+
+/*
  * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
  *
  * tipc_net_lock is read_locked, no other locks set
@@ -472,7 +427,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 	struct tipc_node *node;
 	u32 next_in;
 	u32 seqno;
-	struct sk_buff *deferred;
+	int deferred;
 
 	/* Screen out unwanted broadcast messages */
 
@@ -487,6 +442,8 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 	if (unlikely(!node->bclink.supported))
 		goto unlock;
 
+	/* Handle broadcast protocol message */
+
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
 		if (msg_type(msg) != STATE_MSG)
 			goto unlock;
@@ -501,89 +458,118 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 			spin_unlock_bh(&bc_lock);
 		} else {
 			tipc_node_unlock(node);
-			tipc_bclink_peek_nack(msg_destnode(msg),
-					      msg_bcast_tag(msg),
-					      msg_bcgap_after(msg),
-					      msg_bcgap_to(msg));
+			bclink_peek_nack(msg);
 		}
 		goto exit;
 	}
 
 	/* Handle in-sequence broadcast message */
 
-receive:
-	next_in = mod(node->bclink.last_in + 1);
 	seqno = msg_seqno(msg);
+	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
-		bcl->stats.recv_info++;
-		node->bclink.last_in++;
-		bclink_set_gap(node);
-		if (unlikely(bclink_ack_allowed(seqno))) {
-			bclink_send_ack(node);
-			bcl->stats.sent_acks++;
-		}
+receive:
+		/* Deliver message to destination */
+
 		if (likely(msg_isdata(msg))) {
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
 			if (likely(msg_mcast(msg)))
 				tipc_port_recv_mcast(buf, NULL);
 			else
-				buf_discard(buf);
+				kfree_skb(buf);
 		} else if (msg_user(msg) == MSG_BUNDLER) {
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_bundles++;
 			bcl->stats.recv_bundled += msg_msgcnt(msg);
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
 			tipc_link_recv_bundle(buf);
 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
+			int ret = tipc_link_recv_fragment(&node->bclink.defragm,
+						      &buf, &msg);
+			if (ret < 0)
+				goto unlock;
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_fragments++;
-			if (tipc_link_recv_fragment(&node->bclink.defragm,
-						    &buf, &msg))
+			if (ret > 0)
 				bcl->stats.recv_fragmented++;
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
 			tipc_net_route_msg(buf);
 		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
 			tipc_named_recv(buf);
 		} else {
+			spin_lock_bh(&bc_lock);
+			bclink_accept_pkt(node, seqno);
+			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
-			buf_discard(buf);
+			kfree_skb(buf);
 		}
 		buf = NULL;
+
+		/* Determine new synchronization state */
+
 		tipc_node_lock(node);
-		deferred = node->bclink.deferred_head;
-		if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
-			buf = deferred;
-			msg = buf_msg(buf);
-			node->bclink.deferred_head = deferred->next;
-			goto receive;
-		}
-	} else if (less(next_in, seqno)) {
-		u32 gap_after = node->bclink.gap_after;
-		u32 gap_to = node->bclink.gap_to;
-
-		if (tipc_link_defer_pkt(&node->bclink.deferred_head,
-					&node->bclink.deferred_tail,
-					buf)) {
-			node->bclink.nack_sync++;
-			bcl->stats.deferred_recv++;
-			if (seqno == mod(gap_after + 1))
-				node->bclink.gap_after = seqno;
-			else if (less(gap_after, seqno) && less(seqno, gap_to))
-				node->bclink.gap_to = seqno;
+		if (unlikely(!tipc_node_is_up(node)))
+			goto unlock;
+
+		if (node->bclink.last_in == node->bclink.last_sent)
+			goto unlock;
+
+		if (!node->bclink.deferred_head) {
+			node->bclink.oos_state = 1;
+			goto unlock;
 		}
+
+		msg = buf_msg(node->bclink.deferred_head);
+		seqno = msg_seqno(msg);
+		next_in = mod(next_in + 1);
+		if (seqno != next_in)
+			goto unlock;
+
+		/* Take in-sequence message from deferred queue & deliver it */
+
+		buf = node->bclink.deferred_head;
+		node->bclink.deferred_head = buf->next;
+		node->bclink.deferred_size--;
+		goto receive;
+	}
+
+	/* Handle out-of-sequence broadcast message */
+
+	if (less(next_in, seqno)) {
+		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
+					       &node->bclink.deferred_tail,
+					       buf);
+		node->bclink.deferred_size += deferred;
+		bclink_update_last_sent(node, seqno);
 		buf = NULL;
-		if (bclink_ack_allowed(node->bclink.nack_sync)) {
-			if (gap_to != gap_after)
-				bclink_send_nack(node);
-			bclink_set_gap(node);
-		}
-	} else {
+	} else
+		deferred = 0;
+
+	spin_lock_bh(&bc_lock);
+
+	if (deferred)
+		bcl->stats.deferred_recv++;
+	else
 		bcl->stats.duplicates++;
-	}
+
+	spin_unlock_bh(&bc_lock);
+
 unlock:
 	tipc_node_unlock(node);
 exit:
-	buf_discard(buf);
+	kfree_skb(buf);
 }
 
 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index b009666c60b0..5571394098f9 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -96,7 +96,7 @@ int  tipc_bclink_send_msg(struct sk_buff *buf);
 void tipc_bclink_recv_pkt(struct sk_buff *buf);
 u32  tipc_bclink_get_last_sent(void);
 u32  tipc_bclink_acks_missing(struct tipc_node *n_ptr);
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno);
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
 int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 329fb659fae4..5dfd89c40429 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -435,7 +435,7 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
 	u32 i;
 	int res = -EINVAL;
 
-	if (tipc_mode != TIPC_NET_MODE) {
+	if (!tipc_own_addr) {
 		warn("Bearer <%s> rejected, not supported in standalone mode\n",
 		     name);
 		return -ENOPROTOOPT;
@@ -456,8 +456,7 @@ int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
 		warn("Bearer <%s> rejected, illegal discovery domain\n", name);
 		return -EINVAL;
 	}
-	if ((priority < TIPC_MIN_LINK_PRI ||
-	     priority > TIPC_MAX_LINK_PRI) &&
+	if ((priority > TIPC_MAX_LINK_PRI) &&
 	    (priority != TIPC_MEDIA_LINK_PRI)) {
 		warn("Bearer <%s> rejected, illegal priority\n", name);
 		return -EINVAL;
diff --git a/net/tipc/config.c b/net/tipc/config.c
index 4785bf26cdf4..f76d3b15e4e2 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -179,7 +179,7 @@ static struct sk_buff *cfg_set_own_addr(void)
 	if (!tipc_addr_node_valid(addr))
 		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
 						   " (node address)");
-	if (tipc_mode == TIPC_NET_MODE)
+	if (tipc_own_addr)
 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
 						   " (cannot change node address once assigned)");
 
@@ -218,7 +218,7 @@ static struct sk_buff *cfg_set_max_publications(void)
 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
 
 	value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
-	if (value != delimit(value, 1, 65535))
+	if (value < 1 || value > 65535)
 		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
 						   " (max publications must be 1-65535)");
 	tipc_max_publications = value;
@@ -233,7 +233,7 @@ static struct sk_buff *cfg_set_max_subscriptions(void)
 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
 
 	value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
-	if (value != delimit(value, 1, 65535))
+	if (value < 1 || value > 65535)
 		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
 						   " (max subscriptions must be 1-65535");
 	tipc_max_subscriptions = value;
@@ -249,14 +249,11 @@ static struct sk_buff *cfg_set_max_ports(void)
 	value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
 	if (value == tipc_max_ports)
 		return tipc_cfg_reply_none();
-	if (value != delimit(value, 127, 65535))
+	if (value < 127 || value > 65535)
 		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
 						   " (max ports must be 127-65535)");
-	if (tipc_mode != TIPC_NOT_RUNNING)
-		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
-			" (cannot change max ports while TIPC is active)");
-	tipc_max_ports = value;
-	return tipc_cfg_reply_none();
+	return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
+		" (cannot change max ports while TIPC is active)");
 }
 
 static struct sk_buff *cfg_set_netid(void)
@@ -268,10 +265,10 @@ static struct sk_buff *cfg_set_netid(void)
 	value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
 	if (value == tipc_net_id)
 		return tipc_cfg_reply_none();
-	if (value != delimit(value, 1, 9999))
+	if (value < 1 || value > 9999)
 		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
 						   " (network id must be 1-9999)");
-	if (tipc_mode == TIPC_NET_MODE)
+	if (tipc_own_addr)
 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
 			" (cannot change network id once TIPC has joined a network)");
 	tipc_net_id = value;
@@ -481,7 +478,7 @@ int tipc_cfg_init(void)
 
 	seq.type = TIPC_CFG_SRV;
 	seq.lower = seq.upper = tipc_own_addr;
-	res = tipc_nametbl_publish_rsv(config_port_ref, TIPC_ZONE_SCOPE, &seq);
+	res = tipc_publish(config_port_ref, TIPC_ZONE_SCOPE, &seq);
 	if (res)
 		goto failed;
 
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 2691cd57b8a8..68eba03e7955 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -53,7 +53,6 @@
 
 /* global variables used by multiple sub-systems within TIPC */
 
-int tipc_mode = TIPC_NOT_RUNNING;
 int tipc_random;
 
 const char tipc_alphabet[] =
@@ -125,11 +124,6 @@ int tipc_core_start_net(unsigned long addr)
 
 static void tipc_core_stop(void)
 {
-	if (tipc_mode != TIPC_NODE_MODE)
-		return;
-
-	tipc_mode = TIPC_NOT_RUNNING;
-
 	tipc_netlink_stop();
 	tipc_handler_stop();
 	tipc_cfg_stop();
@@ -148,11 +142,7 @@ static int tipc_core_start(void)
 {
 	int res;
 
-	if (tipc_mode != TIPC_NOT_RUNNING)
-		return -ENOPROTOOPT;
-
 	get_random_bytes(&tipc_random, sizeof(tipc_random));
-	tipc_mode = TIPC_NODE_MODE;
 
 	res = tipc_handler_start();
 	if (!res)
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 2761af36d141..13837e0e56b1 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -130,13 +130,6 @@ void tipc_msg_dbg(struct print_buf *, struct tipc_msg *, const char *);
 #define ELINKCONG EAGAIN	/* link congestion <=> resource unavailable */
 
 /*
- * TIPC operating mode routines
- */
-#define TIPC_NOT_RUNNING  0
-#define TIPC_NODE_MODE    1
-#define TIPC_NET_MODE     2
-
-/*
  * Global configuration variables
  */
 
@@ -151,7 +144,6 @@ extern int tipc_remote_management;
  * Other global variables
  */
 
-extern int tipc_mode;
 extern int tipc_random;
 extern const char tipc_alphabet[];
 
@@ -168,16 +160,6 @@ extern void tipc_netlink_stop(void);
 extern int  tipc_socket_init(void);
 extern void tipc_socket_stop(void);
 
-static inline int delimit(int val, int min, int max)
-{
-	if (val > max)
-		return max;
-	if (val < min)
-		return min;
-	return val;
-}
-
-
 /*
  * TIPC timer and signal code
  */
@@ -279,28 +261,4 @@ static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
 
 extern struct sk_buff *tipc_buf_acquire(u32 size);
 
-/**
- * buf_discard - frees a TIPC message buffer
- * @skb: message buffer
- *
- * Frees a message buffer.  If passed NULL, just returns.
- */
-
-static inline void buf_discard(struct sk_buff *skb)
-{
-	kfree_skb(skb);
-}
-
-/**
- * buf_linearize - convert a TIPC message buffer into a single contiguous piece
- * @skb: message buffer
- *
- * Returns 0 on success.
- */
-
-static inline int buf_linearize(struct sk_buff *skb)
-{
-	return skb_linearize(skb);
-}
-
 #endif
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index a00e5f811569..c630a21b2bed 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -82,6 +82,7 @@ static struct sk_buff *tipc_disc_init_msg(u32 type,
 		msg = buf_msg(buf);
 		tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain);
 		msg_set_non_seq(msg, 1);
+		msg_set_node_sig(msg, tipc_random);
 		msg_set_dest_domain(msg, dest_domain);
 		msg_set_bc_netid(msg, tipc_net_id);
 		b_ptr->media->addr2msg(&b_ptr->addr, msg_media_addr(msg));
@@ -121,20 +122,22 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
 {
 	struct tipc_node *n_ptr;
 	struct tipc_link *link;
-	struct tipc_media_addr media_addr, *addr;
+	struct tipc_media_addr media_addr;
 	struct sk_buff *rbuf;
 	struct tipc_msg *msg = buf_msg(buf);
 	u32 dest = msg_dest_domain(msg);
 	u32 orig = msg_prevnode(msg);
 	u32 net_id = msg_bc_netid(msg);
 	u32 type = msg_type(msg);
+	u32 signature = msg_node_sig(msg);
+	int addr_mismatch;
 	int link_fully_up;
 
 	media_addr.broadcast = 1;
 	b_ptr->media->msg2addr(&media_addr, msg_media_addr(msg));
-	buf_discard(buf);
+	kfree_skb(buf);
 
-	/* Validate discovery message from requesting node */
+	/* Ensure message from node is valid and communication is permitted */
 	if (net_id != tipc_net_id)
 		return;
 	if (media_addr.broadcast)
@@ -162,15 +165,50 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
 	}
 	tipc_node_lock(n_ptr);
 
+	/* Prepare to validate requesting node's signature and media address */
 	link = n_ptr->links[b_ptr->identity];
+	addr_mismatch = (link != NULL) &&
+		memcmp(&link->media_addr, &media_addr, sizeof(media_addr));
 
-	/* Create a link endpoint for this bearer, if necessary */
-	if (!link) {
-		link = tipc_link_create(n_ptr, b_ptr, &media_addr);
-		if (!link) {
+	/*
+	 * Ensure discovery message's signature is correct
+	 *
+	 * If signature is incorrect and there is no working link to the node,
+	 * accept the new signature but invalidate all existing links to the
+	 * node so they won't re-activate without a new discovery message.
+	 *
+	 * If signature is incorrect and the requested link to the node is
+	 * working, accept the new signature. (This is an instance of delayed
+	 * rediscovery, where a link endpoint was able to re-establish contact
+	 * with its peer endpoint on a node that rebooted before receiving a
+	 * discovery message from that node.)
+	 *
+	 * If signature is incorrect and there is a working link to the node
+	 * that is not the requested link, reject the request (must be from
+	 * a duplicate node).
+	 */
+	if (signature != n_ptr->signature) {
+		if (n_ptr->working_links == 0) {
+			struct tipc_link *curr_link;
+			int i;
+
+			for (i = 0; i < MAX_BEARERS; i++) {
+				curr_link = n_ptr->links[i];
+				if (curr_link) {
+					memset(&curr_link->media_addr, 0,
+					       sizeof(media_addr));
+					tipc_link_reset(curr_link);
+				}
+			}
+			addr_mismatch = (link != NULL);
+		} else if (tipc_link_is_up(link) && !addr_mismatch) {
+			/* delayed rediscovery */
+		} else {
+			disc_dupl_alert(b_ptr, orig, &media_addr);
 			tipc_node_unlock(n_ptr);
 			return;
 		}
+		n_ptr->signature = signature;
 	}
 
 	/*
@@ -183,17 +221,26 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
 	 * the new media address and reset the link to ensure it starts up
 	 * cleanly.
 	 */
-	addr = &link->media_addr;
-	if (memcmp(addr, &media_addr, sizeof(*addr))) {
-		if (tipc_link_is_up(link) || (!link->started)) {
+
+	if (addr_mismatch) {
+		if (tipc_link_is_up(link)) {
 			disc_dupl_alert(b_ptr, orig, &media_addr);
 			tipc_node_unlock(n_ptr);
 			return;
+		} else {
+			memcpy(&link->media_addr, &media_addr,
+			       sizeof(media_addr));
+			tipc_link_reset(link);
+		}
+	}
+
+	/* Create a link endpoint for this bearer, if necessary */
+	if (!link) {
+		link = tipc_link_create(n_ptr, b_ptr, &media_addr);
+		if (!link) {
+			tipc_node_unlock(n_ptr);
+			return;
 		}
-		warn("Resetting link <%s>, peer interface address changed\n",
-		     link->name);
-		memcpy(addr, &media_addr, sizeof(*addr));
-		tipc_link_reset(link);
 	}
 
 	/* Accept discovery message & send response, if necessary */
@@ -203,7 +250,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
 		rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
 		if (rbuf) {
 			b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
-			buf_discard(rbuf);
+			kfree_skb(rbuf);
 		}
 	}
 
@@ -349,7 +396,7 @@ void tipc_disc_delete(struct tipc_link_req *req)
 {
 	k_cancel_timer(&req->timer);
 	k_term_timer(&req->timer);
-	buf_discard(req->buf);
+	kfree_skb(req->buf);
 	kfree(req);
 }
 
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ac1832a66f8a..b4b9b30167a3 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -484,7 +484,7 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
 
 	while (buf) {
 		next = buf->next;
-		buf_discard(buf);
+		kfree_skb(buf);
 		buf = next;
 	}
 	l_ptr->first_out = NULL;
@@ -503,7 +503,7 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 
 	while (buf) {
 		next = buf->next;
-		buf_discard(buf);
+		kfree_skb(buf);
 		buf = next;
 	}
 	l_ptr->defragm_buf = NULL;
@@ -522,20 +522,20 @@ void tipc_link_stop(struct tipc_link *l_ptr)
 	buf = l_ptr->oldest_deferred_in;
 	while (buf) {
 		next = buf->next;
-		buf_discard(buf);
+		kfree_skb(buf);
 		buf = next;
 	}
 
 	buf = l_ptr->first_out;
 	while (buf) {
 		next = buf->next;
-		buf_discard(buf);
+		kfree_skb(buf);
 		buf = next;
 	}
 
 	tipc_link_reset_fragments(l_ptr);
 
-	buf_discard(l_ptr->proto_msg_queue);
+	kfree_skb(l_ptr->proto_msg_queue);
 	l_ptr->proto_msg_queue = NULL;
 }
 
@@ -571,12 +571,12 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	/* Clean up all queues: */
 
 	link_release_outqueue(l_ptr);
-	buf_discard(l_ptr->proto_msg_queue);
+	kfree_skb(l_ptr->proto_msg_queue);
 	l_ptr->proto_msg_queue = NULL;
 	buf = l_ptr->oldest_deferred_in;
 	while (buf) {
 		struct sk_buff *next = buf->next;
-		buf_discard(buf);
+		kfree_skb(buf);
 		buf = next;
 	}
 	if (!list_empty(&l_ptr->waiting_ports))
@@ -810,7 +810,7 @@ static int link_bundle_buf(struct tipc_link *l_ptr,
 	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
 	msg_set_size(bundler_msg, to_pos + size);
 	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
-	buf_discard(buf);
+	kfree_skb(buf);
 	l_ptr->stats.sent_bundled++;
 	return 1;
 }
@@ -871,17 +871,15 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 	u32 queue_limit = l_ptr->queue_limit[imp];
 	u32 max_packet = l_ptr->max_pkt;
 
-	msg_set_prevnode(msg, tipc_own_addr);	/* If routed message */
-
 	/* Match msg importance against queue limits: */
 
 	if (unlikely(queue_size >= queue_limit)) {
 		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
 			link_schedule_port(l_ptr, msg_origport(msg), size);
-			buf_discard(buf);
+			kfree_skb(buf);
 			return -ELINKCONG;
 		}
-		buf_discard(buf);
+		kfree_skb(buf);
 		if (imp > CONN_MANAGER) {
 			warn("Resetting link <%s>, send queue full", l_ptr->name);
 			tipc_link_reset(l_ptr);
@@ -968,10 +966,10 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
 		if (l_ptr)
 			res = tipc_link_send_buf(l_ptr, buf);
 		else
-			buf_discard(buf);
+			kfree_skb(buf);
 		tipc_node_unlock(n_ptr);
 	} else {
-		buf_discard(buf);
+		kfree_skb(buf);
 	}
 	read_unlock_bh(&tipc_net_lock);
 	return res;
@@ -1018,7 +1016,7 @@ void tipc_link_send_names(struct list_head *message_list, u32 dest)
 
 	list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
 		list_del((struct list_head *)buf);
-		buf_discard(buf);
+		kfree_skb(buf);
 	}
 }
 
@@ -1262,7 +1260,7 @@ again:
 error:
 				for (; buf_chain; buf_chain = buf) {
 					buf = buf_chain->next;
-					buf_discard(buf_chain);
+					kfree_skb(buf_chain);
 				}
 				return -EFAULT;
 			}
@@ -1316,7 +1314,7 @@ error:
 			tipc_node_unlock(node);
 			for (; buf_chain; buf_chain = buf) {
 				buf = buf_chain->next;
-				buf_discard(buf_chain);
+				kfree_skb(buf_chain);
 			}
 			goto again;
 		}
@@ -1324,7 +1322,7 @@ error:
 reject:
 		for (; buf_chain; buf_chain = buf) {
 			buf = buf_chain->next;
-			buf_discard(buf_chain);
+			kfree_skb(buf_chain);
 		}
 		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
 						 total_len, TIPC_ERR_NO_NODE);
@@ -1390,7 +1388,7 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
 		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
 			l_ptr->unacked_window = 0;
-			buf_discard(buf);
+			kfree_skb(buf);
 			l_ptr->proto_msg_queue = NULL;
 			return 0;
 		} else {
@@ -1501,13 +1499,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 		tipc_node_lock(n_ptr);
 
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
-		info("Multicast link info for %s\n", addr_string);
+		info("Broadcast link info for %s\n", addr_string);
+		info("Supportable: %d,  ", n_ptr->bclink.supportable);
 		info("Supported: %d,  ", n_ptr->bclink.supported);
 		info("Acked: %u\n", n_ptr->bclink.acked);
 		info("Last in: %u,  ", n_ptr->bclink.last_in);
-		info("Gap after: %u,  ", n_ptr->bclink.gap_after);
-		info("Gap to: %u\n", n_ptr->bclink.gap_to);
-		info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
+		info("Oos state: %u,  ", n_ptr->bclink.oos_state);
+		info("Last sent: %u\n", n_ptr->bclink.last_sent);
 
 		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
 
@@ -1679,7 +1677,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
 
 		/* Ensure message data is a single contiguous unit */
 
-		if (unlikely(buf_linearize(buf)))
+		if (unlikely(skb_linearize(buf)))
 			goto cont;
 
 		/* Handle arrival of a non-unicast link message */
@@ -1736,7 +1734,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
 
 		/* Release acked messages */
 
-		if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
+		if (n_ptr->bclink.supported)
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		crs = l_ptr->first_out;
@@ -1744,7 +1742,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		       less_eq(buf_seqno(crs), ackd)) {
 			struct sk_buff *next = crs->next;
 
-			buf_discard(crs);
+			kfree_skb(crs);
 			crs = next;
 			released++;
 		}
@@ -1773,52 +1771,56 @@ protocol_check:
 				if (unlikely(l_ptr->oldest_deferred_in))
 					head = link_insert_deferred_queue(l_ptr,
 									  head);
-				if (likely(msg_is_dest(msg, tipc_own_addr))) {
 deliver:
-					if (likely(msg_isdata(msg))) {
-						tipc_node_unlock(n_ptr);
-						tipc_port_recv_msg(buf);
-						continue;
+				if (likely(msg_isdata(msg))) {
+					tipc_node_unlock(n_ptr);
+					tipc_port_recv_msg(buf);
+					continue;
+				}
+				switch (msg_user(msg)) {
+					int ret;
+				case MSG_BUNDLER:
+					l_ptr->stats.recv_bundles++;
+					l_ptr->stats.recv_bundled +=
+						msg_msgcnt(msg);
+					tipc_node_unlock(n_ptr);
+					tipc_link_recv_bundle(buf);
+					continue;
+				case NAME_DISTRIBUTOR:
+					tipc_node_unlock(n_ptr);
+					tipc_named_recv(buf);
+					continue;
+				case CONN_MANAGER:
+					tipc_node_unlock(n_ptr);
+					tipc_port_recv_proto_msg(buf);
+					continue;
+				case MSG_FRAGMENTER:
+					l_ptr->stats.recv_fragments++;
+					ret = tipc_link_recv_fragment(
+						&l_ptr->defragm_buf,
+						&buf, &msg);
+					if (ret == 1) {
+						l_ptr->stats.recv_fragmented++;
+						goto deliver;
 					}
-					switch (msg_user(msg)) {
-					case MSG_BUNDLER:
-						l_ptr->stats.recv_bundles++;
-						l_ptr->stats.recv_bundled +=
-							msg_msgcnt(msg);
-						tipc_node_unlock(n_ptr);
-						tipc_link_recv_bundle(buf);
-						continue;
-					case NAME_DISTRIBUTOR:
-						tipc_node_unlock(n_ptr);
-						tipc_named_recv(buf);
-						continue;
-					case CONN_MANAGER:
-						tipc_node_unlock(n_ptr);
-						tipc_port_recv_proto_msg(buf);
-						continue;
-					case MSG_FRAGMENTER:
-						l_ptr->stats.recv_fragments++;
-						if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
-									    &buf, &msg)) {
-							l_ptr->stats.recv_fragmented++;
+					if (ret == -1)
+						l_ptr->next_in_no--;
+					break;
+				case CHANGEOVER_PROTOCOL:
+					type = msg_type(msg);
+					if (link_recv_changeover_msg(&l_ptr,
+								     &buf)) {
+						msg = buf_msg(buf);
+						seq_no = msg_seqno(msg);
+						if (type == ORIGINAL_MSG)
 							goto deliver;
-						}
-						break;
-					case CHANGEOVER_PROTOCOL:
-						type = msg_type(msg);
-						if (link_recv_changeover_msg(&l_ptr, &buf)) {
-							msg = buf_msg(buf);
-							seq_no = msg_seqno(msg);
-							if (type == ORIGINAL_MSG)
-								goto deliver;
-							goto protocol_check;
-						}
-						break;
-					default:
-						buf_discard(buf);
-						buf = NULL;
-						break;
+						goto protocol_check;
 					}
+					break;
+				default:
+					kfree_skb(buf);
+					buf = NULL;
+					break;
 				}
 				tipc_node_unlock(n_ptr);
 				tipc_net_route_msg(buf);
@@ -1847,23 +1849,22 @@ deliver:
 		}
 		tipc_node_unlock(n_ptr);
 cont:
-		buf_discard(buf);
+		kfree_skb(buf);
 	}
 	read_unlock_bh(&tipc_net_lock);
 }
 
 /*
- * link_defer_buf(): Sort a received out-of-sequence packet
- *                   into the deferred reception queue.
- * Returns the increase of the queue length,i.e. 0 or 1
+ * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
+ *
+ * Returns increase in queue length (i.e. 0 or 1)
  */
 
-u32 tipc_link_defer_pkt(struct sk_buff **head,
-			struct sk_buff **tail,
+u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
 			struct sk_buff *buf)
 {
-	struct sk_buff *prev = NULL;
-	struct sk_buff *crs = *head;
+	struct sk_buff *queue_buf;
+	struct sk_buff **prev;
 	u32 seq_no = buf_seqno(buf);
 
 	buf->next = NULL;
@@ -1881,31 +1882,30 @@ u32 tipc_link_defer_pkt(struct sk_buff **head,
 		return 1;
 	}
 
-	/* Scan through queue and sort it in */
-	do {
-		struct tipc_msg *msg = buf_msg(crs);
+	/* Locate insertion point in queue, then insert; discard if duplicate */
+	prev = head;
+	queue_buf = *head;
+	for (;;) {
+		u32 curr_seqno = buf_seqno(queue_buf);
 
-		if (less(seq_no, msg_seqno(msg))) {
-			buf->next = crs;
-			if (prev)
-				prev->next = buf;
-			else
-				*head = buf;
-			return 1;
+		if (seq_no == curr_seqno) {
+			kfree_skb(buf);
+			return 0;
 		}
-		if (seq_no == msg_seqno(msg))
+
+		if (less(seq_no, curr_seqno))
 			break;
-		prev = crs;
-		crs = crs->next;
-	} while (crs);
 
-	/* Message is a duplicate of an existing message */
+		prev = &queue_buf->next;
+		queue_buf = queue_buf->next;
+	}
 
-	buf_discard(buf);
-	return 0;
+	buf->next = queue_buf;
+	*prev = buf;
+	return 1;
 }
 
-/**
+/*
  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
  */
 
@@ -1930,7 +1930,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 
 	if (less(seq_no, mod(l_ptr->next_in_no))) {
 		l_ptr->stats.duplicates++;
-		buf_discard(buf);
+		kfree_skb(buf);
 		return;
 	}
 
@@ -1956,6 +1956,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
 	u32 msg_size = sizeof(l_ptr->proto_msg);
 	int r_flag;
 
+	/* Discard any previous message that was deferred due to congestion */
+
+	if (l_ptr->proto_msg_queue) {
+		kfree_skb(l_ptr->proto_msg_queue);
+		l_ptr->proto_msg_queue = NULL;
+	}
+
 	if (link_blocked(l_ptr))
 		return;
 
@@ -1964,9 +1971,11 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
 	if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG))
 		return;
 
+	/* Create protocol message with "out-of-sequence" sequence number */
+
 	msg_set_type(msg, msg_typ);
 	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
-	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
+	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
 
 	if (msg_typ == STATE_MSG) {
@@ -2020,44 +2029,36 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
 	r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
 	msg_set_redundant_link(msg, r_flag);
 	msg_set_linkprio(msg, l_ptr->priority);
-
-	/* Ensure sequence number will not fit : */
+	msg_set_size(msg, msg_size);
 
 	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
 
-	/* Congestion? */
-
-	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
-		if (!l_ptr->proto_msg_queue) {
-			l_ptr->proto_msg_queue =
-				tipc_buf_acquire(sizeof(l_ptr->proto_msg));
-		}
-		buf = l_ptr->proto_msg_queue;
-		if (!buf)
-			return;
-		skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
-		return;
-	}
-
-	/* Message can be sent */
-
 	buf = tipc_buf_acquire(msg_size);
 	if (!buf)
 		return;
 
 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
-	msg_set_size(buf_msg(buf), msg_size);
 
-	if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-		l_ptr->unacked_window = 0;
-		buf_discard(buf);
+	/* Defer message if bearer is already congested */
+
+	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
+		l_ptr->proto_msg_queue = buf;
 		return;
 	}
 
-	/* New congestion */
-	tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
-	l_ptr->proto_msg_queue = buf;
-	l_ptr->stats.bearer_congs++;
+	/* Defer message if attempting to send results in bearer congestion */
+
+	if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
+		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
+		l_ptr->proto_msg_queue = buf;
+		l_ptr->stats.bearer_congs++;
+		return;
+	}
+
+	/* Discard message if it was sent successfully */
+
+	l_ptr->unacked_window = 0;
+	kfree_skb(buf);
 }
 
 /*
@@ -2105,6 +2106,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 			l_ptr->owner->block_setup = WAIT_NODE_DOWN;
 		}
 
+		link_state_event(l_ptr, RESET_MSG);
+
 		/* fall thru' */
 	case ACTIVATE_MSG:
 		/* Update link settings according other endpoint's values */
@@ -2127,16 +2130,22 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 		} else {
 			l_ptr->max_pkt = l_ptr->max_pkt_target;
 		}
-		l_ptr->owner->bclink.supported = (max_pkt_info != 0);
+		l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
 
-		link_state_event(l_ptr, msg_type(msg));
+		/* Synchronize broadcast link info, if not done previously */
+
+		if (!tipc_node_is_up(l_ptr->owner)) {
+			l_ptr->owner->bclink.last_sent =
+				l_ptr->owner->bclink.last_in =
+				msg_last_bcast(msg);
+			l_ptr->owner->bclink.oos_state = 0;
+		}
 
 		l_ptr->peer_session = msg_session(msg);
 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
 
-		/* Synchronize broadcast sequence numbers */
-		if (!tipc_node_redundant_links(l_ptr->owner))
-			l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
+		if (msg_type(msg) == ACTIVATE_MSG)
+			link_state_event(l_ptr, ACTIVATE_MSG);
 		break;
 	case STATE_MSG:
 
@@ -2177,7 +2186,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 
 		/* Protocol message before retransmits, reduce loss risk */
 
-		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
+		if (l_ptr->owner->bclink.supported)
+			tipc_bclink_update_link_state(l_ptr->owner,
+						      msg_last_bcast(msg));
 
 		if (rec_gap || (msg_probe(msg))) {
 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
@@ -2191,7 +2202,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 		break;
 	}
 exit:
-	buf_discard(buf);
+	kfree_skb(buf);
 }
 
 
@@ -2389,7 +2400,7 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr,
 			warn("Link changeover error, duplicate msg dropped\n");
 			goto exit;
 		}
-		buf_discard(tunnel_buf);
+		kfree_skb(tunnel_buf);
 		return 1;
 	}
 
@@ -2421,7 +2432,7 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr,
 	} else {
 		*buf = buf_extract(tunnel_buf, INT_H_SIZE);
 		if (*buf != NULL) {
-			buf_discard(tunnel_buf);
+			kfree_skb(tunnel_buf);
 			return 1;
 		} else {
 			warn("Link changeover error, original msg dropped\n");
@@ -2429,7 +2440,7 @@ static int link_recv_changeover_msg(struct tipc_link **l_ptr,
 	}
 exit:
 	*buf = NULL;
-	buf_discard(tunnel_buf);
+	kfree_skb(tunnel_buf);
 	return 0;
 }
 
@@ -2451,7 +2462,7 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
 		pos += align(msg_size(buf_msg(obuf)));
 		tipc_net_route_msg(obuf);
 	}
-	buf_discard(buf);
+	kfree_skb(buf);
 }
 
 /*
@@ -2500,11 +2511,11 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 		}
 		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
 		if (fragm == NULL) {
-			buf_discard(buf);
+			kfree_skb(buf);
 			while (buf_chain) {
 				buf = buf_chain;
 				buf_chain = buf_chain->next;
-				buf_discard(buf);
+				kfree_skb(buf);
 			}
 			return -ENOMEM;
 		}
@@ -2521,7 +2532,7 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 		crs += fragm_sz;
 		msg_set_type(&fragm_hdr, FRAGMENT);
 	}
-	buf_discard(buf);
+	kfree_skb(buf);
 
 	/* Append chain of fragments to send queue & send them */
 
@@ -2608,7 +2619,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
 		if (msg_type(imsg) == TIPC_MCAST_MSG)
 			max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
 		if (msg_size(imsg) > max) {
-			buf_discard(fbuf);
+			kfree_skb(fbuf);
 			return 0;
 		}
 		pbuf = tipc_buf_acquire(msg_size(imsg));
@@ -2623,9 +2634,11 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
 			set_fragm_size(pbuf, fragm_sz);
 			set_expected_frags(pbuf, exp_fragm_cnt - 1);
 		} else {
-			warn("Link unable to reassemble fragmented message\n");
+			dbg("Link unable to reassemble fragmented message\n");
+			kfree_skb(fbuf);
+			return -1;
 		}
-		buf_discard(fbuf);
+		kfree_skb(fbuf);
 		return 0;
 	} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
 		u32 dsz = msg_data_sz(fragm);
@@ -2634,7 +2647,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
 		u32 exp_frags = get_expected_frags(pbuf) - 1;
 		skb_copy_to_linear_data_offset(pbuf, crs,
 					       msg_data(fragm), dsz);
-		buf_discard(fbuf);
+		kfree_skb(fbuf);
 
 		/* Is message complete? */
 
@@ -2651,7 +2664,7 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
 		set_expected_frags(pbuf, exp_frags);
 		return 0;
 	}
-	buf_discard(fbuf);
+	kfree_skb(fbuf);
 	return 0;
 }
 
@@ -2682,7 +2695,7 @@ static void link_check_defragm_bufs(struct tipc_link *l_ptr)
 				prev->next = buf->next;
 			else
 				l_ptr->defragm_buf = buf->next;
-			buf_discard(buf);
+			kfree_skb(buf);
 		}
 		buf = next;
 	}
@@ -3057,7 +3070,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_s
 	str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
 				  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
 	if (!str_len) {
-		buf_discard(buf);
+		kfree_skb(buf);
 		return tipc_cfg_reply_error_string("link not found");
 	}
 
diff --git a/net/tipc/log.c b/net/tipc/log.c
index 952c39f643e6..895c6e530b0b 100644
--- a/net/tipc/log.c
+++ b/net/tipc/log.c
@@ -304,7 +304,7 @@ struct sk_buff *tipc_log_resize_cmd(const void *req_tlv_area, int req_tlv_space)
 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
 
 	value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
-	if (value != delimit(value, 0, 32768))
+	if (value > 32768)
 		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
 						   " (log size must be 0-32768)");
 	if (tipc_log_resize(value))
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 3e4d3e29be61..e3afe162c0ac 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -106,7 +106,7 @@ int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
 	if (likely(res))
 		return dsz;
 
-	buf_discard(*buf);
+	kfree_skb(*buf);
 	*buf = NULL;
 	return -EFAULT;
 }
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7b0cda167107..eba524e34a6b 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -384,11 +384,6 @@ static inline void msg_set_destnode(struct tipc_msg *m, u32 a)
 	msg_set_word(m, 7, a);
 }
 
-static inline int msg_is_dest(struct tipc_msg *m, u32 d)
-{
-	return msg_short(m) || (msg_destnode(m) == d);
-}
-
 static inline u32 msg_nametype(struct tipc_msg *m)
 {
 	return msg_word(m, 8);
@@ -517,6 +512,16 @@ static inline void msg_set_seq_gap(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 1, 16, 0x1fff, n);
 }
 
+static inline u32 msg_node_sig(struct tipc_msg *m)
+{
+	return msg_bits(m, 1, 0, 0xffff);
+}
+
+static inline void msg_set_node_sig(struct tipc_msg *m, u32 n)
+{
+	msg_set_bits(m, 1, 0, 0xffff, n);
+}
+
 
 /*
  * Word 2
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 98ebb37f1808..d57da6159616 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -120,7 +120,7 @@ static void named_cluster_distribute(struct sk_buff *buf)
 		}
 	}
 
-	buf_discard(buf);
+	kfree_skb(buf);
 }
 
 /**
@@ -239,9 +239,6 @@ exit:
  *
  * Invoked for each publication issued by a newly failed node.
  * Removes publication structure from name table & deletes it.
- * In rare cases the link may have come back up again when this
- * function is called, and we have two items representing the same
- * publication. Nudge this item's key to distinguish it from the other.
  */
 
 static void named_purge_publ(struct publication *publ)
@@ -249,7 +246,6 @@ static void named_purge_publ(struct publication *publ)
 	struct publication *p;
 
 	write_lock_bh(&tipc_nametbl_lock);
-	publ->key += 1222345;
 	p = tipc_nametbl_remove_publ(publ->type, publ->lower,
 				     publ->node, publ->ref, publ->key);
 	if (p)
@@ -316,7 +312,7 @@ void tipc_named_recv(struct sk_buff *buf)
 		item++;
 	}
 	write_unlock_bh(&tipc_nametbl_lock);
-	buf_discard(buf);
+	kfree_skb(buf);
 }
 
 /**
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 89eb5621ebba..c6a1ae36952e 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -114,10 +114,8 @@ struct name_table {
 };
 
 static struct name_table table;
-static atomic_t rsv_publ_ok = ATOMIC_INIT(0);
 DEFINE_RWLOCK(tipc_nametbl_lock);
 
-
 static int hash(int x)
 {
 	return x & (tipc_nametbl_size - 1);
@@ -270,6 +268,13 @@ static struct publication *tipc_nameseq_insert_publ(struct name_seq *nseq,
 		}
 
 		info = sseq->info;
+
+		/* Check if an identical publication already exists */
+		list_for_each_entry(publ, &info->zone_list, zone_list) {
+			if ((publ->ref == port) && (publ->key == key) &&
+			    (!publ->node || (publ->node == node)))
+				return NULL;
+		}
 	} else {
 		u32 inspos;
 		struct sub_seq *freesseq;
@@ -534,10 +539,17 @@ struct publication *tipc_nametbl_remove_publ(u32 type, u32 lower,
 }
 
 /*
- * tipc_nametbl_translate - translate name to port id
+ * tipc_nametbl_translate - perform name translation
+ *
+ * On entry, 'destnode' is the search domain used during translation.
  *
- * Note: on entry 'destnode' is the search domain used during translation;
- *       on exit it passes back the node address of the matching port (if any)
+ * On exit:
+ * - if name translation is deferred to another node/cluster/zone,
+ *   leaves 'destnode' unchanged (will be non-zero) and returns 0
+ * - if name translation is attempted and succeeds, sets 'destnode'
+ *   to publishing node and returns port reference (will be non-zero)
+ * - if name translation is attempted and fails, sets 'destnode' to 0
+ *   and returns 0
  */
 
 u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
@@ -547,6 +559,7 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
 	struct publication *publ;
 	struct name_seq *seq;
 	u32 ref = 0;
+	u32 node = 0;
 
 	if (!tipc_in_scope(*destnode, tipc_own_addr))
 		return 0;
@@ -604,11 +617,12 @@ u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *destnode)
 	}
 
 	ref = publ->ref;
-	*destnode = publ->node;
+	node = publ->node;
 no_match:
 	spin_unlock_bh(&seq->lock);
 not_found:
 	read_unlock_bh(&tipc_nametbl_lock);
+	*destnode = node;
 	return ref;
 }
 
@@ -665,22 +679,7 @@ exit:
 	return res;
 }
 
-/**
- * tipc_nametbl_publish_rsv - publish port name using a reserved name type
- */
-
-int tipc_nametbl_publish_rsv(u32 ref, unsigned int scope,
-			struct tipc_name_seq const *seq)
-{
-	int res;
-
-	atomic_inc(&rsv_publ_ok);
-	res = tipc_publish(ref, scope, seq);
-	atomic_dec(&rsv_publ_ok);
-	return res;
-}
-
-/**
+/*
  * tipc_nametbl_publish - add name publication to network name tables
  */
 
@@ -694,11 +693,6 @@ struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 		     tipc_max_publications);
 		return NULL;
 	}
-	if ((type < TIPC_RESERVED_TYPES) && !atomic_read(&rsv_publ_ok)) {
-		warn("Publication failed, reserved name {%u,%u,%u}\n",
-		     type, lower, upper);
-		return NULL;
-	}
 
 	write_lock_bh(&tipc_nametbl_lock);
 	table.local_publ_count++;
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 8086b42f92ad..207d59ebf849 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -91,8 +91,6 @@ struct sk_buff *tipc_nametbl_get(const void *req_tlv_area, int req_tlv_space);
 u32 tipc_nametbl_translate(u32 type, u32 instance, u32 *node);
 int tipc_nametbl_mc_translate(u32 type, u32 lower, u32 upper, u32 limit,
 			 struct tipc_port_list *dports);
-int tipc_nametbl_publish_rsv(u32 ref, unsigned int scope,
-			struct tipc_name_seq const *seq);
 struct publication *tipc_nametbl_publish(u32 type, u32 lower, u32 upper,
 				    u32 scope, u32 port_ref, u32 key);
 int tipc_nametbl_withdraw(u32 type, u32 lower, u32 ref, u32 key);
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 61afee7e8291..d4531b07076c 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -117,7 +117,7 @@ static void net_route_named_msg(struct sk_buff *buf)
 	u32 dport;
 
 	if (!msg_named(msg)) {
-		buf_discard(buf);
+		kfree_skb(buf);
 		return;
 	}
 
@@ -161,7 +161,7 @@ void tipc_net_route_msg(struct sk_buff *buf)
 			tipc_port_recv_proto_msg(buf);
 			break;
 		default:
-			buf_discard(buf);
+			kfree_skb(buf);
 		}
 		return;
 	}
@@ -175,14 +175,10 @@ int tipc_net_start(u32 addr)
 {
 	char addr_string[16];
 
-	if (tipc_mode != TIPC_NODE_MODE)
-		return -ENOPROTOOPT;
-
 	tipc_subscr_stop();
 	tipc_cfg_stop();
 
 	tipc_own_addr = addr;
-	tipc_mode = TIPC_NET_MODE;
 	tipc_named_reinit();
 	tipc_port_reinit();
 
@@ -201,10 +197,9 @@ void tipc_net_stop(void)
 {
 	struct tipc_node *node, *t_node;
 
-	if (tipc_mode != TIPC_NET_MODE)
+	if (!tipc_own_addr)
 		return;
 	write_lock_bh(&tipc_net_lock);
-	tipc_mode = TIPC_NODE_MODE;
 	tipc_bearer_stop();
 	tipc_bclink_stop();
 	list_for_each_entry_safe(node, t_node, &tipc_node_list, list)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6b226faad89f..a34cabc2c43a 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -39,6 +39,8 @@
 #include "node.h"
 #include "name_distr.h"
 
+#define NODE_HTABLE_SIZE 512
+
 static void node_lost_contact(struct tipc_node *n_ptr);
 static void node_established_contact(struct tipc_node *n_ptr);
 
@@ -49,9 +51,19 @@ LIST_HEAD(tipc_node_list);
 static u32 tipc_num_nodes;
 
 static atomic_t tipc_num_links = ATOMIC_INIT(0);
-u32 tipc_own_tag;
 
-/**
+/*
+ * A trivial power-of-two bitmask technique is used for speed, since this
+ * operation is done for every incoming TIPC packet. The number of hash table
+ * entries has been chosen so that no hash chain exceeds 8 nodes and will
+ * usually be much smaller (typically only a single node).
+ */
+static inline unsigned int tipc_hashfn(u32 addr)
+{
+	return addr & (NODE_HTABLE_SIZE - 1);
+}
+
+/*
  * tipc_node_find - locate specified node object, if it exists
  */
 
@@ -113,6 +125,7 @@ struct tipc_node *tipc_node_create(u32 addr)
 	}
 	list_add_tail(&n_ptr->list, &temp_node->list);
 	n_ptr->block_setup = WAIT_PEER_DOWN;
+	n_ptr->signature = INVALID_NODE_SIG;
 
 	tipc_num_nodes++;
 
@@ -253,63 +266,14 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 	n_ptr->link_cnt--;
 }
 
-/*
- * Routing table management - five cases to handle:
- *
- * 1: A link towards a zone/cluster external node comes up.
- *    => Send a multicast message updating routing tables of all
- *    system nodes within own cluster that the new destination
- *    can be reached via this node.
- *    (node.establishedContact()=>cluster.multicastNewRoute())
- *
- * 2: A link towards a slave node comes up.
- *    => Send a multicast message updating routing tables of all
- *    system nodes within own cluster that the new destination
- *    can be reached via this node.
- *    (node.establishedContact()=>cluster.multicastNewRoute())
- *    => Send a  message to the slave node about existence
- *    of all system nodes within cluster:
- *    (node.establishedContact()=>cluster.sendLocalRoutes())
- *
- * 3: A new cluster local system node becomes available.
- *    => Send message(s) to this particular node containing
- *    information about all cluster external and slave
- *     nodes which can be reached via this node.
- *    (node.establishedContact()==>network.sendExternalRoutes())
- *    (node.establishedContact()==>network.sendSlaveRoutes())
- *    => Send messages to all directly connected slave nodes
- *    containing information about the existence of the new node
- *    (node.establishedContact()=>cluster.multicastNewRoute())
- *
- * 4: The link towards a zone/cluster external node or slave
- *    node goes down.
- *    => Send a multcast message updating routing tables of all
- *    nodes within cluster that the new destination can not any
- *    longer be reached via this node.
- *    (node.lostAllLinks()=>cluster.bcastLostRoute())
- *
- * 5: A cluster local system node becomes unavailable.
- *    => Remove all references to this node from the local
- *    routing tables. Note: This is a completely node
- *    local operation.
- *    (node.lostAllLinks()=>network.removeAsRouter())
- *    => Send messages to all directly connected slave nodes
- *    containing information about loss of the node
- *    (node.establishedContact()=>cluster.multicastLostRoute())
- *
- */
-
 static void node_established_contact(struct tipc_node *n_ptr)
 {
 	tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
 
-	/* Syncronize broadcast acks */
-	n_ptr->bclink.acked = tipc_bclink_get_last_sent();
-
-	if (n_ptr->bclink.supported) {
+	if (n_ptr->bclink.supportable) {
+		n_ptr->bclink.acked = tipc_bclink_get_last_sent();
 		tipc_bclink_add_node(n_ptr->addr);
-		if (n_ptr->addr < tipc_own_addr)
-			tipc_own_tag++;
+		n_ptr->bclink.supported = 1;
 	}
 }
 
@@ -338,22 +302,20 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 	/* Flush broadcast link info associated with lost node */
 
 	if (n_ptr->bclink.supported) {
-		n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
 		while (n_ptr->bclink.deferred_head) {
 			struct sk_buff *buf = n_ptr->bclink.deferred_head;
 			n_ptr->bclink.deferred_head = buf->next;
-			buf_discard(buf);
+			kfree_skb(buf);
 		}
+		n_ptr->bclink.deferred_size = 0;
 
 		if (n_ptr->bclink.defragm) {
-			buf_discard(n_ptr->bclink.defragm);
+			kfree_skb(n_ptr->bclink.defragm);
 			n_ptr->bclink.defragm = NULL;
 		}
 
 		tipc_bclink_remove_node(n_ptr->addr);
 		tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
-		if (n_ptr->addr < tipc_own_addr)
-			tipc_own_tag--;
 
 		n_ptr->bclink.supported = 0;
 	}
@@ -444,12 +406,12 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
 		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
 						   " (network address)");
 
-	if (tipc_mode != TIPC_NET_MODE)
+	if (!tipc_own_addr)
 		return tipc_cfg_reply_none();
 
 	read_lock_bh(&tipc_net_lock);
 
-	/* Get space for all unicast links + multicast link */
+	/* Get space for all unicast links + broadcast link */
 
 	payload_size = TLV_SPACE(sizeof(link_info)) *
 		(atomic_read(&tipc_num_links) + 1);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 0b1c5f8b6996..72561c971d67 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -42,6 +42,11 @@
 #include "net.h"
 #include "bearer.h"
 
+/*
+ * Out-of-range value for node signature
+ */
+#define INVALID_NODE_SIG 0x10000
+
 /* Flags used to block (re)establishment of contact with a neighboring node */
 
 #define WAIT_PEER_DOWN	0x0001	/* wait to see that peer's links are down */
@@ -61,13 +66,15 @@
  * @block_setup: bit mask of conditions preventing link establishment to node
  * @link_cnt: number of links to node
  * @permit_changeover: non-zero if node has redundant links to this system
+ * @signature: node instance identifier
  * @bclink: broadcast-related info
+ *    @supportable: non-zero if node supports TIPC b'cast link capability
  *    @supported: non-zero if node supports TIPC b'cast capability
  *    @acked: sequence # of last outbound b'cast message acknowledged by node
  *    @last_in: sequence # of last in-sequence b'cast message received from node
- *    @gap_after: sequence # of last message not requiring a NAK request
- *    @gap_to: sequence # of last message requiring a NAK request
- *    @nack_sync: counter that determines when NAK requests should be sent
+ *    @last_sent: sequence # of last b'cast message sent by node
+ *    @oos_state: state tracker for handling OOS b'cast messages
+ *    @deferred_size: number of OOS b'cast messages in deferred queue
  *    @deferred_head: oldest OOS b'cast message received from node
  *    @deferred_tail: newest OOS b'cast message received from node
  *    @defragm: list of partially reassembled b'cast message fragments from node
@@ -85,35 +92,23 @@ struct tipc_node {
 	int working_links;
 	int block_setup;
 	int permit_changeover;
+	u32 signature;
 	struct {
-		int supported;
+		u8 supportable;
+		u8 supported;
 		u32 acked;
 		u32 last_in;
-		u32 gap_after;
-		u32 gap_to;
-		u32 nack_sync;
+		u32 last_sent;
+		u32 oos_state;
+		u32 deferred_size;
 		struct sk_buff *deferred_head;
 		struct sk_buff *deferred_tail;
 		struct sk_buff *defragm;
 	} bclink;
 };
 
-#define NODE_HTABLE_SIZE 512
 extern struct list_head tipc_node_list;
 
-/*
- * A trivial power-of-two bitmask technique is used for speed, since this
- * operation is done for every incoming TIPC packet. The number of hash table
- * entries has been chosen so that no hash chain exceeds 8 nodes and will
- * usually be much smaller (typically only a single node).
- */
-static inline unsigned int tipc_hashfn(u32 addr)
-{
-	return addr & (NODE_HTABLE_SIZE - 1);
-}
-
-extern u32 tipc_own_tag;
-
 struct tipc_node *tipc_node_find(u32 addr);
 struct tipc_node *tipc_node_create(u32 addr);
 void tipc_node_delete(struct tipc_node *n_ptr);
diff --git a/net/tipc/port.c b/net/tipc/port.c
index d91efc69e6f9..94d2904cce66 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -116,13 +116,13 @@ int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
 			ibuf = skb_copy(buf, GFP_ATOMIC);
 			if (ibuf == NULL) {
 				tipc_port_list_free(&dports);
-				buf_discard(buf);
+				kfree_skb(buf);
 				return -ENOMEM;
 			}
 		}
 		res = tipc_bclink_send_msg(buf);
 		if ((res < 0) && (dports.count != 0))
-			buf_discard(ibuf);
+			kfree_skb(ibuf);
 	} else {
 		ibuf = buf;
 	}
@@ -187,7 +187,7 @@ void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
 		}
 	}
 exit:
-	buf_discard(buf);
+	kfree_skb(buf);
 	tipc_port_list_free(dp);
 }
 
@@ -400,15 +400,16 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
 
 	/* send self-abort message when rejecting on a connected port */
 	if (msg_connected(msg)) {
-		struct sk_buff *abuf = NULL;
 		struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
 
 		if (p_ptr) {
+			struct sk_buff *abuf = NULL;
+
 			if (p_ptr->connected)
 				abuf = port_build_self_abort_msg(p_ptr, err);
 			tipc_port_unlock(p_ptr);
+			tipc_net_route_msg(abuf);
 		}
-		tipc_net_route_msg(abuf);
 	}
 
 	/* send returned message & dispose of rejected message */
@@ -419,7 +420,7 @@ int tipc_reject_msg(struct sk_buff *buf, u32 err)
 	else
 		tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
 exit:
-	buf_discard(buf);
+	kfree_skb(buf);
 	return data_sz;
 }
 
@@ -567,7 +568,7 @@ void tipc_port_recv_proto_msg(struct sk_buff *buf)
 	tipc_port_unlock(p_ptr);
 exit:
 	tipc_net_route_msg(r_buf);
-	buf_discard(buf);
+	kfree_skb(buf);
 }
 
 static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
@@ -758,7 +759,7 @@ static void port_dispatcher_sigh(void *dummy)
 			}
 		}
 		if (buf)
-			buf_discard(buf);
+			kfree_skb(buf);
 		buf = next;
 		continue;
 err:
@@ -812,7 +813,7 @@ err:
 			}
 		}
 		if (buf)
-			buf_discard(buf);
+			kfree_skb(buf);
 		buf = next;
 		continue;
 reject:
@@ -1053,8 +1054,6 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
 	msg = &p_ptr->phdr;
 	msg_set_destnode(msg, peer->node);
 	msg_set_destport(msg, peer->ref);
-	msg_set_orignode(msg, tipc_own_addr);
-	msg_set_origport(msg, p_ptr->ref);
 	msg_set_type(msg, TIPC_CONN_MSG);
 	msg_set_lookup_scope(msg, 0);
 	msg_set_hdr_sz(msg, SHORT_H_SIZE);
@@ -1132,6 +1131,49 @@ int tipc_shutdown(u32 ref)
 	return tipc_disconnect(ref);
 }
 
+/**
+ * tipc_port_recv_msg - receive message from lower layer and deliver to port user
+ */
+
+int tipc_port_recv_msg(struct sk_buff *buf)
+{
+	struct tipc_port *p_ptr;
+	struct tipc_msg *msg = buf_msg(buf);
+	u32 destport = msg_destport(msg);
+	u32 dsz = msg_data_sz(msg);
+	u32 err;
+
+	/* forward unresolved named message */
+	if (unlikely(!destport)) {
+		tipc_net_route_msg(buf);
+		return dsz;
+	}
+
+	/* validate destination & pass to port, otherwise reject message */
+	p_ptr = tipc_port_lock(destport);
+	if (likely(p_ptr)) {
+		if (likely(p_ptr->connected)) {
+			if ((unlikely(msg_origport(msg) !=
+				      tipc_peer_port(p_ptr))) ||
+			    (unlikely(msg_orignode(msg) !=
+				      tipc_peer_node(p_ptr))) ||
+			    (unlikely(!msg_connected(msg)))) {
+				err = TIPC_ERR_NO_PORT;
+				tipc_port_unlock(p_ptr);
+				goto reject;
+			}
+		}
+		err = p_ptr->dispatcher(p_ptr, buf);
+		tipc_port_unlock(p_ptr);
+		if (likely(!err))
+			return dsz;
+	} else {
+		err = TIPC_ERR_NO_PORT;
+	}
+reject:
+	return tipc_reject_msg(buf, err);
+}
+
 /*
  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
  *                        message for this node.
@@ -1210,8 +1252,6 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
 
 	msg = &p_ptr->phdr;
 	msg_set_type(msg, TIPC_NAMED_MSG);
-	msg_set_orignode(msg, tipc_own_addr);
-	msg_set_origport(msg, ref);
 	msg_set_hdr_sz(msg, NAMED_H_SIZE);
 	msg_set_nametype(msg, name->type);
 	msg_set_nameinst(msg, name->instance);
@@ -1220,7 +1260,7 @@ int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
 	msg_set_destnode(msg, destnode);
 	msg_set_destport(msg, destport);
 
-	if (likely(destport)) {
+	if (likely(destport || destnode)) {
 		if (likely(destnode == tipc_own_addr))
 			res = tipc_port_recv_sections(p_ptr, num_sect,
 						      msg_sect, total_len);
@@ -1261,8 +1301,6 @@ int tipc_send2port(u32 ref, struct tipc_portid const *dest,
 	msg = &p_ptr->phdr;
 	msg_set_type(msg, TIPC_DIRECT_MSG);
 	msg_set_lookup_scope(msg, 0);
-	msg_set_orignode(msg, tipc_own_addr);
-	msg_set_origport(msg, ref);
 	msg_set_destnode(msg, dest->node);
 	msg_set_destport(msg, dest->ref);
 	msg_set_hdr_sz(msg, BASIC_H_SIZE);
@@ -1301,8 +1339,6 @@ int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
 
 	msg = &p_ptr->phdr;
 	msg_set_type(msg, TIPC_DIRECT_MSG);
-	msg_set_orignode(msg, tipc_own_addr);
-	msg_set_origport(msg, ref);
 	msg_set_destnode(msg, dest->node);
 	msg_set_destport(msg, dest->ref);
 	msg_set_hdr_sz(msg, BASIC_H_SIZE);
diff --git a/net/tipc/port.h b/net/tipc/port.h
index f751807e2a91..9b88531e5a61 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -205,6 +205,7 @@ int tipc_disconnect_port(struct tipc_port *tp_ptr);
 /*
  * TIPC messaging routines
  */
+int tipc_port_recv_msg(struct sk_buff *buf);
 int tipc_send(u32 portref, unsigned int num_sect, struct iovec const *msg_sect,
 	      unsigned int total_len);
 
@@ -271,45 +272,4 @@ static inline int tipc_port_congested(struct tipc_port *p_ptr)
 	return (p_ptr->sent - p_ptr->acked) >= (TIPC_FLOW_CONTROL_WIN * 2);
 }
 
-/**
- * tipc_port_recv_msg - receive message from lower layer and deliver to port user
- */
-
-static inline int tipc_port_recv_msg(struct sk_buff *buf)
-{
-	struct tipc_port *p_ptr;
-	struct tipc_msg *msg = buf_msg(buf);
-	u32 destport = msg_destport(msg);
-	u32 dsz = msg_data_sz(msg);
-	u32 err;
-
-	/* forward unresolved named message */
-	if (unlikely(!destport)) {
-		tipc_net_route_msg(buf);
-		return dsz;
-	}
-
-	/* validate destination & pass to port, otherwise reject message */
-	p_ptr = tipc_port_lock(destport);
-	if (likely(p_ptr)) {
-		if (likely(p_ptr->connected)) {
-			if ((unlikely(msg_origport(msg) != tipc_peer_port(p_ptr))) ||
-			    (unlikely(msg_orignode(msg) != tipc_peer_node(p_ptr))) ||
-			    (unlikely(!msg_connected(msg)))) {
-				err = TIPC_ERR_NO_PORT;
-				tipc_port_unlock(p_ptr);
-				goto reject;
-			}
-		}
-		err = p_ptr->dispatcher(p_ptr, buf);
-		tipc_port_unlock(p_ptr);
-		if (likely(!err))
-			return dsz;
-	} else {
-		err = TIPC_ERR_NO_PORT;
-	}
-reject:
-	return tipc_reject_msg(buf, err);
-}
-
 #endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e2f7c5d370ba..29e957f64458 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -126,7 +126,7 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);
 
 static void advance_rx_queue(struct sock *sk)
 {
-	buf_discard(__skb_dequeue(&sk->sk_receive_queue));
+	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 	atomic_dec(&tipc_queue_size);
 }
 
@@ -142,7 +142,7 @@ static void discard_rx_queue(struct sock *sk)
 
 	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
 		atomic_dec(&tipc_queue_size);
-		buf_discard(buf);
+		kfree_skb(buf);
 	}
 }
 
@@ -288,7 +288,7 @@ static int release(struct socket *sock)
 			break;
 		atomic_dec(&tipc_queue_size);
 		if (TIPC_SKB_CB(buf)->handle != 0)
-			buf_discard(buf);
+			kfree_skb(buf);
 		else {
 			if ((sock->state == SS_CONNECTING) ||
 			    (sock->state == SS_CONNECTED)) {
@@ -355,6 +355,9 @@ static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
 	else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
 		return -EAFNOSUPPORT;
 
+	if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
+		return -EACCES;
+
 	return (addr->scope > 0) ?
 		tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
 		tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
@@ -1612,7 +1615,7 @@ restart:
 		if (buf) {
 			atomic_dec(&tipc_queue_size);
 			if (TIPC_SKB_CB(buf)->handle != 0) {
-				buf_discard(buf);
+				kfree_skb(buf);
 				goto restart;
 			}
 			tipc_disconnect(tport->ref);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 8c49566da8f3..b2964e9895d3 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -552,7 +552,7 @@ int tipc_subscr_start(void)
 	if (res)
 		goto failed;
 
-	res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
+	res = tipc_publish(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
 	if (res) {
 		tipc_deleteport(topsrv.setup_port);
 		topsrv.setup_port = 0;