summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--include/trace/events/rxrpc.h1
-rw-r--r--include/uapi/linux/udp.h1
-rw-r--r--net/rxrpc/ar-internal.h23
-rw-r--r--net/rxrpc/call_accept.c27
-rw-r--r--net/rxrpc/call_object.c5
-rw-r--r--net/rxrpc/conn_client.c10
-rw-r--r--net/rxrpc/conn_event.c26
-rw-r--r--net/rxrpc/input.c253
-rw-r--r--net/rxrpc/local_object.c30
-rw-r--r--net/rxrpc/peer_event.c5
-rw-r--r--net/rxrpc/peer_object.c29
11 files changed, 236 insertions, 174 deletions
diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 837393fa897b..573d5b901fb1 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -931,6 +931,7 @@ TRACE_EVENT(rxrpc_tx_packet,
 	    TP_fast_assign(
 		    __entry->call = call_id;
 		    memcpy(&__entry->whdr, whdr, sizeof(__entry->whdr));
+		    __entry->where = where;
 			   ),
 
 	    TP_printk("c=%08x %08x:%08x:%08x:%04x %08x %08x %02x %02x %s %s",
diff --git a/include/uapi/linux/udp.h b/include/uapi/linux/udp.h
index 09d00f8c442b..09502de447f5 100644
--- a/include/uapi/linux/udp.h
+++ b/include/uapi/linux/udp.h
@@ -40,5 +40,6 @@ struct udphdr {
 #define UDP_ENCAP_L2TPINUDP	3 /* rfc2661 */
 #define UDP_ENCAP_GTP0		4 /* GSM TS 09.60 */
 #define UDP_ENCAP_GTP1U		5 /* 3GPP TS 29.060 */
+#define UDP_ENCAP_RXRPC		6
 
 #endif /* _UAPI_LINUX_UDP_H */
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index ef9554131434..a6e6cae82c30 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -302,6 +302,7 @@ struct rxrpc_peer {
 
 	/* calculated RTT cache */
 #define RXRPC_RTT_CACHE_SIZE 32
+	spinlock_t		rtt_input_lock;	/* RTT lock for input routine */
 	ktime_t			rtt_last_req;	/* Time of last RTT request */
 	u64			rtt;		/* Current RTT estimate (in nS) */
 	u64			rtt_sum;	/* Sum of cache contents */
@@ -442,17 +443,17 @@ struct rxrpc_connection {
 	spinlock_t		state_lock;	/* state-change lock */
 	enum rxrpc_conn_cache_state cache_state;
 	enum rxrpc_conn_proto_state state;	/* current state of connection */
-	u32			local_abort;	/* local abort code */
-	u32			remote_abort;	/* remote abort code */
+	u32			abort_code;	/* Abort code of connection abort */
 	int			debug_id;	/* debug ID for printks */
 	atomic_t		serial;		/* packet serial number counter */
 	unsigned int		hi_serial;	/* highest serial number received */
 	u32			security_nonce;	/* response re-use preventer */
-	u16			service_id;	/* Service ID, possibly upgraded */
+	u32			service_id;	/* Service ID, possibly upgraded */
 	u8			size_align;	/* data size alignment (for security) */
 	u8			security_size;	/* security header size */
 	u8			security_ix;	/* security type */
 	u8			out_clientflag;	/* RXRPC_CLIENT_INITIATED if we are client */
+	short			error;		/* Local error code */
 };
 
 static inline bool rxrpc_to_server(const struct rxrpc_skb_priv *sp)
@@ -635,6 +636,8 @@ struct rxrpc_call {
 	bool			tx_phase;	/* T if transmission phase, F if receive phase */
 	u8			nr_jumbo_bad;	/* Number of jumbo dups/exceeds-windows */
 
+	spinlock_t		input_lock;	/* Lock for packet input to this call */
+
 	/* receive-phase ACK management */
 	u8			ackr_reason;	/* reason to ACK */
 	u16			ackr_skew;	/* skew on packet being ACK'd */
@@ -720,8 +723,6 @@ int rxrpc_service_prealloc(struct rxrpc_sock *, gfp_t);
 void rxrpc_discard_prealloc(struct rxrpc_sock *);
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *,
 					   struct rxrpc_sock *,
-					   struct rxrpc_peer *,
-					   struct rxrpc_connection *,
 					   struct sk_buff *);
 void rxrpc_accept_incoming_calls(struct rxrpc_local *);
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long,
@@ -891,8 +892,9 @@ extern unsigned long rxrpc_conn_idle_client_fast_expiry;
 extern struct idr rxrpc_client_conn_ids;
 
 void rxrpc_destroy_client_conn_ids(void);
-int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
-		       struct sockaddr_rxrpc *, gfp_t);
+int rxrpc_connect_call(struct rxrpc_sock *, struct rxrpc_call *,
+		       struct rxrpc_conn_parameters *, struct sockaddr_rxrpc *,
+		       gfp_t);
 void rxrpc_expose_client_call(struct rxrpc_call *);
 void rxrpc_disconnect_client_call(struct rxrpc_call *);
 void rxrpc_put_client_conn(struct rxrpc_connection *);
@@ -965,7 +967,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 /*
  * input.c
  */
-void rxrpc_data_ready(struct sock *);
+int rxrpc_input_packet(struct sock *, struct sk_buff *);
 
 /*
  * insecure.c
@@ -1045,10 +1047,11 @@ void rxrpc_peer_keepalive_worker(struct work_struct *);
  */
 struct rxrpc_peer *rxrpc_lookup_peer_rcu(struct rxrpc_local *,
 					 const struct sockaddr_rxrpc *);
-struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_local *,
+struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_sock *, struct rxrpc_local *,
 				     struct sockaddr_rxrpc *, gfp_t);
 struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *, gfp_t);
-void rxrpc_new_incoming_peer(struct rxrpc_local *, struct rxrpc_peer *);
+void rxrpc_new_incoming_peer(struct rxrpc_sock *, struct rxrpc_local *,
+			     struct rxrpc_peer *);
 void rxrpc_destroy_all_peers(struct rxrpc_net *);
 struct rxrpc_peer *rxrpc_get_peer(struct rxrpc_peer *);
 struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *);
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 9c7f26d06a52..652e314de38e 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -287,7 +287,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
 					  (peer_tail + 1) &
 					  (RXRPC_BACKLOG_MAX - 1));
 
