summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--net/tipc/bcast.c105
-rw-r--r--net/tipc/bcast.h3
-rw-r--r--net/tipc/link.c8
-rw-r--r--net/tipc/msg.c17
-rw-r--r--net/tipc/msg.h9
-rw-r--r--net/tipc/node.c27
-rw-r--r--net/tipc/socket.c27
7 files changed, 149 insertions, 47 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 412d3351abb7..672e6ef93cab 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -70,7 +70,7 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
 
 int tipc_bcast_get_mtu(struct net *net)
 {
-	return tipc_link_mtu(tipc_bc_sndlink(net));
+	return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
 }
 
 /* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -175,42 +175,101 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
 	__skb_queue_purge(&_xmitq);
 }
 
-/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
- *                    and to identified node local sockets
+/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
  * @net: the applicable net namespace
- * @list: chain of buffers containing message
+ * @pkts: chain of buffers containing message
+ * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
  * Consumes the buffer chain.
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
+static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+			   u16 *cong_link_cnt)
 {
 	struct tipc_link *l = tipc_bc_sndlink(net);
-	struct sk_buff_head xmitq, inputq, rcvq;
+	struct sk_buff_head xmitq;
 	int rc = 0;
 
-	__skb_queue_head_init(&rcvq);
 	__skb_queue_head_init(&xmitq);
-	skb_queue_head_init(&inputq);
-
-	/* Prepare message clone for local node */
-	if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
-		return -EHOSTUNREACH;
-
 	tipc_bcast_lock(net);
 	if (tipc_link_bc_peers(l))
-		rc = tipc_link_xmit(l, list, &xmitq);
+		rc = tipc_link_xmit(l, pkts, &xmitq);
 	tipc_bcast_unlock(net);
+	tipc_bcbase_xmit(net, &xmitq);
+	__skb_queue_purge(pkts);
+	if (rc == -ELINKCONG) {
+		*cong_link_cnt = 1;
+		rc = 0;
+	}
+	return rc;
+}
 
-	/* Don't send to local node if adding to link failed */
-	if (unlikely(rc && (rc != -ELINKCONG))) {
-		__skb_queue_purge(&rcvq);
-		return rc;
+/* tipc_rcast_xmit - replicate and send a message to given destination nodes
+ * @net: the applicable net namespace
+ * @pkts: chain of buffers containing message
+ * @dests: list of destination nodes
+ * @cong_link_cnt: returns number of congested links
+ * @cong_links: returns identities of congested links
+ * Returns 0 if success, otherwise errno
+ */
+static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
+			   struct tipc_nlist *dests, u16 *cong_link_cnt)
+{
+	struct sk_buff_head _pkts;
+	struct u32_item *n, *tmp;
+	u32 dst, selector;
+
+	selector = msg_link_selector(buf_msg(skb_peek(pkts)));
+	__skb_queue_head_init(&_pkts);
+
+	list_for_each_entry_safe(n, tmp, &dests->list, list) {
+		dst = n->value;
+		if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
+			return -ENOMEM;
+
+		/* Any other return value than -ELINKCONG is ignored */
+		if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
+			(*cong_link_cnt)++;
 	}
+	return 0;
+}
 
-	/* Broadcast to all nodes, inluding local node */
-	tipc_bcbase_xmit(net, &xmitq);
-	tipc_sk_mcast_rcv(net, &rcvq, &inputq);
-	__skb_queue_purge(list);
+/* tipc_mcast_xmit - deliver message to indicated destination nodes
+ *                   and to identified node local sockets
+ * @net: the applicable net namespace
+ * @pkts: chain of buffers containing message
+ * @dests: destination nodes for message. Not consumed.
+ * @cong_link_cnt: returns number of encountered congested destination links
+ * @cong_links: returns identities of congested links
+ * Consumes buffer chain.
+ * Returns 0 if success, otherwise errno
+ */
+int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
+		    struct tipc_nlist *dests, u16 *cong_link_cnt)
+{
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+	struct sk_buff_head inputq, localq;
+	int rc = 0;
+
+	skb_queue_head_init(&inputq);
+	skb_queue_head_init(&localq);
+
+	/* Clone packets before they are consumed by next call */
+	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
+		rc = -ENOMEM;
+		goto exit;
+	}
+
+	if (dests->remote) {
+		if (!bb->bcast_support)
+			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
+		else
+			rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
+	}
+
+	if (dests->local)
+		tipc_sk_mcast_rcv(net, &localq, &inputq);
+exit:
+	__skb_queue_purge(pkts);
 	return rc;
 }
 
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 18f379198f8f..dd772e6f6fa4 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -66,7 +66,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
 void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
 void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
 int  tipc_bcast_get_mtu(struct net *net);
