summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--net/tipc/msg.c20
-rw-r--r--net/tipc/msg.h3
-rw-r--r--net/tipc/socket.c255
-rw-r--r--net/tipc/socket.h2
4 files changed, 134 insertions, 146 deletions
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index b6cc58ec7346..562c926a51cc 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -520,17 +520,15 @@ exit:
 /**
  * tipc_msg_lookup_dest(): try to find new destination for named message
  * @skb: the buffer containing the message.
- * @dnode: return value: next-hop node, if destination found
- * @err: return value: error code to use, if message to be rejected
+ * @err: error code to be used by caller if lookup fails
  * Does not consume buffer
  * Returns true if a destination is found, false otherwise
  */
-bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
-			  u32 *dnode, int *err)
+bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
 {
 	struct tipc_msg *msg = buf_msg(skb);
-	u32 dport;
-	u32 own_addr = tipc_own_addr(net);
+	u32 dport, dnode;
+	u32 onode = tipc_own_addr(net);
 
 	if (!msg_isdata(msg))
 		return false;
@@ -543,15 +541,15 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
 		return false;
 	if (msg_reroute_cnt(msg))
 		return false;
-	*dnode = addr_domain(net, msg_lookup_scope(msg));
+	dnode = addr_domain(net, msg_lookup_scope(msg));
 	dport = tipc_nametbl_translate(net, msg_nametype(msg),
-				       msg_nameinst(msg), dnode);
+				       msg_nameinst(msg), &dnode);
 	if (!dport)
 		return false;
 	msg_incr_reroute_cnt(msg);
-	if (*dnode != own_addr)
-		msg_set_prevnode(msg, own_addr);
-	msg_set_destnode(msg, *dnode);
+	if (dnode != onode)
+		msg_set_prevnode(msg, onode);
+	msg_set_destnode(msg, dnode);
 	msg_set_destport(msg, dport);
 	*err = TIPC_OK;
 	return true;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d0834bc519aa..234fb0531d1d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -798,8 +798,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg,
 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
 int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		   int offset, int dsz, int mtu, struct sk_buff_head *list);
-bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
-			  int *err);
+bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
 struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
 
 static inline u16 buf_seqno(struct sk_buff *skb)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 71d88adadb18..1060d52ff23e 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1520,82 +1520,81 @@ static void tipc_data_ready(struct sock *sk)
  * @tsk: TIPC socket
  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
  *
- * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
+ * Returns true if everything ok, false otherwise
  */