-			rxrpc_new_incoming_peer(local, peer);
+			rxrpc_new_incoming_peer(rx, local, peer);
 		}
 
 		/* Now allocate and set up the connection */
@@ -333,11 +333,11 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
  */
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 					   struct rxrpc_sock *rx,
-					   struct rxrpc_peer *peer,
-					   struct rxrpc_connection *conn,
 					   struct sk_buff *skb)
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rxrpc_connection *conn;
+	struct rxrpc_peer *peer;
 	struct rxrpc_call *call;
 
 	_enter("");
@@ -354,6 +354,13 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 		goto out;
 	}
 
+	/* The peer, connection and call may all have sprung into existence due
+	 * to a duplicate packet being handled on another CPU in parallel, so
+	 * we have to recheck the routing.  However, we're now holding
+	 * rx->incoming_lock, so the values should remain stable.
+	 */
+	conn = rxrpc_find_connection_rcu(local, skb, &peer);
+
 	call = rxrpc_alloc_incoming_call(rx, local, peer, conn, skb);
 	if (!call) {
 		skb->mark = RXRPC_SKB_MARK_REJECT_BUSY;
@@ -396,20 +403,22 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 
 	case RXRPC_CONN_SERVICE:
 		write_lock(&call->state_lock);
-		if (rx->discard_new_call)
-			call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
-		else
-			call->state = RXRPC_CALL_SERVER_ACCEPTING;
+		if (call->state < RXRPC_CALL_COMPLETE) {
+			if (rx->discard_new_call)
+				call->state = RXRPC_CALL_SERVER_RECV_REQUEST;
+			else
+				call->state = RXRPC_CALL_SERVER_ACCEPTING;
+		}
 		write_unlock(&call->state_lock);
 		break;
 
 	case RXRPC_CONN_REMOTELY_ABORTED:
 		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
-					  conn->remote_abort, -ECONNABORTED);
+					  conn->abort_code, conn->error);
 		break;
 	case RXRPC_CONN_LOCALLY_ABORTED:
 		rxrpc_abort_call("CON", call, sp->hdr.seq,
-				 conn->local_abort, -ECONNABORTED);
+				 conn->abort_code, conn->error);
 		break;
 	default:
 		BUG();
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 799f75b6900d..8f1a8f85b1f9 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -138,6 +138,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	init_waitqueue_head(&call->waitq);
 	spin_lock_init(&call->lock);
 	spin_lock_init(&call->notify_lock);
+	spin_lock_init(&call->input_lock);
 	rwlock_init(&call->state_lock);
 	atomic_set(&call->usage, 1);
 	call->debug_id = debug_id;
@@ -287,7 +288,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	/* Set up or get a connection record and set the protocol parameters,
 	 * including channel number and call ID.
 	 */
-	ret = rxrpc_connect_call(call, cp, srx, gfp);
+	ret = rxrpc_connect_call(rx, call, cp, srx, gfp);
 	if (ret < 0)
 		goto error;
 
