summary refs log tree commit diff
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c132
1 files changed, 85 insertions, 47 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 611a04fb0ddc..c1a4611649ab 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -41,6 +41,7 @@
 #include "node.h"
 #include "link.h"
 #include "config.h"
+#include "name_distr.h"
 #include "socket.h"
 
 #define SS_LISTENING		-1	/* socket is listening */
@@ -785,10 +786,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
 	struct sk_buff *b;
 	uint i, last, dst = 0;
 	u32 scope = TIPC_CLUSTER_SCOPE;
+	struct sk_buff_head msgs;
 
 	if (in_own_node(net, msg_orignode(msg)))
 		scope = TIPC_NODE_SCOPE;
 
+	if (unlikely(!msg_mcast(msg))) {
+		pr_warn("Received non-multicast msg in multicast\n");
+		kfree_skb(buf);
+		goto exit;
+	}
 	/* Create destination port list: */
 	tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
 				  msg_nameupper(msg), scope, &dports);
@@ -806,9 +813,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
 				continue;
 			}
 			msg_set_destport(msg, item->ports[i]);
-			tipc_sk_rcv(net, b);
+			skb_queue_head_init(&msgs);
+			skb_queue_tail(&msgs, b);
+			tipc_sk_rcv(net, &msgs);
 		}
 	}
+exit:
 	tipc_port_list_free(&dports);
 }
 
@@ -1760,71 +1770,99 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 }
 
 /**
- * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue
- * @sk: socket
- * @skb: pointer to message. Set to NULL if buffer is consumed.
- * @dnode: if buffer should be forwarded/returned, send to this node
+ * tipc_sk_enqueue - extract all buffers with destination 'dport' from
+ *                   inputq and try adding them to socket or backlog queue
+ * @inputq: list of incoming buffers with potentially different destinations
+ * @sk: socket where the buffers should be enqueued
+ * @dport: port number for the socket
+ * @_skb: returned buffer to be forwarded or rejected, if applicable
  *
  * Caller must hold socket lock
  *
- * Returns TIPC_OK (0) or -tipc error code
+ * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
+ * or -TIPC_ERR_NO_PORT
  */
-static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb)
+static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+			   u32 dport, struct sk_buff **_skb)
 {
 	unsigned int lim;
 	atomic_t *dcnt;
-
-	if (unlikely(!*skb))
-		return TIPC_OK;
-	if (!sock_owned_by_user(sk))
-		return filter_rcv(sk, skb);
-	dcnt = &tipc_sk(sk)->dupl_rcvcnt;
-	if (sk->sk_backlog.len)
-		atomic_set(dcnt, 0);
-	lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt);
-	if (unlikely(sk_add_backlog(sk, *skb, lim)))
+	int err;
+	struct sk_buff *skb;
+	unsigned long time_limit = jiffies + 2;
+
+	while (skb_queue_len(inputq)) {
+		skb = tipc_skb_dequeue(inputq, dport);
+		if (unlikely(!skb))
+			return TIPC_OK;
+		/* Return if softirq window exhausted */
+		if (unlikely(time_after_eq(jiffies, time_limit)))
+			return TIPC_OK;
+		if (!sock_owned_by_user(sk)) {
+			err = filter_rcv(sk, &skb);
+			if (likely(!skb))
+				continue;
+			*_skb = skb;
+			return err;
+		}
+		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+		if (sk->sk_backlog.len)
+			atomic_set(dcnt, 0);
+		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+		if (likely(!sk_add_backlog(sk, skb, lim)))
+			continue;
+		*_skb = skb;
 		return -TIPC_ERR_OVERLOAD;
-	*skb = NULL;
+	}
 	return TIPC_OK;
 }
 
 /**
- * tipc_sk_rcv - handle incoming message
- * @skb: buffer containing arriving message
- * Consumes buffer
- * Returns 0 if success, or errno: -EHOSTUNREACH
+ * tipc_sk_rcv - handle a chain of incoming buffers
+ * @inputq: buffer list containing the buffers
+ * Consumes all buffers in list until inputq is empty
+ * Note: may be called in multiple threads referring to the same queue
+ * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
+ * Only node local calls check the return value, sending single-buffer queues
  */
-int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
+int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 {
+	u32 dnode, dport = 0;
+	int err = -TIPC_ERR_NO_PORT;
+	struct sk_buff *skb;
 	struct tipc_sock *tsk;
 	struct tipc_net *tn;
 	struct sock *sk;
-	u32 dport = msg_destport(buf_msg(skb));
-	int err = -TIPC_ERR_NO_PORT;
-	u32 dnode;
 
-	/* Find destination */
-	tsk = tipc_sk_lookup(net, dport);
-	if (likely(tsk)) {
-		sk = &tsk->sk;
-		spin_lock_bh(&sk->sk_lock.slock);
-		err = tipc_sk_enqueue_skb(sk, &skb);
-		spin_unlock_bh(&sk->sk_lock.slock);
-		sock_put(sk);
-	}
-	if (likely(!skb))
-		return 0;
-	if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
-		goto xmit;
-	if (!err) {
-		dnode = msg_destnode(buf_msg(skb));
-		goto xmit;
-	}
-	tn = net_generic(net, tipc_net_id);
-	if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
-		return -EHOSTUNREACH;
+	while (skb_queue_len(inputq)) {
+		skb = NULL;
+		dport = tipc_skb_peek_port(inputq, dport);
+		tsk = tipc_sk_lookup(net, dport);
+		if (likely(tsk)) {
+			sk = &tsk->sk;
+			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
+				err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+				spin_unlock_bh(&sk->sk_lock.slock);
+				dport = 0;
+			}
+			sock_put(sk);
+		} else {
+			skb = tipc_skb_dequeue(inputq, dport);
+		}
+		if (likely(!skb))
+			continue;
+		if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
+			goto xmit;
+		if (!err) {
+			dnode = msg_destnode(buf_msg(skb));
+			goto xmit;
+		}
+		tn = net_generic(net, tipc_net_id);
+		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
+			continue;
 xmit:
-	tipc_link_xmit_skb(net, skb, dnode, dport);
+		tipc_link_xmit_skb(net, skb, dnode, dport);
+	}
 	return err ? -EHOSTUNREACH : 0;
 }