-static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
+static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 {
 	struct sock *sk = &tsk->sk;
 	struct net *net = sock_net(sk);
 	struct socket *sock = sk->sk_socket;
-	struct tipc_msg *msg = buf_msg(*skb);
-	int retval = -TIPC_ERR_NO_PORT;
+	struct tipc_msg *hdr = buf_msg(skb);
 
-	if (msg_mcast(msg))
-		return retval;
+	if (unlikely(msg_mcast(hdr)))
+		return false;
 
 	switch ((int)sock->state) {
 	case SS_CONNECTED:
+
 		/* Accept only connection-based messages sent by peer */
-		if (tsk_peer_msg(tsk, msg)) {
-			if (unlikely(msg_errcode(msg))) {
-				sock->state = SS_DISCONNECTING;
-				tsk->connected = 0;
-				/* let timer expire on it's own */
-				tipc_node_remove_conn(net, tsk_peer_node(tsk),
-						      tsk->portid);
-			}
-			retval = TIPC_OK;
+		if (unlikely(!tsk_peer_msg(tsk, hdr)))
+			return false;
+
+		if (unlikely(msg_errcode(hdr))) {
+			sock->state = SS_DISCONNECTING;
+			tsk->connected = 0;
+			/* Let timer expire on it's own */
+			tipc_node_remove_conn(net, tsk_peer_node(tsk),
+					      tsk->portid);
 		}
-		break;
+		return true;
+
 	case SS_CONNECTING:
-		/* Accept only ACK or NACK message */
 
-		if (unlikely(!msg_connected(msg)))
-			break;
+		/* Accept only ACK or NACK message */
+		if (unlikely(!msg_connected(hdr)))
+			return false;
 
-		if (unlikely(msg_errcode(msg))) {
+		if (unlikely(msg_errcode(hdr))) {
 			sock->state = SS_DISCONNECTING;
 			sk->sk_err = ECONNREFUSED;
-			retval = TIPC_OK;
-			break;
+			return true;
 		}
 
-		if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
+		if (unlikely(!msg_isdata(hdr))) {
 			sock->state = SS_DISCONNECTING;
 			sk->sk_err = EINVAL;
-			retval = TIPC_OK;
-			break;
+			return true;
 		}
 
-		tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
-		msg_set_importance(&tsk->phdr, msg_importance(msg));
+		tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
+		msg_set_importance(&tsk->phdr, msg_importance(hdr));
 		sock->state = SS_CONNECTED;
 
-		/* If an incoming message is an 'ACK-', it should be
-		 * discarded here because it doesn't contain useful
-		 * data. In addition, we should try to wake up
-		 * connect() routine if sleeping.
-		 */
-		if (msg_data_sz(msg) == 0) {
-			kfree_skb(*skb);
-			*skb = NULL;
-			if (waitqueue_active(sk_sleep(sk)))
-				wake_up_interruptible(sk_sleep(sk));
-		}
-		retval = TIPC_OK;
-		break;
+		/* If 'ACK+' message, add to socket receive queue */
+		if (msg_data_sz(hdr))
+			return true;
+
+		/* If empty 'ACK-' message, wake up sleeping connect() */
+		if (waitqueue_active(sk_sleep(sk)))
+			wake_up_interruptible(sk_sleep(sk));
+
+		/* 'ACK-' message is neither accepted nor rejected: */
+		msg_set_dest_droppable(hdr, 1);
+		return false;
+
 	case SS_LISTENING:
 	case SS_UNCONNECTED:
+
 		/* Accept only SYN message */
-		if (!msg_connected(msg) && !(msg_errcode(msg)))
-			retval = TIPC_OK;
+		if (!msg_connected(hdr) && !(msg_errcode(hdr)))
+			return true;
 		break;
 	case SS_DISCONNECTING:
 		break;
 	default:
 		pr_err("Unknown socket state %u\n", sock->state);
 	}
-	return retval;
+	return false;
 }
 
 /**
@@ -1630,61 +1629,70 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
 /**
  * filter_rcv - validate incoming message
  * @sk: socket
- * @skb: pointer to message. Set to NULL if buffer is consumed.
+ * @skb: pointer to message.
  *
  * Enqueues message on receive queue if acceptable; optionally handles
  * disconnect indication for a connected socket.
  *
  * Called with socket lock already taken
  *
- * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
+ * Returns true if message was added to socket receive queue, otherwise false
  */
-static int filter_rcv(struct sock *sk, struct sk_buff **skb)
+static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
 {
 	struct socket *sock = sk->sk_socket;
 	struct tipc_sock *tsk = tipc_sk(sk);
-	struct tipc_msg *msg = buf_msg(*skb);
-	unsigned int limit = rcvbuf_limit(sk, *skb);
-	int rc = TIPC_OK;
+	struct tipc_msg *hdr = buf_msg(skb);
+	unsigned int limit = rcvbuf_limit(sk, skb);
+	int err = TIPC_OK;
+	int usr = msg_user(hdr);
 
-	if (unlikely(msg_user(msg) == CONN_MANAGER)) {
-		tipc_sk_proto_rcv(tsk, *skb);
-		return TIPC_OK;
+	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
+		tipc_sk_proto_rcv(tsk, skb);
+		return false;
 	}
 
-	if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
-		kfree_skb(*skb);
+	if (unlikely(usr == SOCK_WAKEUP)) {
+		kfree_skb(skb);
 		tsk->link_cong = 0;
 		sk->sk_write_space(sk);
-		*skb = NULL;
-		return TIPC_OK;
+		return false;
 	}
 
-	/* Reject message if it is wrong sort of message for socket */
-	if (msg_type(msg) > TIPC_DIRECT_MSG)
-		return -TIPC_ERR_NO_PORT;
+	/* Drop if illegal message type */
+	if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
+		kfree_skb(skb);
+		return false;
+	}
 
-	if (sock->state == SS_READY) {
-		if (msg_connected(msg))
-			return -TIPC_ERR_NO_PORT;
-	} else {
-		rc = filter_connect(tsk, skb);
-		if (rc != TIPC_OK || !*skb)
-			return rc;
+	/* Reject if wrong message type for current socket state */
+	if (unlikely(sock->state == SS_READY)) {
+		if (msg_connected(hdr)) {
+			err = TIPC_ERR_NO_PORT;
+			goto reject;
+		}
+	} else if (unlikely(!filter_connect(tsk, skb))) {
+		err = TIPC_ERR_NO_PORT;
+		goto reject;
 	}
 
 	/* Reject message if there isn't room to queue it */
-	if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
-		return -TIPC_ERR_OVERLOAD;
+	if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
+		err = TIPC_ERR_OVERLOAD;
+		goto reject;
+	}
 
 	/* Enqueue message */
-	TIPC_SKB_CB(*skb)->handle = NULL;
-	__skb_queue_tail(&sk->sk_receive_queue, *skb);
-	skb_set_owner_r(*skb, sk);
+	TIPC_SKB_CB(skb)->handle = NULL;
+	__skb_queue_tail(&sk->sk_receive_queue, skb);
+	skb_set_owner_r(skb, sk);
 
 	sk->sk_data_ready(sk);
-	*skb = NULL;
-	return TIPC_OK;
+	return true;
+
+reject:
+	tipc_sk_respond(sk, skb, err);
+	return false;
 }
 
 /**
@@ -1698,22 +1706,10 @@ static int filter_rcv(struct sock *sk, struct sk_buff **skb)
  */
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
-	int err;
-	atomic_t *dcnt;
-	u32 dnode = msg_prevnode(buf_msg(skb));
-	struct tipc_sock *tsk = tipc_sk(sk);
-	struct net *net = sock_net(sk);
-	uint truesize = skb->truesize;
+	unsigned int truesize = skb->truesize;
 
-	err = filter_rcv(sk, &skb);
-	if (likely(!skb)) {
-		dcnt = &tsk->dupl_rcvcnt;
-		if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
-			atomic_add(truesize, dcnt);
-		return 0;
-	}
-	if (!err || tipc_msg_reverse(tsk_own_node(tsk), &skb, -err))
-		tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
+	if (likely(filter_rcv(sk, skb)))
+		atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
 	return 0;
 }
 