@@ -339,7 +340,7 @@ int rxrpc_retry_client_call(struct rxrpc_sock *rx,
 	/* Set up or get a connection record and set the protocol parameters,
 	 * including channel number and call ID.
 	 */
-	ret = rxrpc_connect_call(call, cp, srx, gfp);
+	ret = rxrpc_connect_call(rx, call, cp, srx, gfp);
 	if (ret < 0)
 		goto error;
 
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index 8acf74fe24c0..521189f4b666 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -276,7 +276,8 @@ dont_reuse:
  * If we return with a connection, the call will be on its waiting list.  It's
  * left to the caller to assign a channel and wake up the call.
  */
-static int rxrpc_get_client_conn(struct rxrpc_call *call,
+static int rxrpc_get_client_conn(struct rxrpc_sock *rx,
+				 struct rxrpc_call *call,
 				 struct rxrpc_conn_parameters *cp,
 				 struct sockaddr_rxrpc *srx,
 				 gfp_t gfp)
@@ -289,7 +290,7 @@ static int rxrpc_get_client_conn(struct rxrpc_call *call,
 
 	_enter("{%d,%lx},", call->debug_id, call->user_call_ID);
 
-	cp->peer = rxrpc_lookup_peer(cp->local, srx, gfp);
+	cp->peer = rxrpc_lookup_peer(rx, cp->local, srx, gfp);
 	if (!cp->peer)
 		goto error;
 
@@ -683,7 +684,8 @@ out:
  * find a connection for a call
  * - called in process context with IRQs enabled
  */
-int rxrpc_connect_call(struct rxrpc_call *call,
+int rxrpc_connect_call(struct rxrpc_sock *rx,
+		       struct rxrpc_call *call,
 		       struct rxrpc_conn_parameters *cp,
 		       struct sockaddr_rxrpc *srx,
 		       gfp_t gfp)
@@ -696,7 +698,7 @@ int rxrpc_connect_call(struct rxrpc_call *call,
 	rxrpc_discard_expired_client_conns(&rxnet->client_conn_reaper);
 	rxrpc_cull_active_client_conns(rxnet);
 
-	ret = rxrpc_get_client_conn(call, cp, srx, gfp);
+	ret = rxrpc_get_client_conn(rx, call, cp, srx, gfp);
 	if (ret < 0)
 		goto out;
 
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index 6df56ce68861..b6fca8ebb117 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -126,7 +126,7 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
 
 	switch (chan->last_type) {
 	case RXRPC_PACKET_TYPE_ABORT:
-		_proto("Tx ABORT %%%u { %d } [re]", serial, conn->local_abort);
+		_proto("Tx ABORT %%%u { %d } [re]", serial, conn->abort_code);
 		break;
 	case RXRPC_PACKET_TYPE_ACK:
 		trace_rxrpc_tx_ack(chan->call_debug_id, serial,
@@ -153,13 +153,12 @@ static void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn,
  * pass a connection-level abort onto all calls on that connection
  */
 static void rxrpc_abort_calls(struct rxrpc_connection *conn,
-			      enum rxrpc_call_completion compl,
-			      u32 abort_code, int error)
+			      enum rxrpc_call_completion compl)
 {
 	struct rxrpc_call *call;
 	int i;
 
-	_enter("{%d},%x", conn->debug_id, abort_code);
+	_enter("{%d},%x", conn->debug_id, conn->abort_code);
 
 	spin_lock(&conn->channel_lock);
 
@@ -172,9 +171,11 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn,
 				trace_rxrpc_abort(call->debug_id,
 						  "CON", call->cid,
 						  call->call_id, 0,
-						  abort_code, error);
+						  conn->abort_code,
+						  conn->error);
 			if (rxrpc_set_call_completion(call, compl,
-						      abort_code, error))
+						      conn->abort_code,
+						      conn->error))
 				rxrpc_notify_socket(call);
 		}
 	}
@@ -207,10 +208,12 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
 		return 0;
 	}
 
+	conn->error = error;
+	conn->abort_code = abort_code;
 	conn->state = RXRPC_CONN_LOCALLY_ABORTED;
 	spin_unlock_bh(&conn->state_lock);
 
-	rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED, abort_code, error);
+	rxrpc_abort_calls(conn, RXRPC_CALL_LOCALLY_ABORTED);
 
 	msg.msg_name	= &conn->params.peer->srx.transport;
 	msg.msg_namelen	= conn->params.peer->srx.transport_len;
@@ -229,7 +232,7 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
 	whdr._rsvd	= 0;
 	whdr.serviceId	= htons(conn->service_id);
 
-	word		= htonl(conn->local_abort);
+	word		= htonl(conn->abort_code);
 
 	iov[0].iov_base	= &whdr;
 	iov[0].iov_len	= sizeof(whdr);
@@ -240,7 +243,7 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
 
 	serial = atomic_inc_return(&conn->serial);
 	whdr.serial = htonl(serial);
-	_proto("Tx CONN ABORT %%%u { %d }", serial, conn->local_abort);
+	_proto("Tx CONN ABORT %%%u { %d }", serial, conn->abort_code);
 
 	ret = kernel_sendmsg(conn->params.local->socket, &msg, iov, 2, len);
 	if (ret < 0) {
@@ -315,9 +318,10 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
 		abort_code = ntohl(wtmp);
 		_proto("Rx ABORT %%%u { ac=%d }", sp->hdr.serial, abort_code);
 
+		conn->error = -ECONNABORTED;
+		conn->abort_code = abort_code;
 		conn->state = RXRPC_CONN_REMOTELY_ABORTED;
-		rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED,
-				  abort_code, -ECONNABORTED);
+		rxrpc_abort_calls(conn, RXRPC_CALL_REMOTELY_ABORTED);
 		return -ECONNABORTED;
 
 	case RXRPC_PACKET_TYPE_CHALLENGE:
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 800f5b8a1baa..570b49d2da42 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -216,10 +216,11 @@ static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb,
 /*
  * Apply a hard ACK by advancing the Tx window.
  */
-static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
+static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
 				   struct rxrpc_ack_summary *summary)
 {
 	struct sk_buff *skb, *list = NULL;
+	bool rot_last = false;
 	int ix;
 	u8 annotation;
 
@@ -243,15 +244,17 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
 		skb->next = list;
 		list = skb;
 
-		if (annotation & RXRPC_TX_ANNO_LAST)
+		if (annotation & RXRPC_TX_ANNO_LAST) {
 			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
+			rot_last = true;
+		}
 		if ((annotation & RXRPC_TX_ANNO_MASK) != RXRPC_TX_ANNO_ACK)
 			summary->nr_rot_new_acks++;
 	}
 
 	spin_unlock(&call->lock);
 
