summary refs log tree commit diff
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c230
1 files changed, 166 insertions, 64 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index b8670bf262e2..96ceefeb9daf 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -217,12 +217,13 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
  */
 static void bclink_retransmit_pkt(u32 after, u32 to)
 {
-	struct sk_buff *buf;
+	struct sk_buff *skb;
 
-	buf = bcl->first_out;
-	while (buf && less_eq(buf_seqno(buf), after))
-		buf = buf->next;
-	tipc_link_retransmit(bcl, buf, mod(to - after));
+	skb_queue_walk(&bcl->outqueue, skb) {
+		if (more(buf_seqno(skb), after))
+			break;
+	}
+	tipc_link_retransmit(bcl, skb, mod(to - after));
 }
 
 /**
@@ -232,8 +233,11 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
  */
 void tipc_bclink_wakeup_users(void)
 {
-	while (skb_queue_len(&bclink->link.waiting_sks))
-		tipc_sk_rcv(skb_dequeue(&bclink->link.waiting_sks));
+	struct sk_buff *skb;
+
+	while ((skb = skb_dequeue(&bclink->link.waiting_sks)))
+		tipc_sk_rcv(skb);
+
 }
 
 /**
@@ -245,14 +249,14 @@ void tipc_bclink_wakeup_users(void)
  */
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
-	struct sk_buff *crs;
+	struct sk_buff *skb, *tmp;
 	struct sk_buff *next;
 	unsigned int released = 0;
 
 	tipc_bclink_lock();
 	/* Bail out if tx queue is empty (no clean up is required) */
-	crs = bcl->first_out;
-	if (!crs)
+	skb = skb_peek(&bcl->outqueue);
+	if (!skb)
 		goto exit;
 
 	/* Determine which messages need to be acknowledged */
@@ -271,43 +275,43 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 		 * Bail out if specified sequence number does not correspond
 		 * to a message that has been sent and not yet acknowledged
 		 */
-		if (less(acked, buf_seqno(crs)) ||
+		if (less(acked, buf_seqno(skb)) ||
 		    less(bcl->fsm_msg_cnt, acked) ||
 		    less_eq(acked, n_ptr->bclink.acked))
 			goto exit;
 	}
 
 	/* Skip over packets that node has previously acknowledged */
-	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
-		crs = crs->next;
+	skb_queue_walk(&bcl->outqueue, skb) {
+		if (more(buf_seqno(skb), n_ptr->bclink.acked))
+			break;
+	}
 
 	/* Update packets that node is now acknowledging */
