summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--net/tipc/bearer.c26
-rw-r--r--net/tipc/bearer.h3
-rw-r--r--net/tipc/link.c132
-rw-r--r--net/tipc/link.h6
-rw-r--r--net/tipc/name_distr.c4
-rw-r--r--net/tipc/node.c78
-rw-r--r--net/tipc/node.h4
-rw-r--r--net/tipc/socket.c22
8 files changed, 198 insertions, 77 deletions
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 00bc0e620532..eae58a6b121c 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
 	rcu_read_unlock();
 }
 
+/* tipc_bearer_xmit() -send buffer to destination over bearer
+ */
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+		      struct sk_buff_head *xmitq,
+		      struct tipc_media_addr *dst)
+{
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	struct tipc_bearer *b;
+	struct sk_buff *skb, *tmp;
+
+	if (skb_queue_empty(xmitq))
+		return;
+
+	rcu_read_lock();
+	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+	if (likely(b)) {
+		skb_queue_walk_safe(xmitq, skb, tmp) {
+			__skb_dequeue(xmitq);
+			b->media->send_msg(net, skb, b, dst);
+			/* Until we remove cloning in tipc_l2_send_msg(): */
+			kfree_skb(skb);
+		}
+	}
+	rcu_read_unlock();
+}
+
 /**
  * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
  * @buf: the received packet
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index dc714d977768..6426f242f626 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
 void tipc_bearer_stop(struct net *net);
 void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
 		      struct tipc_media_addr *dest);
+void tipc_bearer_xmit(struct net *net, u32 bearer_id,
+		      struct sk_buff_head *xmitq,
+		      struct tipc_media_addr *dst);
 
 #endif	/* _TIPC_BEARER_H */
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ea32679b6737..c052437a7cfa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 	/* This really cannot happen...  */
 	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
 		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		tipc_link_reset(link);
 		return -ENOBUFS;
 	}
 	/* Non-blocking sender: */
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 	return 0;
 }
 