-	trace_rxrpc_transmit(call, (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ?
+	trace_rxrpc_transmit(call, (rot_last ?
 				    rxrpc_transmit_rotate_last :
 				    rxrpc_transmit_rotate));
 	wake_up(&call->waitq);
@@ -262,6 +265,8 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
 		skb->next = NULL;
 		rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
 	}
+
+	return rot_last;
 }
 
 /*
@@ -273,23 +278,26 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
 static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
 			       const char *abort_why)
 {
+	unsigned int state;
 
 	ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));
 
 	write_lock(&call->state_lock);
 
-	switch (call->state) {
+	state = call->state;
+	switch (state) {
 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
 	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 		if (reply_begun)
-			call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
+			call->state = state = RXRPC_CALL_CLIENT_RECV_REPLY;
 		else
-			call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
+			call->state = state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
 		break;
 
 	case RXRPC_CALL_SERVER_AWAIT_ACK:
 		__rxrpc_call_completed(call);
 		rxrpc_notify_socket(call);
+		state = call->state;
 		break;
 
 	default:
@@ -297,11 +305,10 @@ static bool rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
 	}
 
 	write_unlock(&call->state_lock);
-	if (call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY) {
+	if (state == RXRPC_CALL_CLIENT_AWAIT_REPLY)
 		trace_rxrpc_transmit(call, rxrpc_transmit_await_reply);
-	} else {
+	else
 		trace_rxrpc_transmit(call, rxrpc_transmit_end);
-	}
 	_leave(" = ok");
 	return true;
 
@@ -332,11 +339,11 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
 		trace_rxrpc_timer(call, rxrpc_timer_init_for_reply, now);
 	}
 
-	if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags))
-		rxrpc_rotate_tx_window(call, top, &summary);
 	if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
-		rxrpc_proto_abort("TXL", call, top);
-		return false;
+		if (!rxrpc_rotate_tx_window(call, top, &summary)) {
+			rxrpc_proto_abort("TXL", call, top);
+			return false;
+		}
 	}
 	if (!rxrpc_end_tx_phase(call, true, "ETD"))
 		return false;
@@ -452,13 +459,15 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb,
 		}
 	}
 
+	spin_lock(&call->input_lock);
+
 	/* Received data implicitly ACKs all of the request packets we sent
 	 * when we're acting as a client.
 	 */
 	if ((state == RXRPC_CALL_CLIENT_SEND_REQUEST ||
 	     state == RXRPC_CALL_CLIENT_AWAIT_REPLY) &&
 	    !rxrpc_receiving_reply(call))
-		return;
+		goto unlock;
 
 	call->ackr_prev_seq = seq;
 
@@ -488,12 +497,16 @@ next_subpacket:
 
 	if (flags & RXRPC_LAST_PACKET) {
 		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
-		    seq != call->rx_top)
-			return rxrpc_proto_abort("LSN", call, seq);
+		    seq != call->rx_top) {
+			rxrpc_proto_abort("LSN", call, seq);
+			goto unlock;
+		}
 	} else {
 		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
-		    after_eq(seq, call->rx_top))
-			return rxrpc_proto_abort("LSA", call, seq);
+		    after_eq(seq, call->rx_top)) {
+			rxrpc_proto_abort("LSA", call, seq);
+			goto unlock;
+		}
 	}
 
 	trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
@@ -560,8 +573,10 @@ next_subpacket:
 skip:
 	offset += len;
 	if (flags & RXRPC_JUMBO_PACKET) {
-		if (skb_copy_bits(skb, offset, &flags, 1) < 0)
-			return rxrpc_proto_abort("XJF", call, seq);
+		if (skb_copy_bits(skb, offset, &flags, 1) < 0) {
+			rxrpc_proto_abort("XJF", call, seq);
+			goto unlock;
+		}
 		offset += sizeof(struct rxrpc_jumbo_header);
 		seq++;
 		serial++;
@@ -601,6 +616,9 @@ ack:
 		trace_rxrpc_notify_socket(call->debug_id, serial);
 		rxrpc_notify_socket(call);
 	}
+
+unlock:
+	spin_unlock(&call->input_lock);
 	_leave(" [queued]");
 }
 
@@ -687,15 +705,14 @@ static void rxrpc_input_ping_response(struct rxrpc_call *call,
 
 	ping_time = call->ping_time;
 	smp_rmb();
-	ping_serial = call->ping_serial;
+	ping_serial = READ_ONCE(call->ping_serial);
 
 	if (orig_serial == call->acks_lost_ping)
 		rxrpc_input_check_for_lost_ack(call);
 
-	if (!test_bit(RXRPC_CALL_PINGING, &call->flags) ||
-	    before(orig_serial, ping_serial))
+	if (before(orig_serial, ping_serial) ||
+	    !test_and_clear_bit(RXRPC_CALL_PINGING, &call->flags))
 		return;
-	clear_bit(RXRPC_CALL_PINGING, &call->flags);
 	if (after(orig_serial, ping_serial))
 		return;
 
@@ -861,15 +878,32 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
 				  rxrpc_propose_ack_respond_to_ack);
 	}
 
+	/* Discard any out-of-order or duplicate ACKs. */
+	if (before_eq(sp->hdr.serial, call->acks_latest))
+		return;
+
+	buf.info.rxMTU = 0;
 	ioffset = offset + nr_acks + 3;