@@ -1723,45 +1719,43 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
  * @inputq: list of incoming buffers with potentially different destinations
  * @sk: socket where the buffers should be enqueued
  * @dport: port number for the socket
- * @_skb: returned buffer to be forwarded or rejected, if applicable
  *
  * Caller must hold socket lock
- *
- * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
- * or -TIPC_ERR_NO_PORT
  */
-static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
-			   u32 dport, struct sk_buff **_skb)
+static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+			    u32 dport)
 {
 	unsigned int lim;
 	atomic_t *dcnt;
-	int err;
 	struct sk_buff *skb;
 	unsigned long time_limit = jiffies + 2;
 
 	while (skb_queue_len(inputq)) {
 		if (unlikely(time_after_eq(jiffies, time_limit)))
-			return TIPC_OK;
+			return;
+
 		skb = tipc_skb_dequeue(inputq, dport);
 		if (unlikely(!skb))
-			return TIPC_OK;
+			return;
+
+		/* Add message directly to receive queue if possible */
 		if (!sock_owned_by_user(sk)) {
-			err = filter_rcv(sk, &skb);
-			if (likely(!skb))
-				continue;
-			*_skb = skb;
-			return err;
+			filter_rcv(sk, skb);
+			continue;
 		}
+
+		/* Try backlog, compensating for double-counted bytes */
 		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
 		if (sk->sk_backlog.len)
 			atomic_set(dcnt, 0);
 		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
 		if (likely(!sk_add_backlog(sk, skb, lim)))
 			continue;
-		*_skb = skb;
-		return -TIPC_ERR_OVERLOAD;
+
+		/* Overload => reject message back to sender */
+		tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
+		break;
 	}
-	return TIPC_OK;
 }
 
 /**
@@ -1769,51 +1763,46 @@ static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
  * @inputq: buffer list containing the buffers
  * Consumes all buffers in list until inputq is empty
  * Note: may be called in multiple threads referring to the same queue
- * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
- * Only node local calls check the return value, sending single-buffer queues
  */