+/**
+ * tipc_link_xmit(): enqueue buffer list according to queue situation
+ * @link: link to use
+ * @list: chain of buffers containing message
+ * @xmitq: returned list of packets to be sent by caller
+ *
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
+ */
+int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
+		   struct sk_buff_head *xmitq)
+{
+	struct tipc_msg *hdr = buf_msg(skb_peek(list));
+	unsigned int maxwin = l->window;
+	unsigned int i, imp = msg_importance(hdr);
+	unsigned int mtu = l->mtu;
+	u16 ack = l->rcv_nxt - 1;
+	u16 seqno = l->snd_nxt;
+	u16 bc_last_in = l->owner->bclink.last_in;
+	struct sk_buff_head *transmq = &l->transmq;
+	struct sk_buff_head *backlogq = &l->backlogq;
+	struct sk_buff *skb, *_skb, *bskb;
+
+	/* Match msg importance against this and all higher backlog limits: */
+	for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+		if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+			return link_schedule_user(l, list);
+	}
+	if (unlikely(msg_size(hdr) > mtu))
+		return -EMSGSIZE;
+
+	/* Prepare each packet for sending, and add to relevant queue: */
+	while (skb_queue_len(list)) {
+		skb = skb_peek(list);
+		hdr = buf_msg(skb);
+		msg_set_seqno(hdr, seqno);
+		msg_set_ack(hdr, ack);
+		msg_set_bcast_ack(hdr, bc_last_in);
+
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			_skb = skb_clone(skb, GFP_ATOMIC);
+			if (!_skb)
+				return -ENOBUFS;
+			__skb_dequeue(list);
+			__skb_queue_tail(transmq, skb);
+			__skb_queue_tail(xmitq, _skb);
+			l->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+			kfree_skb(__skb_dequeue(list));
+			l->stats.sent_bundled++;
+			continue;
+		}
+		if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+			kfree_skb(__skb_dequeue(list));
+			__skb_queue_tail(backlogq, bskb);
+			l->backlog[msg_importance(buf_msg(bskb))].len++;
+			l->stats.sent_bundled++;
+			l->stats.sent_bundles++;
+			continue;
+		}
+		l->backlog[imp].len += skb_queue_len(list);
+		skb_queue_splice_tail_init(list, backlogq);
+	}
+	l->snd_nxt = seqno;
+	return 0;
+}
+
 static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 {
 	skb_queue_head_init(list);
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 	return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
-/* tipc_link_xmit_skb(): send single buffer to destination
- * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not cause link congestion
- * The only exception is datagram messages rerouted after secondary
- * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
- */
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
-		       u32 selector)
-{
-	struct sk_buff_head head;
-	int rc;
-
-	skb2list(skb, &head);
-	rc = tipc_link_xmit(net, &head, dnode, selector);
-	if (rc)
-		kfree_skb(skb);
-	return 0;
-}
-
-/**
- * tipc_link_xmit() is the general link level function for message sending
- * @net: the applicable net namespace
- * @list: chain of buffers containing message
- * @dsz: amount of user data to be sent
- * @dnode: address of destination node
- * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning error
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
- */
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
-		   u32 selector)
-{
-	struct tipc_link *link = NULL;
-	struct tipc_node *node;
-	int rc = -EHOSTUNREACH;
-
-	node = tipc_node_find(net, dnode);
-	if (node) {
-		tipc_node_lock(node);
-		link = node_active_link(node, selector & 1);
-		if (link)
-			rc = __tipc_link_xmit(net, link, list);
-		tipc_node_unlock(node);
-		tipc_node_put(node);
-	}
-	if (link)
-		return rc;
-
-	if (likely(in_own_node(net, dnode))) {
-		tipc_sk_rcv(net, list);
-		return 0;
-	}
-
-	__skb_queue_purge(list);
-	return rc;
-}
-
 /*
  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
  *
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 9c71d9e42e93..7add2b90361d 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
 void tipc_link_purge_backlog(struct tipc_link *l);
 void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
-		       u32 selector);
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
-		   u32 selector);
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list);
+int tipc_link_xmit(struct tipc_link *link,	struct sk_buff_head *list,
+		   struct sk_buff_head *xmitq);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
 			  u32 gap, u32 tolerance, u32 priority);
 void tipc_link_push_packets(struct tipc_link *l_ptr);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 3a1539e96294..e6018b7eb197 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
 		if (!oskb)
 			break;
 		msg_set_destnode(buf_msg(oskb), dnode);
-		tipc_link_xmit_skb(net, oskb, dnode, dnode);
+		tipc_node_xmit_skb(net, oskb, dnode, dnode);
 	}
 	rcu_read_unlock();
 
@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
 			 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
 	rcu_read_unlock();
 
-	tipc_link_xmit(net, &head, dnode, dnode);
+	tipc_node_xmit(net, &head, dnode, dnode);
 }
 
 static void tipc_publ_subscribe(struct net *net, struct publication *publ,
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 19729645d494..ad759bb034e7 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -563,6 +563,84 @@ msg_full:
 	return -EMSGSIZE;
 }
 
+static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
+					       int *bearer_id,
+					       struct tipc_media_addr **maddr)
+{
+	int id = n->active_links[sel & 1];
+
+	if (unlikely(id < 0))
+		return NULL;
+
+	*bearer_id = id;
+	*maddr = &n->links[id].maddr;
+	return n->links[id].link;
+}
+
+/**
+ * tipc_node_xmit() is the general link level function for message sending
+ * @net: the applicable net namespace
+ * @list: chain of buffers containing message
+ * @dnode: address of destination node
+ * @selector: a number used for deterministic link selection
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
+		   u32 dnode, int selector)
+{
+	struct tipc_link *l = NULL;
+	struct tipc_node *n;
+	struct sk_buff_head xmitq;
+	struct tipc_media_addr *maddr;
+	int bearer_id;
+	int rc = -EHOSTUNREACH;
+
+	__skb_queue_head_init(&xmitq);
+	n = tipc_node_find(net, dnode);
+	if (likely(n)) {
+		tipc_node_lock(n);
+		l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
+		if (likely(l))
+			rc = tipc_link_xmit(l, list, &xmitq);
+		if (unlikely(rc == -ENOBUFS))
+			tipc_link_reset(l);
+		tipc_node_unlock(n);
+		tipc_node_put(n);
+	}
+	if (likely(!rc)) {
+		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+		return 0;
+	}
+	if (likely(in_own_node(net, dnode))) {
+		tipc_sk_rcv(net, list);
+		return 0;
+	}
+	return rc;
+}
+
+/* tipc_node_xmit_skb(): send single buffer to destination
+ * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
+		       u32 selector)
+{
+	struct sk_buff_head head;
+	int rc;
+
+	skb_queue_head_init(&head);
+	__skb_queue_tail(&head, skb);
+	rc = tipc_node_xmit(net, &head, dnode, selector);
+	if (rc == -ELINKCONG)
+		kfree_skb(skb);
+	return 0;
+}
+
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
 	int err;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 74f278adada3..86b7c740cf84 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -160,6 +160,10 @@ bool tipc_node_is_up(struct tipc_node *n);
 int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
 			   char *linkname, size_t len);
 void tipc_node_unlock(struct tipc_node *node);
+int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
+		   int selector);
+int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
+		       u32 selector);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 87fef25f6519..5b0b08d58fcc 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -261,7 +261,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
 
 	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
 		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
-			tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
+			tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
 	}
 }
 
@@ -443,7 +443,7 @@ static int tipc_release(struct socket *sock)
 			}
 			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
 					     TIPC_ERR_NO_PORT))
-				tipc_link_xmit_skb(net, skb, dnode, 0);
+				tipc_node_xmit_skb(net, skb, dnode, 0);
 		}
 	}
 
@@ -456,7 +456,7 @@ static int tipc_release(struct socket *sock)
 				      tsk_own_node(tsk), tsk_peer_port(tsk),
 				      tsk->portid, TIPC_ERR_NO_PORT);
 		if (skb)
-			tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 		tipc_node_remove_conn(net, dnode, tsk->portid);
 	}
 
@@ -925,7 +925,7 @@ new_mtu:
 	do {
 		skb = skb_peek(pktchain);
 		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-		rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
+		rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
 		if (likely(!rc)) {
 			if (sock->state != SS_READY)
 				sock->state = SS_CONNECTING;
@@ -1045,7 +1045,7 @@ next:
 		return rc;
 	do {
 		if (likely(!tsk_conn_cong(tsk))) {
-			rc = tipc_link_xmit(net, pktchain, dnode, portid);
+			rc = tipc_node_xmit(net, pktchain, dnode, portid);
 			if (likely(!rc)) {
 				tsk->sent_unacked++;
 				sent += send;
@@ -1224,7 +1224,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 		return;
 	msg = buf_msg(skb);
 	msg_set_msgcnt(msg, ack);
-	tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1703,7 +1703,7 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 		return 0;
 	}
 	if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
-		tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+		tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 	return 0;
 }
 
@@ -1799,7 +1799,7 @@ int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
 			continue;
 xmit:
-		tipc_link_xmit_skb(net, skb, dnode, dport);
+		tipc_node_xmit_skb(net, skb, dnode, dport);
 	}
 	return err ? -EHOSTUNREACH : 0;
 }
@@ -2092,7 +2092,7 @@ restart:
 			}
 			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
 					     TIPC_CONN_SHUTDOWN))
-				tipc_link_xmit_skb(net, skb, dnode,
+				tipc_node_xmit_skb(net, skb, dnode,
 						   tsk->portid);
 		} else {
 			dnode = tsk_peer_node(tsk);
@@ -2102,7 +2102,7 @@ restart:
 					      0, dnode, tsk_own_node(tsk),
 					      tsk_peer_port(tsk),
 					      tsk->portid, TIPC_CONN_SHUTDOWN);
-			tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 		}
 		tsk->connected = 0;
 		sock->state = SS_DISCONNECTING;
@@ -2164,7 +2164,7 @@ static void tipc_sk_timeout(unsigned long data)
 	}
 	bh_unlock_sock(sk);
 	if (skb)
-		tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
+		tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
 exit:
 	sock_put(sk);
 }