-	if (skb->len >= ioffset + sizeof(buf.info)) {
-		if (skb_copy_bits(skb, ioffset, &buf.info, sizeof(buf.info)) < 0)
-			return rxrpc_proto_abort("XAI", call, 0);
+	if (skb->len >= ioffset + sizeof(buf.info) &&
+	    skb_copy_bits(skb, ioffset, &buf.info, sizeof(buf.info)) < 0)
+		return rxrpc_proto_abort("XAI", call, 0);
+
+	spin_lock(&call->input_lock);
+
+	/* Discard any out-of-order or duplicate ACKs. */
+	if (before_eq(sp->hdr.serial, call->acks_latest))
+		goto out;
+	call->acks_latest_ts = skb->tstamp;
+	call->acks_latest = sp->hdr.serial;
+
+	/* Parse rwind and mtu sizes if provided. */
+	if (buf.info.rxMTU)
 		rxrpc_input_ackinfo(call, skb, &buf.info);
-	}
 
-	if (first_soft_ack == 0)
-		return rxrpc_proto_abort("AK0", call, 0);
+	if (first_soft_ack == 0) {
+		rxrpc_proto_abort("AK0", call, 0);
+		goto out;
+	}
 
 	/* Ignore ACKs unless we are or have just been transmitting. */
 	switch (READ_ONCE(call->state)) {
@@ -879,39 +913,35 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
 	case RXRPC_CALL_SERVER_AWAIT_ACK:
 		break;
 	default:
-		return;
-	}
-
-	/* Discard any out-of-order or duplicate ACKs. */
-	if (before_eq(sp->hdr.serial, call->acks_latest)) {
-		_debug("discard ACK %d <= %d",
-		       sp->hdr.serial, call->acks_latest);
-		return;
+		goto out;
 	}
-	call->acks_latest_ts = skb->tstamp;
-	call->acks_latest = sp->hdr.serial;
 
 	if (before(hard_ack, call->tx_hard_ack) ||
-	    after(hard_ack, call->tx_top))
-		return rxrpc_proto_abort("AKW", call, 0);
-	if (nr_acks > call->tx_top - hard_ack)
-		return rxrpc_proto_abort("AKN", call, 0);
+	    after(hard_ack, call->tx_top)) {
+		rxrpc_proto_abort("AKW", call, 0);
+		goto out;
+	}
+	if (nr_acks > call->tx_top - hard_ack) {
+		rxrpc_proto_abort("AKN", call, 0);
+		goto out;
+	}
 
-	if (after(hard_ack, call->tx_hard_ack))
-		rxrpc_rotate_tx_window(call, hard_ack, &summary);
+	if (after(hard_ack, call->tx_hard_ack)) {
+		if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
+			rxrpc_end_tx_phase(call, false, "ETA");
+			goto out;
+		}
+	}
 
 	if (nr_acks > 0) {
-		if (skb_copy_bits(skb, offset, buf.acks, nr_acks) < 0)
-			return rxrpc_proto_abort("XSA", call, 0);
+		if (skb_copy_bits(skb, offset, buf.acks, nr_acks) < 0) {
+			rxrpc_proto_abort("XSA", call, 0);
+			goto out;
+		}
 		rxrpc_input_soft_acks(call, buf.acks, first_soft_ack, nr_acks,
 				      &summary);
 	}
 
-	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
-		rxrpc_end_tx_phase(call, false, "ETA");
-		return;
-	}
-
 	if (call->rxtx_annotations[call->tx_top & RXRPC_RXTX_BUFF_MASK] &
 	    RXRPC_TX_ANNO_LAST &&
 	    summary.nr_acks == call->tx_top - hard_ack &&
@@ -920,7 +950,9 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
 				  false, true,
 				  rxrpc_propose_ack_ping_for_lost_reply);
 
-	return rxrpc_congestion_management(call, skb, &summary, acked_serial);
+	rxrpc_congestion_management(call, skb, &summary, acked_serial);
+out:
+	spin_unlock(&call->input_lock);
 }
 
 /*
@@ -933,9 +965,12 @@ static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
 
 	_proto("Rx ACKALL %%%u", sp->hdr.serial);
 
-	rxrpc_rotate_tx_window(call, call->tx_top, &summary);
-	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags))
+	spin_lock(&call->input_lock);
+
+	if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
 		rxrpc_end_tx_phase(call, false, "ETL");
+
+	spin_unlock(&call->input_lock);
 }
 
 /*
@@ -1018,18 +1053,19 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
 }
 
 /*
- * Handle a new call on a channel implicitly completing the preceding call on
- * that channel.
+ * Handle a new service call on a channel implicitly completing the preceding
+ * call on that channel.  This does not apply to client conns.
  *
  * TODO: If callNumber > call_id + 1, renegotiate security.
  */
-static void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
+static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
+					  struct rxrpc_connection *conn,
 					  struct rxrpc_call *call)
 {
 	switch (READ_ONCE(call->state)) {
 	case RXRPC_CALL_SERVER_AWAIT_ACK:
 		rxrpc_call_completed(call);
-		break;
+		/* Fall through */
 	case RXRPC_CALL_COMPLETE:
 		break;
 	default:
@@ -1037,11 +1073,13 @@ static void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
 			set_bit(RXRPC_CALL_EV_ABORT, &call->events);
 			rxrpc_queue_call(call);
 		}
+		trace_rxrpc_improper_term(call);
 		break;
 	}
 
-	trace_rxrpc_improper_term(call);
+	spin_lock(&rx->incoming_lock);
 	__rxrpc_disconnect_call(conn, call);
+	spin_unlock(&rx->incoming_lock);
 	rxrpc_notify_socket(call);
 }
 
@@ -1120,8 +1158,10 @@ int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb)
  * The socket is locked by the caller and this prevents the socket from being
  * shut down and the local endpoint from going away, thus sk_user_data will not
  * be cleared until this function returns.
