Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/bcast.c | 226
-rw-r--r--  net/tipc/bcast.h |   2
-rw-r--r--  net/tipc/link.c  |  21
-rw-r--r--  net/tipc/node.c  |   4
-rw-r--r--  net/tipc/node.h  |  12
5 files changed, 100 insertions(+), 165 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index facc216c6a92..1f3b1607d9d4 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -157,39 +157,14 @@ u32 tipc_bclink_get_last_sent(void)
 	return bcl->fsm_msg_cnt;
 }
 
-/**
- * bclink_set_gap - set gap according to contents of current deferred pkt queue
- *
- * Called with 'node' locked, bc_lock unlocked
- */
-
-static void bclink_set_gap(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf = n_ptr->bclink.deferred_head;
-
-	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
-		mod(n_ptr->bclink.last_in);
-	if (unlikely(buf != NULL))
-		n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
-}
-
-/**
- * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
- *
- * This mechanism endeavours to prevent all nodes in network from trying
- * to ACK or NACK at the same time.
- *
- * Note: TIPC uses a different trigger to distribute ACKs than it does to
- *       distribute NACKs, but tries to use the same spacing (divide by 16).
- */
-
-static int bclink_ack_allowed(u32 n)
+static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
 {
-	return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
+	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
+						seqno : node->bclink.last_sent;
 }
 
 
-/**
+/*
  * tipc_bclink_retransmit_to - get most recent node to request retransmission
  *
  * Called with bc_lock locked
@@ -300,44 +275,56 @@ exit:
 	spin_unlock_bh(&bc_lock);
 }
 
-/**
- * bclink_send_ack - unicast an ACK msg
+/*
+ * tipc_bclink_update_link_state - update broadcast link state
  *
  * tipc_net_lock and node lock set
  */
 
-static void bclink_send_ack(struct tipc_node *n_ptr)
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 {
-	struct tipc_link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];
+	struct sk_buff *buf;
 
-	if (l_ptr != NULL)
-		tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-}
+	/* Ignore "stale" link state info */
 
-/**
- * bclink_send_nack- broadcast a NACK msg
- *
- * tipc_net_lock and node lock set
- */
+	if (less_eq(last_sent, n_ptr->bclink.last_in))
+		return;
 
-static void bclink_send_nack(struct tipc_node *n_ptr)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
+	/* Update link synchronization state; quit if in sync */
+
+	bclink_update_last_sent(n_ptr, last_sent);
+
+	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+		return;
+
+	/* Update out-of-sync state; quit if loss is still unconfirmed */
 
-	if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
+	if ((++n_ptr->bclink.oos_state) == 1) {
+		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
+			return;
+		n_ptr->bclink.oos_state++;
+	}
+
+	/* Don't NACK if one has been recently sent (or seen) */
+
+	if (n_ptr->bclink.oos_state & 0x1)
 		return;
 
+	/* Send NACK */
+
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
-		msg = buf_msg(buf);
+		struct tipc_msg *msg = buf_msg(buf);
+
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
-			 INT_H_SIZE, n_ptr->addr);
+			      INT_H_SIZE, n_ptr->addr);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
-		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
-		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
-		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
+		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
+		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
+				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
+				 : n_ptr->bclink.last_sent);
 		msg_set_bcast_tag(msg, tipc_own_tag);
 
 		spin_lock_bh(&bc_lock);
@@ -346,96 +333,37 @@ static void bclink_send_nack(struct tipc_node *n_ptr)
 		spin_unlock_bh(&bc_lock);
 		buf_discard(buf);
 
-		/*
-		 * Ensure we doesn't send another NACK msg to the node
-		 * until 16 more deferred messages arrive from it
-		 * (i.e. helps prevent all nodes from NACK'ing at same time)
-		 */
-
-		n_ptr->bclink.nack_sync = tipc_own_tag;
+		n_ptr->bclink.oos_state++;
 	}
 }
 
-/**
- * tipc_bclink_check_gap - send a NACK if a sequence gap exists
+/*
+ * bclink_peek_nack - monitor retransmission requests sent by other nodes
  *
- * tipc_net_lock and node lock set
- */
-
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent)
-{
-	if (!n_ptr->bclink.supported ||
-	    less_eq(last_sent, mod(n_ptr->bclink.last_in)))
-		return;
-
-	bclink_set_gap(n_ptr);
-	if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
-		n_ptr->bclink.gap_to = last_sent;
-	bclink_send_nack(n_ptr);
-}
-
-/**
- * tipc_bclink_peek_nack - process a NACK msg meant for another node
+ * Delay any upcoming NACK by this node if another node has already
+ * requested the first message this node is going to ask for.
  *
  * Only tipc_net_lock set.
  */
 