-int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
+int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
+		    struct tipc_nlist *dests, u16 *cong_link_cnt);
 int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index b0f8646e0631..b17b9e155469 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1032,11 +1032,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
 			    struct sk_buff_head *inputq)
 {
-	switch (msg_user(buf_msg(skb))) {
+	struct tipc_msg *hdr = buf_msg(skb);
+
+	switch (msg_user(hdr)) {
 	case TIPC_LOW_IMPORTANCE:
 	case TIPC_MEDIUM_IMPORTANCE:
 	case TIPC_HIGH_IMPORTANCE:
 	case TIPC_CRITICAL_IMPORTANCE:
+		if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
+			skb_queue_tail(l->bc_rcvlink->inputq, skb);
+			return true;
+		}
 	case CONN_MANAGER:
 		skb_queue_tail(inputq, skb);
 		return true;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index ab02d0742476..312ef7de57d7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -607,6 +607,23 @@ error:
 	return false;
 }
 
+bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
+			struct sk_buff_head *cpy)
+{
+	struct sk_buff *skb, *_skb;
+
+	skb_queue_walk(msg, skb) {
+		_skb = pskb_copy(skb, GFP_ATOMIC);
+		if (!_skb) {
+			__skb_queue_purge(cpy);
+			return false;
+		}
+		msg_set_destnode(buf_msg(_skb), dst);
+		__skb_queue_tail(cpy, _skb);
+	}
+	return true;
+}
+
 /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
  * @list: list to be appended to
  * @seqno: sequence number of buffer to add
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index f07b51e3f6f1..c843fd2bc48d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -631,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
 
 static inline u32 msg_link_selector(struct tipc_msg *m)
 {
+	if (msg_user(m) == MSG_FRAGMENTER)
+		m = (void *)msg_data(m);
 	return msg_bits(m, 4, 0, 1);
 }
 
-static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
-{
-	msg_set_bits(m, 4, 0, 1, n);
-}
-
 /*
  * Word 5
  */
@@ -835,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		   int offset, int dsz, int mtu, struct sk_buff_head *list);
 bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
 bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
+bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
+			struct sk_buff_head *cpy);
 void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
 			     struct sk_buff *skb);
 
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 2883f6a0ed98..f96dacf173ab 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1257,6 +1257,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
 	kfree_skb(skb);
 }
 
+static void tipc_node_mcast_rcv(struct tipc_node *n)
+{
+	struct tipc_bclink_entry *be = &n->bc_entry;
+
+	/* 'arrvq' is under inputq2's lock protection */
+	spin_lock_bh(&be->inputq2.lock);
+	spin_lock_bh(&be->inputq1.lock);
+	skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
+	spin_unlock_bh(&be->inputq1.lock);
+	spin_unlock_bh(&be->inputq2.lock);
+	tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
+}
+
 static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
 				  int bearer_id, struct sk_buff_head *xmitq)
 {
@@ -1330,15 +1343,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
 	if (!skb_queue_empty(&xmitq))
 		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
 
-	/* Deliver. 'arrvq' is under inputq2's lock protection */
-	if (!skb_queue_empty(&be->inputq1)) {
-		spin_lock_bh(&be->inputq2.lock);
-		spin_lock_bh(&be->inputq1.lock);
-		skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
-		spin_unlock_bh(&be->inputq1.lock);
-		spin_unlock_bh(&be->inputq2.lock);
-		tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
-	}
+	if (!skb_queue_empty(&be->inputq1))
+		tipc_node_mcast_rcv(n);
 
 	if (rc & TIPC_LINK_DOWN_EVT) {
 		/* Reception reassembly failure => reset all links to peer */
@@ -1565,6 +1571,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 	if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
 		tipc_named_rcv(net, &n->bc_entry.namedq);
 
+	if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
+		tipc_node_mcast_rcv(n);
+
 	if (!skb_queue_empty(&le->inputq))
 		tipc_sk_rcv(net, &le->inputq);
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d2f353934f82..93b6ae3154c9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -740,32 +740,43 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
 	int mtu = tipc_bcast_get_mtu(net);
+	u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
 	struct sk_buff_head pkts;
+	struct tipc_nlist dsts;
 	int rc;
 
+	/* Block or return if any destination link is congested */
 	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
 	if (unlikely(rc))
 		return rc;
 
+	/* Lookup destination nodes */
+	tipc_nlist_init(&dsts, tipc_own_addr(net));
+	tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
+				      seq->upper, domain, &dsts);
+	if (!dsts.local && !dsts.remote)
+		return -EHOSTUNREACH;
+
+	/* Build message header */
 	msg_set_type(hdr, TIPC_MCAST_MSG);
+	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
 	msg_set_destport(hdr, 0);
 	msg_set_destnode(hdr, 0);
 	msg_set_nametype(hdr, seq->type);
 	msg_set_namelower(hdr, seq->lower);
 	msg_set_nameupper(hdr, seq->upper);
-	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 
+	/* Build message as chain of buffers */
 	skb_queue_head_init(&pkts);
 	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
-	if (unlikely(rc != dlen))
-		return rc;
 
-	rc = tipc_bcast_xmit(net, &pkts);
-	if (unlikely(rc == -ELINKCONG)) {
-		tsk->cong_link_cnt = 1;
-		rc = 0;
-	}
+	/* Send message if build was successful */
+	if (unlikely(rc == dlen))
+		rc = tipc_mcast_xmit(net, &pkts, &dsts,
+				     &tsk->cong_link_cnt);
+
+	tipc_nlist_purge(&dsts);
 
 	return rc ? rc : dlen;
 }