+ *
+ * Called with the RCU read lock held from the IP layer via UDP.
  */
-void rxrpc_data_ready(struct sock *udp_sk)
+int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 {
 	struct rxrpc_connection *conn;
 	struct rxrpc_channel *chan;
@@ -1130,38 +1170,17 @@ void rxrpc_data_ready(struct sock *udp_sk)
 	struct rxrpc_local *local = udp_sk->sk_user_data;
 	struct rxrpc_peer *peer = NULL;
 	struct rxrpc_sock *rx = NULL;
-	struct sk_buff *skb;
 	unsigned int channel;
-	int ret, skew = 0;
+	int skew = 0;
 
 	_enter("%p", udp_sk);
 
-	ASSERT(!irqs_disabled());
-
-	skb = skb_recv_udp(udp_sk, 0, 1, &ret);
-	if (!skb) {
-		if (ret == -EAGAIN)
-			return;
-		_debug("UDP socket error %d", ret);
-		return;
-	}
-
 	if (skb->tstamp == 0)
 		skb->tstamp = ktime_get_real();
 
 	rxrpc_new_skb(skb, rxrpc_skb_rx_received);
 
-	_net("recv skb %p", skb);
-
-	/* we'll probably need to checksum it (didn't call sock_recvmsg) */
-	if (skb_checksum_complete(skb)) {
-		rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
-		__UDP_INC_STATS(&init_net, UDP_MIB_INERRORS, 0);
-		_leave(" [CSUM failed]");
-		return;
-	}
-
-	__UDP_INC_STATS(&init_net, UDP_MIB_INDATAGRAMS, 0);
+	skb_pull(skb, sizeof(struct udphdr));
 
 	/* The UDP protocol already released all skb resources;
 	 * we are free to add our own data there.
@@ -1176,11 +1195,13 @@ void rxrpc_data_ready(struct sock *udp_sk)
 		static int lose;
 		if ((lose++ & 7) == 7) {
 			trace_rxrpc_rx_lose(sp);
-			rxrpc_lose_skb(skb, rxrpc_skb_rx_lost);
-			return;
+			rxrpc_free_skb(skb, rxrpc_skb_rx_lost);
+			return 0;
 		}
 	}
 
+	if (skb->tstamp == 0)
+		skb->tstamp = ktime_get_real();
 	trace_rxrpc_rx_packet(sp);
 
 	switch (sp->hdr.type) {
@@ -1234,8 +1255,6 @@ void rxrpc_data_ready(struct sock *udp_sk)
 	if (sp->hdr.serviceId == 0)
 		goto bad_message;
 
-	rcu_read_lock();
-
 	if (rxrpc_to_server(sp)) {
 		/* Weed out packets to services we're not offering.  Packets
 		 * that would begin a call are explicitly rejected and the rest
@@ -1247,7 +1266,7 @@ void rxrpc_data_ready(struct sock *udp_sk)
 			if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
 			    sp->hdr.seq == 1)
 				goto unsupported_service;
-			goto discard_unlock;
+			goto discard;
 		}
 	}
 
@@ -1257,17 +1276,23 @@ void rxrpc_data_ready(struct sock *udp_sk)
 			goto wrong_security;
 
 		if (sp->hdr.serviceId != conn->service_id) {
-			if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags) ||
-			    conn->service_id != conn->params.service_id)
+			int old_id;
+
+			if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
+				goto reupgrade;
+			old_id = cmpxchg(&conn->service_id, conn->params.service_id,
+					 sp->hdr.serviceId);
+
+			if (old_id != conn->params.service_id &&
+			    old_id != sp->hdr.serviceId)
 				goto reupgrade;
-			conn->service_id = sp->hdr.serviceId;
 		}
 
 		if (sp->hdr.callNumber == 0) {
 			/* Connection-level packet */
 			_debug("CONN %p {%d}", conn, conn->debug_id);
 			rxrpc_post_packet_to_conn(conn, skb);
-			goto out_unlock;
+			goto out;
 		}
 
 		/* Note the serial number skew here */
@@ -1286,19 +1311,19 @@ void rxrpc_data_ready(struct sock *udp_sk)
 
 		/* Ignore really old calls */
 		if (sp->hdr.callNumber < chan->last_call)