-int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
+void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 {
 	u32 dnode, dport = 0;
 	int err;
-	struct sk_buff *skb;
 	struct tipc_sock *tsk;
-	struct tipc_net *tn;
 	struct sock *sk;
+	struct sk_buff *skb;
 
 	while (skb_queue_len(inputq)) {
-		err = -TIPC_ERR_NO_PORT;
-		skb = NULL;
 		dport = tipc_skb_peek_port(inputq, dport);
 		tsk = tipc_sk_lookup(net, dport);
+
 		if (likely(tsk)) {
 			sk = &tsk->sk;
 			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
-				err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+				tipc_sk_enqueue(inputq, sk, dport);
 				spin_unlock_bh(&sk->sk_lock.slock);
-				dport = 0;
 			}
 			sock_put(sk);
-		} else {
-			skb = tipc_skb_dequeue(inputq, dport);
-		}
-		if (likely(!skb))
 			continue;
-		if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
-			goto xmit;
-		if (!err) {
-			dnode = msg_destnode(buf_msg(skb));
-			goto xmit;
-		} else {
-			dnode = msg_prevnode(buf_msg(skb));
 		}
-		tn = net_generic(net, tipc_net_id);
-		if (!tipc_msg_reverse(tn->own_addr, &skb, -err))
+
+		/* No destination socket => dequeue skb if still there */
+		skb = tipc_skb_dequeue(inputq, dport);
+		if (!skb)
+			return;
+
+		/* Try secondary lookup if unresolved named message */
+		err = TIPC_ERR_NO_PORT;
+		if (tipc_msg_lookup_dest(net, skb, &err))
+			goto xmit;
+
+		/* Prepare for message rejection */
+		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
 			continue;
 xmit:
+		dnode = msg_destnode(buf_msg(skb));
 		tipc_node_xmit_skb(net, skb, dnode, dport);
 	}
-	return err ? -EHOSTUNREACH : 0;
 }
 
 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -2082,7 +2071,10 @@ static int tipc_shutdown(struct socket *sock, int how)
 	struct net *net = sock_net(sk);
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct sk_buff *skb;
-	u32 dnode;
+	u32 dnode = tsk_peer_node(tsk);
+	u32 dport = tsk_peer_port(tsk);
+	u32 onode = tipc_own_addr(net);
+	u32 oport = tsk->portid;
 	int res;
 
 	if (how != SHUT_RDWR)
@@ -2108,9 +2100,8 @@ restart:
 		} else {
 			skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
 					      TIPC_CONN_MSG, SHORT_H_SIZE,
-					      0, dnode, tsk_own_node(tsk),
-					      tsk_peer_port(tsk),
-					      tsk->portid, TIPC_CONN_SHUTDOWN);
+					      0, dnode, onode, dport, oport,
+					      TIPC_CONN_SHUTDOWN);
 			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 		}
 		tsk->connected = 0;
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index bf6551389522..4241f22069dc 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -44,7 +44,7 @@
 				  SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
 int tipc_socket_init(void);
 void tipc_socket_stop(void);
-int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
+void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 		       struct sk_buff_head *inputq);
 void tipc_sk_reinit(struct net *net);