+	skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) {
+		if (more(buf_seqno(skb), acked))
+			break;
 
-	while (crs && less_eq(buf_seqno(crs), acked)) {
-		next = crs->next;
-
-		if (crs != bcl->next_out)
-			bcbuf_decr_acks(crs);
-		else {
-			bcbuf_set_acks(crs, 0);
+		next = tipc_skb_queue_next(&bcl->outqueue, skb);
+		if (skb != bcl->next_out) {
+			bcbuf_decr_acks(skb);
+		} else {
+			bcbuf_set_acks(skb, 0);
 			bcl->next_out = next;
 			bclink_set_last_sent();
 		}
 
-		if (bcbuf_acks(crs) == 0) {
-			bcl->first_out = next;
-			bcl->out_queue_size--;
-			kfree_skb(crs);
+		if (bcbuf_acks(skb) == 0) {
+			__skb_unlink(skb, &bcl->outqueue);
+			kfree_skb(skb);
 			released = 1;
 		}
-		crs = next;
 	}
 	n_ptr->bclink.acked = acked;
 
 	/* Try resolving broadcast link congestion, if necessary */
-
 	if (unlikely(bcl->next_out)) {
-		tipc_link_push_queue(bcl);
+		tipc_link_push_packets(bcl);
 		bclink_set_last_sent();
 	}
 	if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
@@ -327,19 +331,16 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 	struct sk_buff *buf;
 
 	/* Ignore "stale" link state info */
-
 	if (less_eq(last_sent, n_ptr->bclink.last_in))
 		return;
 
 	/* Update link synchronization state; quit if in sync */
-
 	bclink_update_last_sent(n_ptr, last_sent);
 
 	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
 		return;
 
 	/* Update out-of-sync state; quit if loss is still unconfirmed */
-
 	if ((++n_ptr->bclink.oos_state) == 1) {
 		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
 			return;
@@ -347,15 +348,15 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 	}
 
 	/* Don't NACK if one has been recently sent (or seen) */
-
 	if (n_ptr->bclink.oos_state & 0x1)
 		return;
 
 	/* Send NACK */
-
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
 		struct tipc_msg *msg = buf_msg(buf);
+		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
 			      INT_H_SIZE, n_ptr->addr);
@@ -363,9 +364,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 		msg_set_mc_netid(msg, tipc_net_id);
 		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
 		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
-		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
-				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
-				 : n_ptr->bclink.last_sent);
+		msg_set_bcgap_to(msg, to);
 
 		tipc_bclink_lock();
 		tipc_bearer_send(MAX_BEARERS, buf, NULL);
@@ -402,20 +401,20 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 
 /* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
  *                    and to identified node local sockets
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bclink_xmit(struct sk_buff *buf)
+int tipc_bclink_xmit(struct sk_buff_head *list)
 {
 	int rc = 0;
 	int bc = 0;
-	struct sk_buff *clbuf;
+	struct sk_buff *skb;
 
 	/* Prepare clone of message for local node */
-	clbuf = tipc_msg_reassemble(buf);
-	if (unlikely(!clbuf)) {
-		kfree_skb_list(buf);
+	skb = tipc_msg_reassemble(list);
+	if (unlikely(!skb)) {
+		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
 
@@ -423,11 +422,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
 	if (likely(bclink)) {
 		tipc_bclink_lock();
 		if (likely(bclink->bcast_nodes.count)) {
-			rc = __tipc_link_xmit(bcl, buf);
+			rc = __tipc_link_xmit(bcl, list);
 			if (likely(!rc)) {
+				u32 len = skb_queue_len(&bcl->outqueue);
+
 				bclink_set_last_sent();
 				bcl->stats.queue_sz_counts++;
-				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+				bcl->stats.accu_queue_sz += len;
 			}
 			bc = 1;
 		}
@@ -435,13 +436,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
 	}
 
 	if (unlikely(!bc))
-		kfree_skb_list(buf);
+		__skb_queue_purge(list);
 
 	/* Deliver message clone */
 	if (likely(!rc))
-		tipc_sk_mcast_rcv(clbuf);
+		tipc_sk_mcast_rcv(skb);
 	else
-		kfree_skb(clbuf);
+		kfree_skb(skb);
 
 	return rc;
 }
@@ -462,7 +463,6 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
 	 * Unicast an ACK periodically, ensuring that
 	 * all nodes in the cluster don't ACK at the same time
 	 */
-
 	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
 		tipc_link_proto_xmit(node->active_links[node->addr & 1],
 				     STATE_MSG, 0, 0, 0, 0, 0);
@@ -484,7 +484,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	int deferred = 0;
 
 	/* Screen out unwanted broadcast messages */
-
 	if (msg_mc_netid(msg) != tipc_net_id)
 		goto exit;
 
@@ -497,7 +496,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 		goto unlock;
 
 	/* Handle broadcast protocol message */
-
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
 		if (msg_type(msg) != STATE_MSG)
 			goto unlock;
@@ -518,14 +516,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
 	}
 
 	/* Handle in-sequence broadcast message */
-
 	seqno = msg_seqno(msg);
 	next_in = mod(node->bclink.last_in + 1);
 
 	if (likely(seqno == next_in)) {
 receive:
 		/* Deliver message to destination */
-
 		if (likely(msg_isdata(msg))) {
 			tipc_bclink_lock();
 			bclink_accept_pkt(node, seqno);
@@ -574,7 +570,6 @@ receive:
 		buf = NULL;
 
 		/* Determine new synchronization state */
-
 		tipc_node_lock(node);
 		if (unlikely(!tipc_node_is_up(node)))
 			goto unlock;
@@ -582,33 +577,26 @@ receive:
 		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
-		if (!node->bclink.deferred_head) {
+		if (skb_queue_empty(&node->bclink.deferred_queue)) {
 			node->bclink.oos_state = 1;
 			goto unlock;
 		}
 
-		msg = buf_msg(node->bclink.deferred_head);
+		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
 		if (seqno != next_in)
 			goto unlock;
 
 		/* Take in-sequence message from deferred queue & deliver it */
-
-		buf = node->bclink.deferred_head;
-		node->bclink.deferred_head = buf->next;
-		buf->next = NULL;
-		node->bclink.deferred_size--;
+		buf = __skb_dequeue(&node->bclink.deferred_queue);
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
-
 	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
-					       &node->bclink.deferred_tail,
+		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
 					       buf);
-		node->bclink.deferred_size += deferred;
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
 	}
@@ -767,6 +755,118 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
 	tipc_bclink_unlock();
 }
 
+static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
+				      struct tipc_stats *stats)
+{
+	int i;
+	struct nlattr *nest;
+
+	struct nla_map {
+		__u32 key;
+		__u32 val;
+	};
+
+	struct nla_map map[] = {
+		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
+		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
+		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
+		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
+		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
+		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
+		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
+		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
+		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
+		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
+		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
+		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
+		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
+		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
+		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
+		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
+		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
+		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
+		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
+			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
+	};
+
+	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
+	if (!nest)
+		return -EMSGSIZE;
+
+	for (i = 0; i <  ARRAY_SIZE(map); i++)
+		if (nla_put_u32(skb, map[i].key, map[i].val))
+			goto msg_full;
+
+	nla_nest_end(skb, nest);
+
+	return 0;
+msg_full:
+	nla_nest_cancel(skb, nest);
+
+	return -EMSGSIZE;
+}
+
+int tipc_nl_add_bc_link(struct tipc_nl_msg *msg)
+{
+	int err;
+	void *hdr;
+	struct nlattr *attrs;
+	struct nlattr *prop;
+
+	if (!bcl)
+		return 0;
+
+	tipc_bclink_lock();
+
+	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
+			  NLM_F_MULTI, TIPC_NL_LINK_GET);
+	if (!hdr)
+		return -EMSGSIZE;
+
+	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
+	if (!attrs)
+		goto msg_full;
+
+	/* The broadcast link is always up */
+	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
+		goto attr_msg_full;
+
+	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
+		goto attr_msg_full;
+	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no))
+		goto attr_msg_full;
+
+	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
+	if (!prop)
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0]))
+		goto prop_msg_full;
+	nla_nest_end(msg->skb, prop);
+
+	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
+	if (err)
+		goto attr_msg_full;
+
+	tipc_bclink_unlock();
+	nla_nest_end(msg->skb, attrs);
+	genlmsg_end(msg->skb, hdr);
+
+	return 0;
+
+prop_msg_full:
+	nla_nest_cancel(msg->skb, prop);
+attr_msg_full:
+	nla_nest_cancel(msg->skb, attrs);
+msg_full:
+	tipc_bclink_unlock();
+	genlmsg_cancel(msg->skb, hdr);
+
+	return -EMSGSIZE;
+}
 
 int tipc_bclink_stats(char *buf, const u32 buf_size)
 {
@@ -851,7 +951,9 @@ int tipc_bclink_init(void)
 	sprintf(bcbearer->media.name, "tipc-broadcast");
 
 	spin_lock_init(&bclink->lock);
-	__skb_queue_head_init(&bcl->waiting_sks);
+	__skb_queue_head_init(&bcl->outqueue);
+	__skb_queue_head_init(&bcl->deferred_queue);
+	skb_queue_head_init(&bcl->waiting_sks);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
 	__skb_queue_head_init(&bclink->node.waiting_sks);