-			goto discard_unlock;
+			goto discard;
 
 		if (sp->hdr.callNumber == chan->last_call) {
 			if (chan->call ||
 			    sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
-				goto discard_unlock;
+				goto discard;
 
 			/* For the previous service call, if completed
 			 * successfully, we discard all further packets.
 			 */
 			if (rxrpc_conn_is_service(conn) &&
 			    chan->last_type == RXRPC_PACKET_TYPE_ACK)
-				goto discard_unlock;
+				goto discard;
 
 			/* But otherwise we need to retransmit the final packet
 			 * from data cached in the connection record.
@@ -1309,18 +1334,16 @@ void rxrpc_data_ready(struct sock *udp_sk)
 						    sp->hdr.serial,
 						    sp->hdr.flags, 0);
 			rxrpc_post_packet_to_conn(conn, skb);
-			goto out_unlock;
+			goto out;
 		}
 
 		call = rcu_dereference(chan->call);
 
 		if (sp->hdr.callNumber > chan->call_id) {
-			if (rxrpc_to_client(sp)) {
-				rcu_read_unlock();
+			if (rxrpc_to_client(sp))
 				goto reject_packet;
-			}
 			if (call)
-				rxrpc_input_implicit_end_call(conn, call);
+				rxrpc_input_implicit_end_call(rx, conn, call);
 			call = NULL;
 		}
 
@@ -1337,55 +1360,42 @@ void rxrpc_data_ready(struct sock *udp_sk)
 	if (!call || atomic_read(&call->usage) == 0) {
 		if (rxrpc_to_client(sp) ||
 		    sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
-			goto bad_message_unlock;
+			goto bad_message;
 		if (sp->hdr.seq != 1)
-			goto discard_unlock;
-		call = rxrpc_new_incoming_call(local, rx, peer, conn, skb);
-		if (!call) {
-			rcu_read_unlock();
+			goto discard;
+		call = rxrpc_new_incoming_call(local, rx, skb);
+		if (!call)
 			goto reject_packet;
-		}
 		rxrpc_send_ping(call, skb, skew);
 		mutex_unlock(&call->user_mutex);
 	}
 
 	rxrpc_input_call_packet(call, skb, skew);
-	goto discard_unlock;
+	goto discard;
 
-discard_unlock:
-	rcu_read_unlock();
 discard:
 	rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 out:
 	trace_rxrpc_rx_done(0, 0);
-	return;
-
-out_unlock:
-	rcu_read_unlock();
-	goto out;
+	return 0;
 
 wrong_security:
-	rcu_read_unlock();
 	trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
 			  RXKADINCONSISTENCY, EBADMSG);
 	skb->priority = RXKADINCONSISTENCY;
 	goto post_abort;
 
 unsupported_service:
-	rcu_read_unlock();
 	trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
 			  RX_INVALID_OPERATION, EOPNOTSUPP);
 	skb->priority = RX_INVALID_OPERATION;
 	goto post_abort;
 
 reupgrade:
-	rcu_read_unlock();
 	trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
 			  RX_PROTOCOL_ERROR, EBADMSG);
 	goto protocol_error;
 
-bad_message_unlock:
-	rcu_read_unlock();
 bad_message:
 	trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
 			  RX_PROTOCOL_ERROR, EBADMSG);
@@ -1397,4 +1407,5 @@ reject_packet:
 	trace_rxrpc_rx_done(skb->mark, skb->priority);
 	rxrpc_reject_packet(local, skb);
 	_leave(" [badmsg]");
+	return 0;
 }
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 94d234e9c685..cad0691c2bb4 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -19,6 +19,7 @@
 #include <linux/ip.h>
 #include <linux/hashtable.h>
 #include <net/sock.h>
+#include <net/udp.h>
 #include <net/af_rxrpc.h>
 #include "ar-internal.h"
 
@@ -108,7 +109,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
  */
 static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 {
-	struct sock *sock;
+	struct sock *usk;
 	int ret, opt;
 
 	_enter("%p{%d,%d}",
@@ -122,6 +123,28 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 		return ret;
 	}
 
+	/* set the socket up */
+	usk = local->socket->sk;
+	inet_sk(usk)->mc_loop = 0;
+
+	/* Enable CHECKSUM_UNNECESSARY to CHECKSUM_COMPLETE conversion */
+	inet_inc_convert_csum(usk);
+
+	rcu_assign_sk_user_data(usk, local);
+
+	udp_sk(usk)->encap_type = UDP_ENCAP_RXRPC;
+	udp_sk(usk)->encap_rcv = rxrpc_input_packet;
+	udp_sk(usk)->encap_destroy = NULL;
+	udp_sk(usk)->gro_receive = NULL;
+	udp_sk(usk)->gro_complete = NULL;
+
+	udp_encap_enable();
+#if IS_ENABLED(CONFIG_IPV6)
+	if (local->srx.transport.family == AF_INET6)
+		udpv6_encap_enable();
+#endif
+	usk->sk_error_report = rxrpc_error_report;
+
 	/* if a local address was supplied then bind it */
 	if (local->srx.transport_len > sizeof(sa_family_t)) {
 		_debug("bind");
@@ -191,11 +214,6 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
 		BUG();
 	}
 
-	/* set the socket up */
-	sock = local->socket->sk;
-	sock->sk_user_data	= local;
-	sock->sk_data_ready	= rxrpc_data_ready;
-	sock->sk_error_report	= rxrpc_error_report;
 	_leave(" = 0");
 	return 0;
 
diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c
index f3e6fc670da2..05b51bdbdd41 100644
--- a/net/rxrpc/peer_event.c
+++ b/net/rxrpc/peer_event.c
@@ -301,6 +301,8 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
 	if (rtt < 0)
 		return;
 
+	spin_lock(&peer->rtt_input_lock);
+
 	/* Replace the oldest datum in the RTT buffer */
 	sum -= peer->rtt_cache[cursor];
 	sum += rtt;
@@ -312,6 +314,8 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
 		peer->rtt_usage = usage;
 	}
 
+	spin_unlock(&peer->rtt_input_lock);
+
 	/* Now recalculate the average */
 	if (usage == RXRPC_RTT_CACHE_SIZE) {
 		avg = sum / RXRPC_RTT_CACHE_SIZE;
@@ -320,6 +324,7 @@ void rxrpc_peer_add_rtt(struct rxrpc_call *call, enum rxrpc_rtt_rx_trace why,
 		do_div(avg, usage);
 	}
 
+	/* Don't need to update this under lock */
 	peer->rtt = avg;
 	trace_rxrpc_rtt_rx(call, why, send_serial, resp_serial, rtt,
 			   usage, avg);
diff --git a/net/rxrpc/peer_object.c b/net/rxrpc/peer_object.c
index 01a9febfa367..5691b7d266ca 100644
--- a/net/rxrpc/peer_object.c
+++ b/net/rxrpc/peer_object.c
@@ -153,8 +153,10 @@ struct rxrpc_peer *rxrpc_lookup_peer_rcu(struct rxrpc_local *local,
  * assess the MTU size for the network interface through which this peer is
  * reached
  */
-static void rxrpc_assess_MTU_size(struct rxrpc_peer *peer)
+static void rxrpc_assess_MTU_size(struct rxrpc_sock *rx,
+				  struct rxrpc_peer *peer)
 {
+	struct net *net = sock_net(&rx->sk);
 	struct dst_entry *dst;
 	struct rtable *rt;
 	struct flowi fl;
@@ -169,7 +171,7 @@ static void rxrpc_assess_MTU_size(struct rxrpc_peer *peer)
 	switch (peer->srx.transport.family) {
 	case AF_INET:
 		rt = ip_route_output_ports(
-			&init_net, fl4, NULL,
+			net, fl4, NULL,
 			peer->srx.transport.sin.sin_addr.s_addr, 0,
 			htons(7000), htons(7001), IPPROTO_UDP, 0, 0);
 		if (IS_ERR(rt)) {
@@ -188,7 +190,7 @@ static void rxrpc_assess_MTU_size(struct rxrpc_peer *peer)
 		       sizeof(struct in6_addr));
 		fl6->fl6_dport = htons(7001);
 		fl6->fl6_sport = htons(7000);
-		dst = ip6_route_output(&init_net, NULL, fl6);
+		dst = ip6_route_output(net, NULL, fl6);
 		if (dst->error) {
 			_leave(" [route err %d]", dst->error);
 			return;
@@ -223,6 +225,7 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
 		peer->service_conns = RB_ROOT;
 		seqlock_init(&peer->service_conn_lock);
 		spin_lock_init(&peer->lock);
+		spin_lock_init(&peer->rtt_input_lock);
 		peer->debug_id = atomic_inc_return(&rxrpc_debug_id);
 
 		if (RXRPC_TX_SMSS > 2190)
@@ -240,10 +243,11 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
 /*
  * Initialise peer record.
  */
-static void rxrpc_init_peer(struct rxrpc_peer *peer, unsigned long hash_key)
+static void rxrpc_init_peer(struct rxrpc_sock *rx, struct rxrpc_peer *peer,
+			    unsigned long hash_key)
 {
 	peer->hash_key = hash_key;
-	rxrpc_assess_MTU_size(peer);
+	rxrpc_assess_MTU_size(rx, peer);
 	peer->mtu = peer->if_mtu;
 	peer->rtt_last_req = ktime_get_real();
 
@@ -275,7 +279,8 @@ static void rxrpc_init_peer(struct rxrpc_peer *peer, unsigned long hash_key)
 /*
  * Set up a new peer.
  */
-static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_local *local,
+static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_sock *rx,
+					    struct rxrpc_local *local,
 					    struct sockaddr_rxrpc *srx,
 					    unsigned long hash_key,
 					    gfp_t gfp)
@@ -287,7 +292,7 @@ static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_local *local,
 	peer = rxrpc_alloc_peer(local, gfp);
 	if (peer) {
 		memcpy(&peer->srx, srx, sizeof(*srx));
-		rxrpc_init_peer(peer, hash_key);
+		rxrpc_init_peer(rx, peer, hash_key);
 	}
 
 	_leave(" = %p", peer);
@@ -299,14 +304,15 @@ static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_local *local,
  * since we've already done a search in the list from the non-reentrant context
  * (the data_ready handler) that is the only place we can add new peers.
  */
-void rxrpc_new_incoming_peer(struct rxrpc_local *local, struct rxrpc_peer *peer)
+void rxrpc_new_incoming_peer(struct rxrpc_sock *rx, struct rxrpc_local *local,
+			     struct rxrpc_peer *peer)
 {
 	struct rxrpc_net *rxnet = local->rxnet;
 	unsigned long hash_key;
 
 	hash_key = rxrpc_peer_hash_key(local, &peer->srx);
 	peer->local = local;
-	rxrpc_init_peer(peer, hash_key);
+	rxrpc_init_peer(rx, peer, hash_key);
 
 	spin_lock(&rxnet->peer_hash_lock);
 	hash_add_rcu(rxnet->peer_hash, &peer->hash_link, hash_key);
@@ -317,7 +323,8 @@ void rxrpc_new_incoming_peer(struct rxrpc_local *local, struct rxrpc_peer *peer)
 /*
  * obtain a remote transport endpoint for the specified address
  */
-struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_local *local,
+struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_sock *rx,
+				     struct rxrpc_local *local,
 				     struct sockaddr_rxrpc *srx, gfp_t gfp)
 {
 	struct rxrpc_peer *peer, *candidate;
@@ -337,7 +344,7 @@ struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_local *local,
 		/* The peer is not yet present in hash - create a candidate
 		 * for a new record and then redo the search.
 		 */
-		candidate = rxrpc_create_peer(local, srx, hash_key, gfp);
+		candidate = rxrpc_create_peer(rx, local, srx, hash_key, gfp);
 		if (!candidate) {
 			_leave(" = NULL [nomem]");
 			return NULL;