-static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
+static void bclink_peek_nack(struct tipc_msg *msg)
 {
-	struct tipc_node *n_ptr = tipc_node_find(dest);
-	u32 my_after, my_to;
+	struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
 
-	if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
+	if (unlikely(!n_ptr))
 		return;
+
 	tipc_node_lock(n_ptr);
-	/*
-	 * Modify gap to suppress unnecessary NACKs from this node
-	 */
-	my_after = n_ptr->bclink.gap_after;
-	my_to = n_ptr->bclink.gap_to;
-
-	if (less_eq(gap_after, my_after)) {
-		if (less(my_after, gap_to) && less(gap_to, my_to))
-			n_ptr->bclink.gap_after = gap_to;
-		else if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
-	} else if (less_eq(gap_after, my_to)) {
-		if (less_eq(my_to, gap_to))
-			n_ptr->bclink.gap_to = gap_after;
-	} else {
-		/*
-		 * Expand gap if missing bufs not in deferred queue:
-		 */
-		struct sk_buff *buf = n_ptr->bclink.deferred_head;
-		u32 prev = n_ptr->bclink.gap_to;
 
-		for (; buf; buf = buf->next) {
-			u32 seqno = buf_seqno(buf);
+	if (n_ptr->bclink.supported &&
+	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
+	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
+		n_ptr->bclink.oos_state = 2;
 
-			if (mod(seqno - prev) != 1) {
-				buf = NULL;
-				break;
-			}
-			if (seqno == gap_after)
-				break;
-			prev = seqno;
-		}
-		if (buf == NULL)
-			n_ptr->bclink.gap_to = gap_after;
-	}
-	/*
-	 * Some nodes may send a complementary NACK now:
-	 */
-	if (bclink_ack_allowed(sender_tag + 1)) {
-		if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
-			bclink_send_nack(n_ptr);
-			bclink_set_gap(n_ptr);
-		}
-	}
 	tipc_node_unlock(n_ptr);
 }
 
-/**
+/*
  * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
  */
 
@@ -505,10 +433,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 			spin_unlock_bh(&bc_lock);
 		} else {
 			tipc_node_unlock(node);
-			tipc_bclink_peek_nack(msg_destnode(msg),
-					      msg_bcast_tag(msg),
-					      msg_bcgap_after(msg),
-					      msg_bcgap_to(msg));
+			bclink_peek_nack(msg);
 		}
 		goto exit;
 	}
@@ -519,16 +444,28 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
+		bclink_update_last_sent(node, seqno);
 receive:
+		node->bclink.last_in = seqno;
+		node->bclink.oos_state = 0;
+
 		spin_lock_bh(&bc_lock);
 		bcl->stats.recv_info++;
-		node->bclink.last_in++;
-		bclink_set_gap(node);
-		if (unlikely(bclink_ack_allowed(seqno))) {
-			bclink_send_ack(node);
+
+		/*
+		 * Unicast an ACK periodically, ensuring that
+		 * all nodes in the cluster don't ACK at the same time
+		 */
+
+		if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
+			tipc_link_send_proto_msg(
+				node->active_links[node->addr & 1],
+				STATE_MSG, 0, 0, 0, 0, 0);
 			bcl->stats.sent_acks++;
 		}
 
+		/* Deliver message to destination */
+
 		if (likely(msg_isdata(msg))) {
 			spin_unlock_bh(&bc_lock);
 			tipc_node_unlock(node);
@@ -567,9 +504,14 @@ receive:
 		if (unlikely(!tipc_node_is_up(node)))
 			goto unlock;
 
-		if (!node->bclink.deferred_head)
+		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
+		if (!node->bclink.deferred_head) {
+			node->bclink.oos_state = 1;
+			goto unlock;
+		}
+
 		msg = buf_msg(node->bclink.deferred_head);
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
@@ -580,31 +522,19 @@ receive:
 
 		buf = node->bclink.deferred_head;
 		node->bclink.deferred_head = buf->next;
+		node->bclink.deferred_size--;
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
 
 	if (less(next_in, seqno)) {
-		u32 gap_after = node->bclink.gap_after;
-		u32 gap_to = node->bclink.gap_to;
-
 		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
 					       &node->bclink.deferred_tail,
 					       buf);
-		if (deferred) {
-			node->bclink.nack_sync++;
-			if (seqno == mod(gap_after + 1))
-				node->bclink.gap_after = seqno;
-			else if (less(gap_after, seqno) && less(seqno, gap_to))
-				node->bclink.gap_to = seqno;
-		}
+		node->bclink.deferred_size += deferred;
+		bclink_update_last_sent(node, seqno);
 		buf = NULL;
-		if (bclink_ack_allowed(node->bclink.nack_sync)) {
-			if (gap_to != gap_after)
-				bclink_send_nack(node);
-			bclink_set_gap(node);
-		}
 	} else
 		deferred = 0;
 
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index b009666c60b0..5571394098f9 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -96,7 +96,7 @@ int  tipc_bclink_send_msg(struct sk_buff *buf);
 void tipc_bclink_recv_pkt(struct sk_buff *buf);
 u32  tipc_bclink_get_last_sent(void);
 u32  tipc_bclink_acks_missing(struct tipc_node *n_ptr);
-void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 seqno);
+void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent);
 int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 1150ba5a648b..cce953723ddb 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1501,14 +1501,13 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 		tipc_node_lock(n_ptr);
 
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
-		info("Multicast link info for %s\n", addr_string);
+		info("Broadcast link info for %s\n", addr_string);
 		info("Supportable: %d,  ", n_ptr->bclink.supportable);
 		info("Supported: %d,  ", n_ptr->bclink.supported);
 		info("Acked: %u\n", n_ptr->bclink.acked);
 		info("Last in: %u,  ", n_ptr->bclink.last_in);
-		info("Gap after: %u,  ", n_ptr->bclink.gap_after);
-		info("Gap to: %u\n", n_ptr->bclink.gap_to);
-		info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
+		info("Oos state: %u,  ", n_ptr->bclink.oos_state);
+		info("Last sent: %u\n", n_ptr->bclink.last_sent);
 
 		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
 
@@ -1974,7 +1973,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
 
 	msg_set_type(msg, msg_typ);
 	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
-	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
+	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
 
 	if (msg_typ == STATE_MSG) {
@@ -2133,8 +2132,12 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 
 		/* Synchronize broadcast link info, if not done previously */
 
-		if (!tipc_node_is_up(l_ptr->owner))
-			l_ptr->owner->bclink.last_in = msg_last_bcast(msg);
+		if (!tipc_node_is_up(l_ptr->owner)) {
+			l_ptr->owner->bclink.last_sent =
+				l_ptr->owner->bclink.last_in =
+				msg_last_bcast(msg);
+			l_ptr->owner->bclink.oos_state = 0;
+		}
 
 		l_ptr->peer_session = msg_session(msg);
 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
@@ -2181,7 +2184,9 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 
 		/* Protocol message before retransmits, reduce loss risk */
 
-		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
+		if (l_ptr->owner->bclink.supported)
+			tipc_bclink_update_link_state(l_ptr->owner,
+						      msg_last_bcast(msg));
 
 		if (rec_gap || (msg_probe(msg))) {
 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 9196f943b835..6d8bdfd95cd6 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -339,12 +339,12 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 	/* Flush broadcast link info associated with lost node */
 
 	if (n_ptr->bclink.supported) {
-		n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 0;
 		while (n_ptr->bclink.deferred_head) {
 			struct sk_buff *buf = n_ptr->bclink.deferred_head;
 			n_ptr->bclink.deferred_head = buf->next;
 			buf_discard(buf);
 		}
+		n_ptr->bclink.deferred_size = 0;
 
 		if (n_ptr->bclink.defragm) {
 			buf_discard(n_ptr->bclink.defragm);
@@ -450,7 +450,7 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
 
 	read_lock_bh(&tipc_net_lock);
 
-	/* Get space for all unicast links + multicast link */
+	/* Get space for all unicast links + broadcast link */
 
 	payload_size = TLV_SPACE(sizeof(link_info)) *
 		(atomic_read(&tipc_num_links) + 1);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 90689f487615..c88ce64f8a31 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -66,9 +66,9 @@
  *    @supported: non-zero if node supports TIPC b'cast capability
  *    @acked: sequence # of last outbound b'cast message acknowledged by node
  *    @last_in: sequence # of last in-sequence b'cast message received from node
- *    @gap_after: sequence # of last message not requiring a NAK request
- *    @gap_to: sequence # of last message requiring a NAK request
- *    @nack_sync: counter that determines when NAK requests should be sent
+ *    @last_sent: sequence # of last b'cast message sent by node
+ *    @oos_state: state tracker for handling OOS b'cast messages
+ *    @deferred_size: number of OOS b'cast messages in deferred queue
  *    @deferred_head: oldest OOS b'cast message received from node
  *    @deferred_tail: newest OOS b'cast message received from node
  *    @defragm: list of partially reassembled b'cast message fragments from node
@@ -91,9 +91,9 @@ struct tipc_node {
 		u8 supported;
 		u32 acked;
 		u32 last_in;
-		u32 gap_after;
-		u32 gap_to;
-		u32 nack_sync;
+		u32 last_sent;
+		u32 oos_state;
+		u32 deferred_size;
 		struct sk_buff *deferred_head;
 		struct sk_buff *deferred_tail;
 		struct sk_buff *defragm;