summary refs log tree commit diff
path: root/fs
diff options
context:
space:
mode:
authorLinus Torvalds <torvalds@linux-foundation.org>2022-03-21 16:24:45 -0700
committerLinus Torvalds <torvalds@linux-foundation.org>2022-03-21 16:24:45 -0700
commitaf472a9efdf65cbb3398cb6478ec0e89fbc84109 (patch)
tree11ec956e35851d6b579cbab5c555f70598adc52c /fs
parent93e220a62da36f766b3188e76e234607e41488f9 (diff)
parent5e929367468c8f97cd1ffb0417316cecfebef94b (diff)
downloadlinux-af472a9efdf65cbb3398cb6478ec0e89fbc84109.tar.gz
Merge tag 'for-5.18/io_uring-2022-03-18' of git://git.kernel.dk/linux-block
Pull io_uring updates from Jens Axboe:

 - Fixes for current file position. Still doesn't have the f_pos_lock
   sorted, but it's a step in the right direction (Dylan)

 - Tracing updates (Dylan, Stefan)

 - Improvements to io-wq locking (Hao)

 - Improvements for provided buffers (me, Pavel)

 - Support for registered file descriptors (me, Xiaoguang)

 - Support for ring messages (me)

 - Poll improvements (me)

 - Fix for fixed buffers and non-iterator reads/writes (me)

 - Support for NAPI on sockets (Olivier)

 - Ring quiesce improvements (Usama)

 - Misc fixes (Olivier, Pavel)

* tag 'for-5.18/io_uring-2022-03-18' of git://git.kernel.dk/linux-block: (42 commits)
  io_uring: terminate manual loop iterator loop correctly for non-vecs
  io_uring: don't check unrelated req->open.how in accept request
  io_uring: manage provided buffers strictly ordered
  io_uring: fold evfd signalling under a slower path
  io_uring: thin down io_commit_cqring()
  io_uring: shuffle io_eventfd_signal() bits around
  io_uring: remove extra barrier for non-sqpoll iopoll
  io_uring: fix provided buffer return on failure for kiocb_done()
  io_uring: extend provided buf return to fails
  io_uring: refactor timeout cancellation cqe posting
  io_uring: normilise naming for fill_cqe*
  io_uring: cache poll/double-poll state with a request flag
  io_uring: cache req->apoll->events in req->cflags
  io_uring: move req->poll_refs into previous struct hole
  io_uring: make tracing format consistent
  io_uring: recycle apoll_poll entries
  io_uring: remove duplicated member check for io_msg_ring_prep()
  io_uring: allow submissions to continue on error
  io_uring: recycle provided buffers if request goes async
  io_uring: ensure reads re-import for selected buffers
  ...
Diffstat (limited to 'fs')
-rw-r--r--fs/io-wq.c114
-rw-r--r--fs/io_uring.c1251
2 files changed, 1024 insertions, 341 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index bb7f161bb19c..5b93fa67d346 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -76,6 +76,7 @@ struct io_wqe_acct {
 	unsigned max_workers;
 	int index;
 	atomic_t nr_running;
+	raw_spinlock_t lock;
 	struct io_wq_work_list work_list;
 	unsigned long flags;
 };
@@ -91,7 +92,7 @@ enum {
  */
 struct io_wqe {
 	raw_spinlock_t lock;
-	struct io_wqe_acct acct[2];
+	struct io_wqe_acct acct[IO_WQ_ACCT_NR];
 
 	int node;
 
@@ -224,12 +225,12 @@ static void io_worker_exit(struct io_worker *worker)
 	if (worker->flags & IO_WORKER_F_FREE)
 		hlist_nulls_del_rcu(&worker->nulls_node);
 	list_del_rcu(&worker->all_list);
-	preempt_disable();
+	raw_spin_unlock(&wqe->lock);
 	io_wqe_dec_running(worker);
 	worker->flags = 0;
+	preempt_disable();
 	current->flags &= ~PF_IO_WORKER;
 	preempt_enable();
-	raw_spin_unlock(&wqe->lock);
 
 	kfree_rcu(worker, rcu);
 	io_worker_ref_put(wqe->wq);
@@ -238,10 +239,15 @@ static void io_worker_exit(struct io_worker *worker)
 
 static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
 {
+	bool ret = false;
+
+	raw_spin_lock(&acct->lock);
 	if (!wq_list_empty(&acct->work_list) &&
 	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
-		return true;
-	return false;
+		ret = true;
+	raw_spin_unlock(&acct->lock);
+
+	return ret;
 }
 
 /*
@@ -385,7 +391,6 @@ fail:
 }
 
 static void io_wqe_dec_running(struct io_worker *worker)
-	__must_hold(wqe->lock)
 {
 	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
@@ -393,13 +398,14 @@ static void io_wqe_dec_running(struct io_worker *worker)
 	if (!(worker->flags & IO_WORKER_F_UP))
 		return;
 
-	if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
-		atomic_inc(&acct->nr_running);
-		atomic_inc(&wqe->wq->worker_refs);
-		raw_spin_unlock(&wqe->lock);
-		io_queue_worker_create(worker, acct, create_worker_cb);
-		raw_spin_lock(&wqe->lock);
-	}
+	if (!atomic_dec_and_test(&acct->nr_running))
+		return;
+	if (!io_acct_run_queue(acct))
+		return;
+
+	atomic_inc(&acct->nr_running);
+	atomic_inc(&wqe->wq->worker_refs);
+	io_queue_worker_create(worker, acct, create_worker_cb);
 }
 
 /*
@@ -407,11 +413,12 @@ static void io_wqe_dec_running(struct io_worker *worker)
  * it's currently on the freelist
  */
 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
-	__must_hold(wqe->lock)
 {
 	if (worker->flags & IO_WORKER_F_FREE) {
 		worker->flags &= ~IO_WORKER_F_FREE;
+		raw_spin_lock(&wqe->lock);
 		hlist_nulls_del_init_rcu(&worker->nulls_node);
+		raw_spin_unlock(&wqe->lock);
 	}
 }
 
@@ -456,7 +463,7 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
 
 static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 					   struct io_worker *worker)
-	__must_hold(wqe->lock)
+	__must_hold(acct->lock)
 {
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work, *tail;
@@ -498,9 +505,9 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
 		 * work being added and clearing the stalled bit.
 		 */
 		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
-		raw_spin_unlock(&wqe->lock);
+		raw_spin_unlock(&acct->lock);
 		unstalled = io_wait_on_hash(wqe, stall_hash);
-		raw_spin_lock(&wqe->lock);
+		raw_spin_lock(&acct->lock);
 		if (unstalled) {
 			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
 			if (wq_has_sleeper(&wqe->wq->hash->wait))
@@ -538,7 +545,6 @@ static void io_assign_current_work(struct io_worker *worker,
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
 
 static void io_worker_handle_work(struct io_worker *worker)
-	__releases(wqe->lock)
 {
 	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
 	struct io_wqe *wqe = worker->wqe;
@@ -555,7 +561,9 @@ static void io_worker_handle_work(struct io_worker *worker)
 		 * can't make progress, any work completion or insertion will
 		 * clear the stalled flag.
 		 */
+		raw_spin_lock(&acct->lock);
 		work = io_get_next_work(acct, worker);
+		raw_spin_unlock(&acct->lock);
 		if (work) {
 			__io_worker_busy(wqe, worker);
 
@@ -569,10 +577,9 @@ static void io_worker_handle_work(struct io_worker *worker)
 			raw_spin_lock(&worker->lock);
 			worker->next_work = work;
 			raw_spin_unlock(&worker->lock);
-		}
-		raw_spin_unlock(&wqe->lock);
-		if (!work)
+		} else {
 			break;
+		}
 		io_assign_current_work(worker, work);
 		__set_current_state(TASK_RUNNING);
 
@@ -608,8 +615,6 @@ static void io_worker_handle_work(struct io_worker *worker)
 					wake_up(&wq->hash->wait);
 			}
 		} while (work);
-
-		raw_spin_lock(&wqe->lock);
 	} while (1);
 }
 
@@ -633,12 +638,10 @@ static int io_wqe_worker(void *data)
 		long ret;
 
 		set_current_state(TASK_INTERRUPTIBLE);
-loop:
-		raw_spin_lock(&wqe->lock);
-		if (io_acct_run_queue(acct)) {
+		while (io_acct_run_queue(acct))
 			io_worker_handle_work(worker);
-			goto loop;
-		}
+
+		raw_spin_lock(&wqe->lock);
 		/* timed out, exit unless we're the last worker */
 		if (last_timeout && acct->nr_workers > 1) {
 			acct->nr_workers--;
@@ -662,10 +665,8 @@ loop:
 		last_timeout = !ret;
 	}
 
-	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
-		raw_spin_lock(&wqe->lock);
+	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
 		io_worker_handle_work(worker);
-	}
 
 	audit_free(current);
 	io_worker_exit(worker);
@@ -705,10 +706,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
 		return;
 
 	worker->flags &= ~IO_WORKER_F_RUNNING;
-
-	raw_spin_lock(&worker->wqe->lock);
 	io_wqe_dec_running(worker);
-	raw_spin_unlock(&worker->wqe->lock);
 }
 
 static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
@@ -778,10 +776,12 @@ static void create_worker_cont(struct callback_head *cb)
 				.cancel_all	= true,
 			};
 
+			raw_spin_unlock(&wqe->lock);
 			while (io_acct_cancel_pending_work(wqe, acct, &match))
-				raw_spin_lock(&wqe->lock);
+				;
+		} else {
+			raw_spin_unlock(&wqe->lock);
 		}
-		raw_spin_unlock(&wqe->lock);
 		io_worker_ref_put(wqe->wq);
 		kfree(worker);
 		return;
@@ -914,6 +914,7 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
 	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
+	struct io_cb_cancel_data match;
 	unsigned work_flags = work->flags;
 	bool do_create;
 
@@ -927,10 +928,12 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 		return;
 	}
 
-	raw_spin_lock(&wqe->lock);
+	raw_spin_lock(&acct->lock);
 	io_wqe_insert_work(wqe, work);
 	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+	raw_spin_unlock(&acct->lock);
 
+	raw_spin_lock(&wqe->lock);
 	rcu_read_lock();
 	do_create = !io_wqe_activate_free_worker(wqe, acct);
 	rcu_read_unlock();
@@ -946,18 +949,18 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 			return;
 
 		raw_spin_lock(&wqe->lock);
-		/* fatal condition, failed to create the first worker */
-		if (!acct->nr_workers) {
-			struct io_cb_cancel_data match = {
-				.fn		= io_wq_work_match_item,
-				.data		= work,
-				.cancel_all	= false,
-			};
-
-			if (io_acct_cancel_pending_work(wqe, acct, &match))
-				raw_spin_lock(&wqe->lock);
+		if (acct->nr_workers) {
+			raw_spin_unlock(&wqe->lock);
+			return;
 		}
 		raw_spin_unlock(&wqe->lock);
+
+		/* fatal condition, failed to create the first worker */
+		match.fn		= io_wq_work_match_item,
+		match.data		= work,
+		match.cancel_all	= false,
+
+		io_acct_cancel_pending_work(wqe, acct, &match);
 	}
 }
 
@@ -1032,22 +1035,23 @@ static inline void io_wqe_remove_pending(struct io_wqe *wqe,
 static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
 					struct io_wqe_acct *acct,
 					struct io_cb_cancel_data *match)
-	__releases(wqe->lock)
 {
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work;
 
+	raw_spin_lock(&acct->lock);
 	wq_list_for_each(node, prev, &acct->work_list) {
 		work = container_of(node, struct io_wq_work, list);
 		if (!match->fn(work, match->data))
 			continue;
 		io_wqe_remove_pending(wqe, work, prev);
-		raw_spin_unlock(&wqe->lock);
+		raw_spin_unlock(&acct->lock);
 		io_run_cancel(work, wqe);
 		match->nr_pending++;
 		/* not safe to continue after unlock */
 		return true;
 	}
+	raw_spin_unlock(&acct->lock);
 
 	return false;
 }
@@ -1061,7 +1065,6 @@ retry:
 		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
 
 		if (io_acct_cancel_pending_work(wqe, acct, match)) {
-			raw_spin_lock(&wqe->lock);
 			if (match->cancel_all)
 				goto retry;
 			break;
@@ -1103,13 +1106,11 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 	for_each_node(node) {
 		struct io_wqe *wqe = wq->wqes[node];
 
-		raw_spin_lock(&wqe->lock);
 		io_wqe_cancel_pending_work(wqe, &match);
-		if (match.nr_pending && !match.cancel_all) {
-			raw_spin_unlock(&wqe->lock);
+		if (match.nr_pending && !match.cancel_all)
 			return IO_WQ_CANCEL_OK;
-		}
 
+		raw_spin_lock(&wqe->lock);
 		io_wqe_cancel_running_work(wqe, &match);
 		raw_spin_unlock(&wqe->lock);
 		if (match.nr_running && !match.cancel_all)
@@ -1190,6 +1191,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 			acct->index = i;
 			atomic_set(&acct->nr_running, 0);
 			INIT_WQ_LIST(&acct->work_list);
+			raw_spin_lock_init(&acct->lock);
 		}
 		wqe->wq = wq;
 		raw_spin_lock_init(&wqe->lock);
@@ -1282,9 +1284,7 @@ static void io_wq_destroy(struct io_wq *wq)
 			.fn		= io_wq_work_match_all,
 			.cancel_all	= true,
 		};
-		raw_spin_lock(&wqe->lock);
 		io_wqe_cancel_pending_work(wqe, &match);
-		raw_spin_unlock(&wqe->lock);
 		free_cpumask_var(wqe->cpu_mask);
 		kfree(wqe);
 	}
@@ -1376,7 +1376,7 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
 	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
 	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);
 
-	for (i = 0; i < 2; i++) {
+	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
 		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
 			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
 	}
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 4715980e9015..5fa736344b67 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -63,6 +63,7 @@
 #include <net/sock.h>
 #include <net/af_unix.h>
 #include <net/scm.h>
+#include <net/busy_poll.h>
 #include <linux/anon_inodes.h>
 #include <linux/sched/mm.h>
 #include <linux/uaccess.h>
@@ -263,11 +264,18 @@ struct io_rsrc_data {
 	bool				quiesce;
 };
 
+struct io_buffer_list {
+	struct list_head list;
+	struct list_head buf_list;
+	__u16 bgid;
+};
+
 struct io_buffer {
 	struct list_head list;
 	__u64 addr;
 	__u32 len;
 	__u16 bid;
+	__u16 bgid;
 };
 
 struct io_restriction {
@@ -326,6 +334,14 @@ struct io_submit_state {
 	struct blk_plug		plug;
 };
 
+struct io_ev_fd {
+	struct eventfd_ctx	*cq_ev_fd;
+	unsigned int		eventfd_async: 1;
+	struct rcu_head		rcu;
+};
+
+#define IO_BUFFERS_HASH_BITS	5
+
 struct io_ring_ctx {
 	/* const or read-mostly hot data */
 	struct {
@@ -335,11 +351,11 @@ struct io_ring_ctx {
 		unsigned int		flags;
 		unsigned int		compat: 1;
 		unsigned int		drain_next: 1;
-		unsigned int		eventfd_async: 1;
 		unsigned int		restricted: 1;
 		unsigned int		off_timeout_used: 1;
 		unsigned int		drain_active: 1;
 		unsigned int		drain_disabled: 1;
+		unsigned int		has_evfd: 1;
 	} ____cacheline_aligned_in_smp;
 
 	/* submission data */
@@ -378,7 +394,9 @@ struct io_ring_ctx {
 		struct list_head	timeout_list;
 		struct list_head	ltimeout_list;
 		struct list_head	cq_overflow_list;
-		struct xarray		io_buffers;
+		struct list_head	*io_buffers;
+		struct list_head	io_buffers_cache;
+		struct list_head	apoll_cache;
 		struct xarray		personalities;
 		u32			pers_next;
 		unsigned		sq_thread_idle;
@@ -395,11 +413,16 @@ struct io_ring_ctx {
 	struct list_head	sqd_list;
 
 	unsigned long		check_cq_overflow;
+#ifdef CONFIG_NET_RX_BUSY_POLL
+	/* used to track busy poll napi_id */
+	struct list_head	napi_list;
+	spinlock_t		napi_lock;	/* napi_list lock */
+#endif
 
 	struct {
 		unsigned		cached_cq_tail;
 		unsigned		cq_entries;
-		struct eventfd_ctx	*cq_ev_fd;
+		struct io_ev_fd	__rcu	*io_ev_fd;
 		struct wait_queue_head	cq_wait;
 		unsigned		cq_extra;
 		atomic_t		cq_timeouts;
@@ -421,6 +444,8 @@ struct io_ring_ctx {
 		struct hlist_head	*cancel_hash;
 		unsigned		cancel_hash_bits;
 		bool			poll_multi_queue;
+
+		struct list_head	io_buffers_comp;
 	} ____cacheline_aligned_in_smp;
 
 	struct io_restriction		restrictions;
@@ -436,6 +461,8 @@ struct io_ring_ctx {
 		struct llist_head		rsrc_put_llist;
 		struct list_head		rsrc_ref_list;
 		spinlock_t			rsrc_ref_lock;
+
+		struct list_head	io_buffers_pages;
 	};
 
 	/* Keep this last, we don't need it for the fast path */
@@ -461,6 +488,11 @@ struct io_ring_ctx {
 	};
 };
 
+/*
+ * Arbitrary limit, can be raised if need be
+ */
+#define IO_RINGFD_REG_MAX 16
+
 struct io_uring_task {
 	/* submission side */
 	int			cached_refs;
@@ -476,6 +508,7 @@ struct io_uring_task {
 	struct io_wq_work_list	task_list;
 	struct io_wq_work_list	prior_task_list;
 	struct callback_head	task_work;
+	struct file		**registered_rings;
 	bool			task_running;
 };
 
@@ -690,6 +723,12 @@ struct io_hardlink {
 	int				flags;
 };
 
+struct io_msg {
+	struct file			*file;
+	u64 user_data;
+	u32 len;
+};
+
 struct io_async_connect {
 	struct sockaddr_storage		address;
 };
@@ -741,6 +780,8 @@ enum {
 	REQ_F_ARM_LTIMEOUT_BIT,
 	REQ_F_ASYNC_DATA_BIT,
 	REQ_F_SKIP_LINK_CQES_BIT,
+	REQ_F_SINGLE_POLL_BIT,
+	REQ_F_DOUBLE_POLL_BIT,
 	/* keep async read/write and isreg together and in order */
 	REQ_F_SUPPORT_NOWAIT_BIT,
 	REQ_F_ISREG_BIT,
@@ -799,6 +840,10 @@ enum {
 	REQ_F_ASYNC_DATA	= BIT(REQ_F_ASYNC_DATA_BIT),
 	/* don't post CQEs while failing linked requests */
 	REQ_F_SKIP_LINK_CQES	= BIT(REQ_F_SKIP_LINK_CQES_BIT),
+	/* single poll may be active */
+	REQ_F_SINGLE_POLL	= BIT(REQ_F_SINGLE_POLL_BIT),
+	/* double poll may active */
+	REQ_F_DOUBLE_POLL	= BIT(REQ_F_DOUBLE_POLL_BIT),
 };
 
 struct async_poll {
@@ -825,7 +870,7 @@ enum {
  * NOTE! Each of the iocb union members has the file pointer
  * as the first entry in their struct definition. So you can
  * access the file pointer through any of the sub-structs,
- * or directly as just 'ki_filp' in this struct.
+ * or directly as just 'file' in this struct.
  */
 struct io_kiocb {
 	union {
@@ -855,6 +900,7 @@ struct io_kiocb {
 		struct io_mkdir		mkdir;
 		struct io_symlink	symlink;
 		struct io_hardlink	hardlink;
+		struct io_msg		msg;
 	};
 
 	u8				opcode;
@@ -877,6 +923,7 @@ struct io_kiocb {
 	/* used by request caches, completion batching and iopoll */
 	struct io_wq_work_node		comp_list;
 	atomic_t			refs;
+	atomic_t			poll_refs;
 	struct io_kiocb			*link;
 	struct io_task_work		io_task_work;
 	/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
@@ -885,12 +932,11 @@ struct io_kiocb {
 	struct async_poll		*apoll;
 	/* opcode allocated if it needs to store data for async defer */
 	void				*async_data;
-	struct io_wq_work		work;
 	/* custom credentials, valid IFF REQ_F_CREDS is set */
-	const struct cred		*creds;
 	/* stores selected buf, valid IFF REQ_F_BUFFER_SELECTED is set */
 	struct io_buffer		*kbuf;
-	atomic_t			poll_refs;
+	const struct cred		*creds;
+	struct io_wq_work		work;
 };
 
 struct io_tctx_node {
@@ -1105,6 +1151,9 @@ static const struct io_op_def io_op_defs[] = {
 	[IORING_OP_MKDIRAT] = {},
 	[IORING_OP_SYMLINKAT] = {},
 	[IORING_OP_LINKAT] = {},
+	[IORING_OP_MSG_RING] = {
+		.needs_file		= 1,
+	},
 };
 
 /* requests with any of those set should undergo io_disarm_next() */
@@ -1141,6 +1190,7 @@ static int io_install_fixed_file(struct io_kiocb *req, struct file *file,
 static int io_close_fixed(struct io_kiocb *req, unsigned int issue_flags);
 
 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer);
+static void io_eventfd_signal(struct io_ring_ctx *ctx);
 
 static struct kmem_cache *req_cachep;
 
@@ -1267,36 +1317,88 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req,
 	}
 }
 
-static unsigned int __io_put_kbuf(struct io_kiocb *req)
+static unsigned int __io_put_kbuf(struct io_kiocb *req, struct list_head *list)
 {
 	struct io_buffer *kbuf = req->kbuf;
 	unsigned int cflags;
 
-	cflags = kbuf->bid << IORING_CQE_BUFFER_SHIFT;
-	cflags |= IORING_CQE_F_BUFFER;
+	cflags = IORING_CQE_F_BUFFER | (kbuf->bid << IORING_CQE_BUFFER_SHIFT);
 	req->flags &= ~REQ_F_BUFFER_SELECTED;
-	kfree(kbuf);
+	list_add(&kbuf->list, list);
 	req->kbuf = NULL;
 	return cflags;
 }
 
-static inline unsigned int io_put_kbuf(struct io_kiocb *req)
+static inline unsigned int io_put_kbuf_comp(struct io_kiocb *req)
+{
+	if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
+		return 0;
+	return __io_put_kbuf(req, &req->ctx->io_buffers_comp);
+}
+
+static inline unsigned int io_put_kbuf(struct io_kiocb *req,
+				       unsigned issue_flags)
 {
+	unsigned int cflags;
+
 	if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
 		return 0;
-	return __io_put_kbuf(req);
+
+	/*
+	 * We can add this buffer back to two lists:
+	 *
+	 * 1) The io_buffers_cache list. This one is protected by the
+	 *    ctx->uring_lock. If we already hold this lock, add back to this
+	 *    list as we can grab it from issue as well.
+	 * 2) The io_buffers_comp list. This one is protected by the
+	 *    ctx->completion_lock.
+	 *
+	 * We migrate buffers from the comp_list to the issue cache list
+	 * when we need one.
+	 */
+	if (issue_flags & IO_URING_F_UNLOCKED) {
+		struct io_ring_ctx *ctx = req->ctx;
+
+		spin_lock(&ctx->completion_lock);
+		cflags = __io_put_kbuf(req, &ctx->io_buffers_comp);
+		spin_unlock(&ctx->completion_lock);
+	} else {
+		cflags = __io_put_kbuf(req, &req->ctx->io_buffers_cache);
+	}
+
+	return cflags;
 }
 
-static void io_refs_resurrect(struct percpu_ref *ref, struct completion *compl)
+static struct io_buffer_list *io_buffer_get_list(struct io_ring_ctx *ctx,
+						 unsigned int bgid)
 {
-	bool got = percpu_ref_tryget(ref);
+	struct list_head *hash_list;
+	struct io_buffer_list *bl;
 
-	/* already at zero, wait for ->release() */
-	if (!got)
-		wait_for_completion(compl);
-	percpu_ref_resurrect(ref);
-	if (got)
-		percpu_ref_put(ref);
+	hash_list = &ctx->io_buffers[hash_32(bgid, IO_BUFFERS_HASH_BITS)];
+	list_for_each_entry(bl, hash_list, list)
+		if (bl->bgid == bgid || bgid == -1U)
+			return bl;
+
+	return NULL;
+}
+
+static void io_kbuf_recycle(struct io_kiocb *req)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+	struct io_buffer_list *bl;
+	struct io_buffer *buf;
+
+	if (likely(!(req->flags & REQ_F_BUFFER_SELECTED)))
+		return;
+
+	lockdep_assert_held(&ctx->uring_lock);
+
+	buf = req->kbuf;
+	bl = io_buffer_get_list(ctx, buf->bgid);
+	list_add(&buf->list, &bl->buf_list);
+	req->flags &= ~REQ_F_BUFFER_SELECTED;
+	req->kbuf = NULL;
 }
 
 static bool io_match_task(struct io_kiocb *head, struct task_struct *task,
@@ -1409,7 +1511,7 @@ static __cold void io_fallback_req_func(struct work_struct *work)
 static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 {
 	struct io_ring_ctx *ctx;
-	int hash_bits;
+	int i, hash_bits;
 
 	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
 	if (!ctx)
@@ -1436,6 +1538,13 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	/* set invalid range, so io_import_fixed() fails meeting it */
 	ctx->dummy_ubuf->ubuf = -1UL;
 
+	ctx->io_buffers = kcalloc(1U << IO_BUFFERS_HASH_BITS,
+					sizeof(struct list_head), GFP_KERNEL);
+	if (!ctx->io_buffers)
+		goto err;
+	for (i = 0; i < (1U << IO_BUFFERS_HASH_BITS); i++)
+		INIT_LIST_HEAD(&ctx->io_buffers[i]);
+
 	if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
 			    PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
 		goto err;
@@ -1444,14 +1553,17 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	init_waitqueue_head(&ctx->sqo_sq_wait);
 	INIT_LIST_HEAD(&ctx->sqd_list);
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
+	INIT_LIST_HEAD(&ctx->io_buffers_cache);
+	INIT_LIST_HEAD(&ctx->apoll_cache);
 	init_completion(&ctx->ref_comp);
-	xa_init_flags(&ctx->io_buffers, XA_FLAGS_ALLOC1);
 	xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
 	mutex_init(&ctx->uring_lock);
 	init_waitqueue_head(&ctx->cq_wait);
 	spin_lock_init(&ctx->completion_lock);
 	spin_lock_init(&ctx->timeout_lock);
 	INIT_WQ_LIST(&ctx->iopoll_list);
+	INIT_LIST_HEAD(&ctx->io_buffers_pages);
+	INIT_LIST_HEAD(&ctx->io_buffers_comp);
 	INIT_LIST_HEAD(&ctx->defer_list);
 	INIT_LIST_HEAD(&ctx->timeout_list);
 	INIT_LIST_HEAD(&ctx->ltimeout_list);
@@ -1464,10 +1576,15 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_WQ_LIST(&ctx->locked_free_list);
 	INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
 	INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
+#ifdef CONFIG_NET_RX_BUSY_POLL
+	INIT_LIST_HEAD(&ctx->napi_list);
+	spin_lock_init(&ctx->napi_lock);
+#endif
 	return ctx;
 err:
 	kfree(ctx->dummy_ubuf);
 	kfree(ctx->cancel_hash);
+	kfree(ctx->io_buffers);
 	kfree(ctx);
 	return NULL;
 }
@@ -1610,8 +1727,8 @@ static void io_queue_async_work(struct io_kiocb *req, bool *dont_use)
 	if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
 		req->work.flags |= IO_WQ_WORK_CANCEL;
 
-	trace_io_uring_queue_async_work(ctx, io_wq_is_hashed(&req->work), req,
-					&req->work, req->flags);
+	trace_io_uring_queue_async_work(ctx, req, req->user_data, req->opcode, req->flags,
+					&req->work, io_wq_is_hashed(&req->work));
 	io_wq_enqueue(tctx->io_wq, &req->work);
 	if (link)
 		io_queue_linked_timeout(link);
@@ -1681,22 +1798,27 @@ static __cold void io_flush_timeouts(struct io_ring_ctx *ctx)
 	spin_unlock_irq(&ctx->timeout_lock);
 }
 
-static __cold void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
-{
-	if (ctx->off_timeout_used)
-		io_flush_timeouts(ctx);
-	if (ctx->drain_active)
-		io_queue_deferred(ctx);
-}
-
 static inline void io_commit_cqring(struct io_ring_ctx *ctx)
 {
-	if (unlikely(ctx->off_timeout_used || ctx->drain_active))
-		__io_commit_cqring_flush(ctx);
 	/* order cqe stores with ring update */
 	smp_store_release(&ctx->rings->cq.tail, ctx->cached_cq_tail);
 }
 
+static void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
+{
+	if (ctx->off_timeout_used || ctx->drain_active) {
+		spin_lock(&ctx->completion_lock);
+		if (ctx->off_timeout_used)
+			io_flush_timeouts(ctx);
+		if (ctx->drain_active)
+			io_queue_deferred(ctx);
+		io_commit_cqring(ctx);
+		spin_unlock(&ctx->completion_lock);
+	}
+	if (ctx->has_evfd)
+		io_eventfd_signal(ctx);
+}
+
 static inline bool io_sqring_full(struct io_ring_ctx *ctx)
 {
 	struct io_rings *r = ctx->rings;
@@ -1726,23 +1848,34 @@ static inline struct io_uring_cqe *io_get_cqe(struct io_ring_ctx *ctx)
 	return &rings->cqes[tail & mask];
 }
 
-static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx)
+static void io_eventfd_signal(struct io_ring_ctx *ctx)
 {
-	if (likely(!ctx->cq_ev_fd))
-		return false;
+	struct io_ev_fd *ev_fd;
+
+	rcu_read_lock();
+	/*
+	 * rcu_dereference ctx->io_ev_fd once and use it for both for checking
+	 * and eventfd_signal
+	 */
+	ev_fd = rcu_dereference(ctx->io_ev_fd);
+
+	/*
+	 * Check again if ev_fd exists incase an io_eventfd_unregister call
+	 * completed between the NULL check of ctx->io_ev_fd at the start of
+	 * the function and rcu_read_lock.
+	 */
+	if (unlikely(!ev_fd))
+		goto out;
 	if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
-		return false;
-	return !ctx->eventfd_async || io_wq_current_is_worker();
+		goto out;
+
+	if (!ev_fd->eventfd_async || io_wq_current_is_worker())
+		eventfd_signal(ev_fd->cq_ev_fd, 1);
+out:
+	rcu_read_unlock();
 }
 
-/*
- * This should only get called when at least one event has been posted.
- * Some applications rely on the eventfd notification count only changing
- * IFF a new CQE has been added to the CQ ring. There's no depedency on
- * 1:1 relationship between how many times this function is called (and
- * hence the eventfd count) and number of CQEs posted to the CQ ring.
- */
-static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
+static inline void io_cqring_wake(struct io_ring_ctx *ctx)
 {
 	/*
 	 * wake_up_all() may seem excessive, but io_wake_function() and
@@ -1751,21 +1884,32 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 	 */
 	if (wq_has_sleeper(&ctx->cq_wait))
 		wake_up_all(&ctx->cq_wait);
-	if (io_should_trigger_evfd(ctx))
-		eventfd_signal(ctx->cq_ev_fd, 1);
+}
+
+/*
+ * This should only get called when at least one event has been posted.
+ * Some applications rely on the eventfd notification count only changing
+ * IFF a new CQE has been added to the CQ ring. There's no depedency on
+ * 1:1 relationship between how many times this function is called (and
+ * hence the eventfd count) and number of CQEs posted to the CQ ring.
+ */
+static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx)
+{
+	if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
+		     ctx->has_evfd))
+		__io_commit_cqring_flush(ctx);
+
+	io_cqring_wake(ctx);
 }
 
 static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
 {
-	/* see waitqueue_active() comment */
-	smp_mb();
+	if (unlikely(ctx->off_timeout_used || ctx->drain_active ||
+		     ctx->has_evfd))
+		__io_commit_cqring_flush(ctx);
 
-	if (ctx->flags & IORING_SETUP_SQPOLL) {
-		if (waitqueue_active(&ctx->cq_wait))
-			wake_up_all(&ctx->cq_wait);
-	}
-	if (io_should_trigger_evfd(ctx))
-		eventfd_signal(ctx->cq_ev_fd, 1);
+	if (ctx->flags & IORING_SETUP_SQPOLL)
+		io_cqring_wake(ctx);
 }
 
 /* Returns true if there are no backlogged entries after the flush */
@@ -1905,8 +2049,6 @@ static inline bool __io_fill_cqe(struct io_ring_ctx *ctx, u64 user_data,
 {
 	struct io_uring_cqe *cqe;
 
-	trace_io_uring_complete(ctx, user_data, res, cflags);
-
 	/*
 	 * If we can't get a cq entry, userspace overflowed the
 	 * submission (by quite a lot). Increment the overflow count in
@@ -1922,16 +2064,23 @@ static inline bool __io_fill_cqe(struct io_ring_ctx *ctx, u64 user_data,
 	return io_cqring_event_overflow(ctx, user_data, res, cflags);
 }
 
+static inline bool __io_fill_cqe_req(struct io_kiocb *req, s32 res, u32 cflags)
+{
+	trace_io_uring_complete(req->ctx, req, req->user_data, res, cflags);
+	return __io_fill_cqe(req->ctx, req->user_data, res, cflags);
+}
+
 static noinline void io_fill_cqe_req(struct io_kiocb *req, s32 res, u32 cflags)
 {
 	if (!(req->flags & REQ_F_CQE_SKIP))
-		__io_fill_cqe(req->ctx, req->user_data, res, cflags);
+		__io_fill_cqe_req(req, res, cflags);
 }
 
 static noinline bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data,
 				     s32 res, u32 cflags)
 {
 	ctx->cq_extra++;
+	trace_io_uring_complete(ctx, NULL, user_data, res, cflags);
 	return __io_fill_cqe(ctx, user_data, res, cflags);
 }
 
@@ -1941,7 +2090,7 @@ static void __io_req_complete_post(struct io_kiocb *req, s32 res,
 	struct io_ring_ctx *ctx = req->ctx;
 
 	if (!(req->flags & REQ_F_CQE_SKIP))
-		__io_fill_cqe(ctx, req->user_data, res, cflags);
+		__io_fill_cqe_req(req, res, cflags);
 	/*
 	 * If we're the last reference to this request, add to our locked
 	 * free_list cache.
@@ -2000,7 +2149,7 @@ static inline void io_req_complete(struct io_kiocb *req, s32 res)
 static void io_req_complete_failed(struct io_kiocb *req, s32 res)
 {
 	req_set_fail(req);
-	io_req_complete_post(req, res, 0);
+	io_req_complete_post(req, res, io_put_kbuf(req, 0));
 }
 
 static void io_req_complete_fail_submit(struct io_kiocb *req)
@@ -2183,7 +2332,9 @@ static void io_fail_links(struct io_kiocb *req)
 		nxt = link->link;
 		link->link = NULL;
 
-		trace_io_uring_fail_link(req, link);
+		trace_io_uring_fail_link(req->ctx, req, req->user_data,
+					req->opcode, link);
+
 		if (!ignore_cqes) {
 			link->flags &= ~REQ_F_CQE_SKIP;
 			io_fill_cqe_req(link, res, 0);
@@ -2302,7 +2453,8 @@ static void handle_prev_tw_list(struct io_wq_work_node *node,
 		if (likely(*uring_locked))
 			req->io_task_work.func(req, uring_locked);
 		else
-			__io_req_complete_post(req, req->result, io_put_kbuf(req));
+			__io_req_complete_post(req, req->result,
+						io_put_kbuf_comp(req));
 		node = next;
 	} while (node);
 
@@ -2530,8 +2682,16 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
 						    comp_list);
 
 			if (!(req->flags & REQ_F_CQE_SKIP))
-				__io_fill_cqe(ctx, req->user_data, req->result,
-					      req->cflags);
+				__io_fill_cqe_req(req, req->result, req->cflags);
+			if ((req->flags & REQ_F_POLLED) && req->apoll) {
+				struct async_poll *apoll = req->apoll;
+
+				if (apoll->double_poll)
+					kfree(apoll->double_poll);
+				list_add(&apoll->poll.wait.entry,
+						&ctx->apoll_cache);
+				req->flags &= ~REQ_F_POLLED;
+			}
 		}
 
 		io_commit_cqring(ctx);
@@ -2653,7 +2813,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin)
 		if (unlikely(req->flags & REQ_F_CQE_SKIP))
 			continue;
 
-		__io_fill_cqe(ctx, req->user_data, req->result, io_put_kbuf(req));
+		__io_fill_cqe_req(req, req->result, io_put_kbuf(req, 0));
 		nr_events++;
 	}
 
@@ -2829,14 +2989,14 @@ static bool __io_complete_rw_common(struct io_kiocb *req, long res)
 
 static inline void io_req_task_complete(struct io_kiocb *req, bool *locked)
 {
-	unsigned int cflags = io_put_kbuf(req);
 	int res = req->result;
 
 	if (*locked) {
-		io_req_complete_state(req, res, cflags);
+		io_req_complete_state(req, res, io_put_kbuf(req, 0));
 		io_req_add_compl_list(req);
 	} else {
-		io_req_complete_post(req, res, cflags);
+		io_req_complete_post(req, res,
+					io_put_kbuf(req, IO_URING_F_UNLOCKED));
 	}
 }
 
@@ -2845,7 +3005,8 @@ static void __io_complete_rw(struct io_kiocb *req, long res,
 {
 	if (__io_complete_rw_common(req, res))
 		return;
-	__io_req_complete(req, issue_flags, req->result, io_put_kbuf(req));
+	__io_req_complete(req, issue_flags, req->result,
+				io_put_kbuf(req, issue_flags));
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res)
@@ -3000,14 +3161,6 @@ static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 		req->flags |= io_file_get_flags(file) << REQ_F_SUPPORT_NOWAIT_BIT;
 
 	kiocb->ki_pos = READ_ONCE(sqe->off);
-	if (kiocb->ki_pos == -1) {
-		if (!(file->f_mode & FMODE_STREAM)) {
-			req->flags |= REQ_F_CUR_POS;
-			kiocb->ki_pos = file->f_pos;
-		} else {
-			kiocb->ki_pos = 0;
-		}
-	}
 	kiocb->ki_flags = iocb_flags(file);
 	ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
 	if (unlikely(ret))
@@ -3074,6 +3227,24 @@ static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
 	}
 }
 
+static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
+{
+	struct kiocb *kiocb = &req->rw.kiocb;
+	bool is_stream = req->file->f_mode & FMODE_STREAM;
+
+	if (kiocb->ki_pos == -1) {
+		if (!is_stream) {
+			req->flags |= REQ_F_CUR_POS;
+			kiocb->ki_pos = req->file->f_pos;
+			return &kiocb->ki_pos;
+		} else {
+			kiocb->ki_pos = 0;
+			return NULL;
+		}
+	}
+	return is_stream ? NULL : &kiocb->ki_pos;
+}
+
 static void kiocb_done(struct io_kiocb *req, ssize_t ret,
 		       unsigned int issue_flags)
 {
@@ -3096,14 +3267,10 @@ static void kiocb_done(struct io_kiocb *req, ssize_t ret,
 
 	if (req->flags & REQ_F_REISSUE) {
 		req->flags &= ~REQ_F_REISSUE;
-		if (io_resubmit_prep(req)) {
+		if (io_resubmit_prep(req))
 			io_req_task_queue_reissue(req);
-		} else {
-			req_set_fail(req);
-			req->result = ret;
-			req->io_task_work.func = io_req_task_complete;
-			io_req_task_work_add(req, false);
-		}
+		else
+			io_req_task_queue_fail(req, ret);
 	}
 }
 
@@ -3201,30 +3368,36 @@ static void io_ring_submit_lock(struct io_ring_ctx *ctx, bool needs_lock)
 		mutex_lock(&ctx->uring_lock);
 }
 
+static void io_buffer_add_list(struct io_ring_ctx *ctx,
+			       struct io_buffer_list *bl, unsigned int bgid)
+{
+	struct list_head *list;
+
+	list = &ctx->io_buffers[hash_32(bgid, IO_BUFFERS_HASH_BITS)];
+	INIT_LIST_HEAD(&bl->buf_list);
+	bl->bgid = bgid;
+	list_add(&bl->list, list);
+}
+
 static struct io_buffer *io_buffer_select(struct io_kiocb *req, size_t *len,
 					  int bgid, unsigned int issue_flags)
 {
 	struct io_buffer *kbuf = req->kbuf;
-	struct io_buffer *head;
 	bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
+	struct io_ring_ctx *ctx = req->ctx;
+	struct io_buffer_list *bl;
 
 	if (req->flags & REQ_F_BUFFER_SELECTED)
 		return kbuf;
 
-	io_ring_submit_lock(req->ctx, needs_lock);
+	io_ring_submit_lock(ctx, needs_lock);
 
-	lockdep_assert_held(&req->ctx->uring_lock);
+	lockdep_assert_held(&ctx->uring_lock);
 
-	head = xa_load(&req->ctx->io_buffers, bgid);
-	if (head) {
-		if (!list_empty(&head->list)) {
-			kbuf = list_last_entry(&head->list, struct io_buffer,
-							list);
-			list_del(&kbuf->list);
-		} else {
-			kbuf = head;
-			xa_erase(&req->ctx->io_buffers, bgid);
-		}
+	bl = io_buffer_get_list(ctx, bgid);
+	if (bl && !list_empty(&bl->buf_list)) {
+		kbuf = list_first_entry(&bl->buf_list, struct io_buffer, list);
+		list_del(&kbuf->list);
 		if (*len > kbuf->len)
 			*len = kbuf->len;
 		req->flags |= REQ_F_BUFFER_SELECTED;
@@ -3400,6 +3573,7 @@ static ssize_t loop_rw_iter(int rw, struct io_kiocb *req, struct iov_iter *iter)
 	struct kiocb *kiocb = &req->rw.kiocb;
 	struct file *file = req->file;
 	ssize_t ret = 0;
+	loff_t *ppos;
 
 	/*
 	 * Don't support polled IO through this interface, and we can't
@@ -3412,6 +3586,8 @@ static ssize_t loop_rw_iter(int rw, struct io_kiocb *req, struct iov_iter *iter)
 	    !(kiocb->ki_filp->f_flags & O_NONBLOCK))
 		return -EAGAIN;
 
+	ppos = io_kiocb_ppos(kiocb);
+
 	while (iov_iter_count(iter)) {
 		struct iovec iovec;
 		ssize_t nr;
@@ -3425,10 +3601,10 @@ static ssize_t loop_rw_iter(int rw, struct io_kiocb *req, struct iov_iter *iter)
 
 		if (rw == READ) {
 			nr = file->f_op->read(file, iovec.iov_base,
-					      iovec.iov_len, io_kiocb_ppos(kiocb));
+					      iovec.iov_len, ppos);
 		} else {
 			nr = file->f_op->write(file, iovec.iov_base,
-					       iovec.iov_len, io_kiocb_ppos(kiocb));
+					       iovec.iov_len, ppos);
 		}
 
 		if (nr < 0) {
@@ -3436,13 +3612,15 @@ static ssize_t loop_rw_iter(int rw, struct io_kiocb *req, struct iov_iter *iter)
 				ret = nr;
 			break;
 		}
+		ret += nr;
 		if (!iov_iter_is_bvec(iter)) {
 			iov_iter_advance(iter, nr);
 		} else {
-			req->rw.len -= nr;
 			req->rw.addr += nr;
+			req->rw.len -= nr;
+			if (!req->rw.len)
+				break;
 		}
-		ret += nr;
 		if (nr != iovec.iov_len)
 			break;
 	}
@@ -3629,12 +3807,23 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
 	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
 	struct io_async_rw *rw;
 	ssize_t ret, ret2;
+	loff_t *ppos;
 
 	if (!req_has_async_data(req)) {
 		ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
 		if (unlikely(ret < 0))
 			return ret;
 	} else {
+		/*
+		 * Safe and required to re-import if we're using provided
+		 * buffers, as we dropped the selected one before retry.
+		 */
+		if (req->flags & REQ_F_BUFFER_SELECT) {
+			ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
+			if (unlikely(ret < 0))
+				return ret;
+		}
+
 		rw = req->async_data;
 		s = &rw->s;
 		/*
@@ -3659,7 +3848,9 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
 		kiocb->ki_flags &= ~IOCB_NOWAIT;
 	}
 
-	ret = rw_verify_area(READ, req->file, io_kiocb_ppos(kiocb), req->result);
+	ppos = io_kiocb_update_pos(req);
+
+	ret = rw_verify_area(READ, req->file, ppos, req->result);
 	if (unlikely(ret)) {
 		kfree(iovec);
 		return ret;
@@ -3669,6 +3860,9 @@ static int io_read(struct io_kiocb *req, unsigned int issue_flags)
 
 	if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) {
 		req->flags &= ~REQ_F_REISSUE;
+		/* if we can poll, just do that */
+		if (req->opcode == IORING_OP_READ && file_can_poll(req->file))
+			return -EAGAIN;
 		/* IOPOLL retry should happen for io-wq threads */
 		if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
 			goto done;
@@ -3758,6 +3952,7 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags)
 	struct kiocb *kiocb = &req->rw.kiocb;
 	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
 	ssize_t ret, ret2;
+	loff_t *ppos;
 
 	if (!req_has_async_data(req)) {
 		ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
@@ -3788,7 +3983,9 @@ static int io_write(struct io_kiocb *req, unsigned int issue_flags)
 		kiocb->ki_flags &= ~IOCB_NOWAIT;
 	}
 
-	ret = rw_verify_area(WRITE, req->file, io_kiocb_ppos(kiocb), req->result);
+	ppos = io_kiocb_update_pos(req);
+
+	ret = rw_verify_area(WRITE, req->file, ppos, req->result);
 	if (unlikely(ret))
 		goto out_free;
 
@@ -4235,6 +4432,45 @@ static int io_nop(struct io_kiocb *req, unsigned int issue_flags)
 	return 0;
 }
 
+static int io_msg_ring_prep(struct io_kiocb *req,
+			    const struct io_uring_sqe *sqe)
+{
+	if (unlikely(sqe->addr || sqe->ioprio || sqe->rw_flags ||
+		     sqe->splice_fd_in || sqe->buf_index || sqe->personality))
+		return -EINVAL;
+
+	if (req->file->f_op != &io_uring_fops)
+		return -EBADFD;
+
+	req->msg.user_data = READ_ONCE(sqe->off);
+	req->msg.len = READ_ONCE(sqe->len);
+	return 0;
+}
+
+static int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_ring_ctx *target_ctx;
+	struct io_msg *msg = &req->msg;
+	int ret = -EOVERFLOW;
+	bool filled;
+
+	target_ctx = req->file->private_data;
+
+	spin_lock(&target_ctx->completion_lock);
+	filled = io_fill_cqe_aux(target_ctx, msg->user_data, msg->len,
+					IORING_CQE_F_MSG);
+	io_commit_cqring(target_ctx);
+	spin_unlock(&target_ctx->completion_lock);
+
+	if (filled) {
+		io_cqring_ev_posted(target_ctx);
+		ret = 0;
+	}
+
+	__io_req_complete(req, issue_flags, ret, 0);
+	return 0;
+}
+
 static int io_fsync_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_ring_ctx *ctx = req->ctx;
@@ -4458,8 +4694,8 @@ static int io_remove_buffers_prep(struct io_kiocb *req,
 	return 0;
 }
 
-static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf,
-			       int bgid, unsigned nbufs)
+static int __io_remove_buffers(struct io_ring_ctx *ctx,
+			       struct io_buffer_list *bl, unsigned nbufs)
 {
 	unsigned i = 0;
 
@@ -4468,19 +4704,16 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx, struct io_buffer *buf,
 		return 0;
 
 	/* the head kbuf is the list itself */
-	while (!list_empty(&buf->list)) {
+	while (!list_empty(&bl->buf_list)) {
 		struct io_buffer *nxt;
 
-		nxt = list_first_entry(&buf->list, struct io_buffer, list);
+		nxt = list_first_entry(&bl->buf_list, struct io_buffer, list);
 		list_del(&nxt->list);
-		kfree(nxt);
 		if (++i == nbufs)
 			return i;
 		cond_resched();
 	}
 	i++;
-	kfree(buf);
-	xa_erase(&ctx->io_buffers, bgid);
 
 	return i;
 }
@@ -4489,7 +4722,7 @@ static int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_provide_buf *p = &req->pbuf;
 	struct io_ring_ctx *ctx = req->ctx;
-	struct io_buffer *head;
+	struct io_buffer_list *bl;
 	int ret = 0;
 	bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
 
@@ -4498,9 +4731,9 @@ static int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags)
 	lockdep_assert_held(&ctx->uring_lock);
 
 	ret = -ENOENT;
-	head = xa_load(&ctx->io_buffers, p->bgid);
-	if (head)
-		ret = __io_remove_buffers(ctx, head, p->bgid, p->nbufs);
+	bl = io_buffer_get_list(ctx, p->bgid);
+	if (bl)
+		ret = __io_remove_buffers(ctx, bl, p->nbufs);
 	if (ret < 0)
 		req_set_fail(req);
 
@@ -4545,39 +4778,80 @@ static int io_provide_buffers_prep(struct io_kiocb *req,
 	return 0;
 }
 
-static int io_add_buffers(struct io_provide_buf *pbuf, struct io_buffer **head)
+static int io_refill_buffer_cache(struct io_ring_ctx *ctx)
+{
+	struct io_buffer *buf;
+	struct page *page;
+	int bufs_in_page;
+
+	/*
+	 * Completions that don't happen inline (eg not under uring_lock) will
+	 * add to ->io_buffers_comp. If we don't have any free buffers, check
+	 * the completion list and splice those entries first.
+	 */
+	if (!list_empty_careful(&ctx->io_buffers_comp)) {
+		spin_lock(&ctx->completion_lock);
+		if (!list_empty(&ctx->io_buffers_comp)) {
+			list_splice_init(&ctx->io_buffers_comp,
+						&ctx->io_buffers_cache);
+			spin_unlock(&ctx->completion_lock);
+			return 0;
+		}
+		spin_unlock(&ctx->completion_lock);
+	}
+
+	/*
+	 * No free buffers and no completion entries either. Allocate a new
+	 * page worth of buffer entries and add those to our freelist.
+	 */
+	page = alloc_page(GFP_KERNEL_ACCOUNT);
+	if (!page)
+		return -ENOMEM;
+
+	list_add(&page->lru, &ctx->io_buffers_pages);
+
+	buf = page_address(page);
+	bufs_in_page = PAGE_SIZE / sizeof(*buf);
+	while (bufs_in_page) {
+		list_add_tail(&buf->list, &ctx->io_buffers_cache);
+		buf++;
+		bufs_in_page--;
+	}
+
+	return 0;
+}
+
+static int io_add_buffers(struct io_ring_ctx *ctx, struct io_provide_buf *pbuf,
+			  struct io_buffer_list *bl)
 {
 	struct io_buffer *buf;
 	u64 addr = pbuf->addr;
 	int i, bid = pbuf->bid;
 
 	for (i = 0; i < pbuf->nbufs; i++) {
-		buf = kmalloc(sizeof(*buf), GFP_KERNEL_ACCOUNT);
-		if (!buf)
+		if (list_empty(&ctx->io_buffers_cache) &&
+		    io_refill_buffer_cache(ctx))
 			break;
-
+		buf = list_first_entry(&ctx->io_buffers_cache, struct io_buffer,
+					list);
+		list_move_tail(&buf->list, &bl->buf_list);
 		buf->addr = addr;
 		buf->len = min_t(__u32, pbuf->len, MAX_RW_COUNT);
 		buf->bid = bid;
+		buf->bgid = pbuf->bgid;
 		addr += pbuf->len;
 		bid++;
-		if (!*head) {
-			INIT_LIST_HEAD(&buf->list);
-			*head = buf;
-		} else {
-			list_add_tail(&buf->list, &(*head)->list);
-		}
 		cond_resched();
 	}
 
-	return i ? i : -ENOMEM;
+	return i ? 0 : -ENOMEM;
 }
 
 static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
 {
 	struct io_provide_buf *p = &req->pbuf;
 	struct io_ring_ctx *ctx = req->ctx;
-	struct io_buffer *head, *list;
+	struct io_buffer_list *bl;
 	int ret = 0;
 	bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
 
@@ -4585,14 +4859,18 @@ static int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
 
 	lockdep_assert_held(&ctx->uring_lock);
 
-	list = head = xa_load(&ctx->io_buffers, p->bgid);
-
-	ret = io_add_buffers(p, &head);
-	if (ret >= 0 && !list) {
-		ret = xa_insert(&ctx->io_buffers, p->bgid, head, GFP_KERNEL);
-		if (ret < 0)
-			__io_remove_buffers(ctx, head, p->bgid, -1U);
+	bl = io_buffer_get_list(ctx, p->bgid);
+	if (unlikely(!bl)) {
+		bl = kmalloc(sizeof(*bl), GFP_KERNEL);
+		if (!bl) {
+			ret = -ENOMEM;
+			goto err;
+		}
+		io_buffer_add_list(ctx, bl, p->bgid);
 	}
+
+	ret = io_add_buffers(ctx, p, bl);
+err:
 	if (ret < 0)
 		req_set_fail(req);
 	/* complete before unlock, IOPOLL may need the lock */
@@ -5184,7 +5462,7 @@ static int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
 	if (kmsg->free_iov)
 		kfree(kmsg->free_iov);
 	req->flags &= ~REQ_F_NEED_CLEANUP;
-	__io_req_complete(req, issue_flags, ret, io_put_kbuf(req));
+	__io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags));
 	return 0;
 }
 
@@ -5239,7 +5517,8 @@ static int io_recv(struct io_kiocb *req, unsigned int issue_flags)
 out_free:
 		req_set_fail(req);
 	}
-	__io_req_complete(req, issue_flags, ret, io_put_kbuf(req));
+
+	__io_req_complete(req, issue_flags, ret, io_put_kbuf(req, issue_flags));
 	return 0;
 }
 
@@ -5258,8 +5537,7 @@ static int io_accept_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	accept->nofile = rlimit(RLIMIT_NOFILE);
 
 	accept->file_slot = READ_ONCE(sqe->file_index);
-	if (accept->file_slot && ((req->open.how.flags & O_CLOEXEC) ||
-				  (accept->flags & SOCK_CLOEXEC)))
+	if (accept->file_slot && (accept->flags & SOCK_CLOEXEC))
 		return -EINVAL;
 	if (accept->flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK))
 		return -EINVAL;
@@ -5399,6 +5677,108 @@ IO_NETOP_FN(send);
 IO_NETOP_FN(recv);
 #endif /* CONFIG_NET */
 
+#ifdef CONFIG_NET_RX_BUSY_POLL
+
+#define NAPI_TIMEOUT			(60 * SEC_CONVERSION)
+
+struct napi_entry {
+	struct list_head	list;
+	unsigned int		napi_id;
+	unsigned long		timeout;
+};
+
+/*
+ * Add busy poll NAPI ID from sk.
+ */
+static void io_add_napi(struct file *file, struct io_ring_ctx *ctx)
+{
+	unsigned int napi_id;
+	struct socket *sock;
+	struct sock *sk;
+	struct napi_entry *ne;
+
+	if (!net_busy_loop_on())
+		return;
+
+	sock = sock_from_file(file);
+	if (!sock)
+		return;
+
+	sk = sock->sk;
+	if (!sk)
+		return;
+
+	napi_id = READ_ONCE(sk->sk_napi_id);
+
+	/* Non-NAPI IDs can be rejected */
+	if (napi_id < MIN_NAPI_ID)
+		return;
+
+	spin_lock(&ctx->napi_lock);
+	list_for_each_entry(ne, &ctx->napi_list, list) {
+		if (ne->napi_id == napi_id) {
+			ne->timeout = jiffies + NAPI_TIMEOUT;
+			goto out;
+		}
+	}
+
+	ne = kmalloc(sizeof(*ne), GFP_NOWAIT);
+	if (!ne)
+		goto out;
+
+	ne->napi_id = napi_id;
+	ne->timeout = jiffies + NAPI_TIMEOUT;
+	list_add_tail(&ne->list, &ctx->napi_list);
+out:
+	spin_unlock(&ctx->napi_lock);
+}
+
+static inline void io_check_napi_entry_timeout(struct napi_entry *ne)
+{
+	if (time_after(jiffies, ne->timeout)) {
+		list_del(&ne->list);
+		kfree(ne);
+	}
+}
+
+/*
+ * Busy poll if globally on and supporting sockets found
+ */
+static bool io_napi_busy_loop(struct list_head *napi_list)
+{
+	struct napi_entry *ne, *n;
+
+	list_for_each_entry_safe(ne, n, napi_list, list) {
+		napi_busy_loop(ne->napi_id, NULL, NULL, true,
+			       BUSY_POLL_BUDGET);
+		io_check_napi_entry_timeout(ne);
+	}
+	return !list_empty(napi_list);
+}
+
+static void io_free_napi_list(struct io_ring_ctx *ctx)
+{
+	spin_lock(&ctx->napi_lock);
+	while (!list_empty(&ctx->napi_list)) {
+		struct napi_entry *ne =
+			list_first_entry(&ctx->napi_list, struct napi_entry,
+					 list);
+
+		list_del(&ne->list);
+		kfree(ne);
+	}
+	spin_unlock(&ctx->napi_lock);
+}
+#else
+static inline void io_add_napi(struct file *file, struct io_ring_ctx *ctx)
+{
+}
+
+static inline void io_free_napi_list(struct io_ring_ctx *ctx)
+{
+}
+#endif /* CONFIG_NET_RX_BUSY_POLL */
+
 struct io_poll_table {
 	struct poll_table_struct pt;
 	struct io_kiocb *req;
@@ -5474,8 +5854,12 @@ static inline void io_poll_remove_entry(struct io_poll_iocb *poll)
 
 static void io_poll_remove_entries(struct io_kiocb *req)
 {
-	struct io_poll_iocb *poll = io_poll_get_single(req);
-	struct io_poll_iocb *poll_double = io_poll_get_double(req);
+	/*
+	 * Nothing to do if neither of those flags are set. Avoid dipping
+	 * into the poll/apoll/double cachelines if we can.
+	 */
+	if (!(req->flags & (REQ_F_SINGLE_POLL | REQ_F_DOUBLE_POLL)))
+		return;
 
 	/*
 	 * While we hold the waitqueue lock and the waitqueue is nonempty,
@@ -5493,9 +5877,10 @@ static void io_poll_remove_entries(struct io_kiocb *req)
 	 * In that case, only RCU prevents the queue memory from being freed.
 	 */
 	rcu_read_lock();
-	io_poll_remove_entry(poll);
-	if (poll_double)
-		io_poll_remove_entry(poll_double);
+	if (req->flags & REQ_F_SINGLE_POLL)
+		io_poll_remove_entry(io_poll_get_single(req));
+	if (req->flags & REQ_F_DOUBLE_POLL)
+		io_poll_remove_entry(io_poll_get_double(req));
 	rcu_read_unlock();
 }
 
@@ -5527,13 +5912,13 @@ static int io_poll_check_events(struct io_kiocb *req)
 			return -ECANCELED;
 
 		if (!req->result) {
-			struct poll_table_struct pt = { ._key = poll->events };
+			struct poll_table_struct pt = { ._key = req->cflags };
 
-			req->result = vfs_poll(req->file, &pt) & poll->events;
+			req->result = vfs_poll(req->file, &pt) & req->cflags;
 		}
 
 		/* multishot, just fill an CQE and proceed */
-		if (req->result && !(poll->events & EPOLLONESHOT)) {
+		if (req->result && !(req->cflags & EPOLLONESHOT)) {
 			__poll_t mask = mangle_poll(req->result & poll->events);
 			bool filled;
 
@@ -5545,6 +5930,7 @@ static int io_poll_check_events(struct io_kiocb *req)
 			if (unlikely(!filled))
 				return -ECANCELED;
 			io_cqring_ev_posted(ctx);
+			io_add_napi(req->file, ctx);
 		} else if (req->result) {
 			return 0;
 		}
@@ -5603,29 +5989,36 @@ static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
 		io_req_complete_failed(req, ret);
 }
 
-static void __io_poll_execute(struct io_kiocb *req, int mask)
+static void __io_poll_execute(struct io_kiocb *req, int mask, int events)
 {
 	req->result = mask;
+	/*
+	 * This is useful for poll that is armed on behalf of another
+	 * request, and where the wakeup path could be on a different
+	 * CPU. We want to avoid pulling in req->apoll->events for that
+	 * case.
+	 */
+	req->cflags = events;
 	if (req->opcode == IORING_OP_POLL_ADD)
 		req->io_task_work.func = io_poll_task_func;
 	else
 		req->io_task_work.func = io_apoll_task_func;
 
-	trace_io_uring_task_add(req->ctx, req->opcode, req->user_data, mask);
+	trace_io_uring_task_add(req->ctx, req, req->user_data, req->opcode, mask);
 	io_req_task_work_add(req, false);
 }
 
-static inline void io_poll_execute(struct io_kiocb *req, int res)
+static inline void io_poll_execute(struct io_kiocb *req, int res, int events)
 {
 	if (io_poll_get_ownership(req))
-		__io_poll_execute(req, res);
+		__io_poll_execute(req, res, events);
 }
 
 static void io_poll_cancel_req(struct io_kiocb *req)
 {
 	io_poll_mark_cancelled(req);
 	/* kick tw, which should complete the request */
-	io_poll_execute(req, 0);
+	io_poll_execute(req, 0, 0);
 }
 
 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
@@ -5639,7 +6032,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 	if (unlikely(mask & POLLFREE)) {
 		io_poll_mark_cancelled(req);
 		/* we have to kick tw in case it's not already */
-		io_poll_execute(req, 0);
+		io_poll_execute(req, 0, poll->events);
 
 		/*
 		 * If the waitqueue is being freed early but someone is already
@@ -5669,8 +6062,9 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 		if (mask && poll->events & EPOLLONESHOT) {
 			list_del_init(&poll->wait.entry);
 			poll->head = NULL;
+			req->flags &= ~REQ_F_SINGLE_POLL;
 		}
-		__io_poll_execute(req, mask);
+		__io_poll_execute(req, mask, poll->events);
 	}
 	return 1;
 }
@@ -5705,12 +6099,14 @@ static void __io_queue_proc(struct io_poll_iocb *poll, struct io_poll_table *pt,
 			pt->error = -ENOMEM;
 			return;
 		}
+		req->flags |= REQ_F_DOUBLE_POLL;
 		io_init_poll_iocb(poll, first->events, first->wait.func);
 		*poll_ptr = poll;
 		if (req->opcode == IORING_OP_POLL_ADD)
 			req->flags |= REQ_F_ASYNC_DATA;
 	}
 
+	req->flags |= REQ_F_SINGLE_POLL;
 	pt->nr_entries++;
 	poll->head = head;
 	poll->wait.private = req;
@@ -5774,9 +6170,10 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
 		/* can't multishot if failed, just queue the event we've got */
 		if (unlikely(ipt->error || !ipt->nr_entries))
 			poll->events |= EPOLLONESHOT;
-		__io_poll_execute(req, mask);
+		__io_poll_execute(req, mask, poll->events);
 		return 0;
 	}
+	io_add_napi(req->file, req->ctx);
 
 	/*
 	 * Release ownership. If someone tried to queue a tw while it was
@@ -5784,7 +6181,7 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
 	 */
 	v = atomic_dec_return(&req->poll_refs);
 	if (unlikely(v & IO_POLL_REF_MASK))
-		__io_poll_execute(req, 0);
+		__io_poll_execute(req, 0, poll->events);
 	return 0;
 }
 
@@ -5803,7 +6200,7 @@ enum {
 	IO_APOLL_READY
 };
 
-static int io_arm_poll_handler(struct io_kiocb *req)
+static int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
 {
 	const struct io_op_def *def = &io_op_defs[req->opcode];
 	struct io_ring_ctx *ctx = req->ctx;
@@ -5828,9 +6225,16 @@ static int io_arm_poll_handler(struct io_kiocb *req)
 		mask |= POLLOUT | POLLWRNORM;
 	}
 
-	apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
-	if (unlikely(!apoll))
-		return IO_APOLL_ABORTED;
+	if (!(issue_flags & IO_URING_F_UNLOCKED) &&
+	    !list_empty(&ctx->apoll_cache)) {
+		apoll = list_first_entry(&ctx->apoll_cache, struct async_poll,
+						poll.wait.entry);
+		list_del_init(&apoll->poll.wait.entry);
+	} else {
+		apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
+		if (unlikely(!apoll))
+			return IO_APOLL_ABORTED;
+	}
 	apoll->double_poll = NULL;
 	req->apoll = apoll;
 	req->flags |= REQ_F_POLLED;
@@ -5840,7 +6244,7 @@ static int io_arm_poll_handler(struct io_kiocb *req)
 	if (ret || ipt.error)
 		return ret ? IO_APOLL_READY : IO_APOLL_ABORTED;
 
-	trace_io_uring_poll_arm(ctx, req, req->opcode, req->user_data,
+	trace_io_uring_poll_arm(ctx, req, req->user_data, req->opcode,
 				mask, apoll->poll.events);
 	return IO_APOLL_OK;
 }
@@ -5975,7 +6379,7 @@ static int io_poll_add_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe
 		return -EINVAL;
 
 	io_req_set_refcount(req);
-	poll->events = io_poll_parse_events(sqe, flags);
+	req->cflags = poll->events = io_poll_parse_events(sqe, flags);
 	return 0;
 }
 
@@ -6092,10 +6496,7 @@ static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
 
 	if (IS_ERR(req))
 		return PTR_ERR(req);
-
-	req_set_fail(req);
-	io_fill_cqe_req(req, -ECANCELED, 0);
-	io_put_req_deferred(req);
+	io_req_task_queue_fail(req, -ECANCELED);
 	return 0;
 }
 
@@ -6568,6 +6969,8 @@ static int io_req_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 		return io_symlinkat_prep(req, sqe);
 	case IORING_OP_LINKAT:
 		return io_linkat_prep(req, sqe);
+	case IORING_OP_MSG_RING:
+		return io_msg_ring_prep(req, sqe);
 	}
 
 	printk_once(KERN_WARNING "io_uring: unhandled opcode %d\n",
@@ -6649,7 +7052,7 @@ fail:
 		goto queue;
 	}
 
-	trace_io_uring_defer(ctx, req, req->user_data);
+	trace_io_uring_defer(ctx, req, req->user_data, req->opcode);
 	de->req = req;
 	de->seq = seq;
 	list_add_tail(&de->list, &ctx->defer_list);
@@ -6659,7 +7062,7 @@ fail:
 static void io_clean_op(struct io_kiocb *req)
 {
 	if (req->flags & REQ_F_BUFFER_SELECTED)
-		io_put_kbuf(req);
+		io_put_kbuf_comp(req);
 
 	if (req->flags & REQ_F_NEED_CLEANUP) {
 		switch (req->opcode) {
@@ -6851,6 +7254,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
 	case IORING_OP_LINKAT:
 		ret = io_linkat(req, issue_flags);
 		break;
+	case IORING_OP_MSG_RING:
+		ret = io_msg_ring(req, issue_flags);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
@@ -6926,7 +7332,7 @@ static void io_wq_submit_work(struct io_wq_work *work)
 			continue;
 		}
 
-		if (io_arm_poll_handler(req) == IO_APOLL_OK)
+		if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
 			return;
 		/* aborted or ready, in either case retry blocking */
 		needs_poll = false;
@@ -6983,7 +7389,7 @@ static struct file *io_file_get_normal(struct io_ring_ctx *ctx,
 {
 	struct file *file = fget(fd);
 
-	trace_io_uring_file_get(ctx, fd);
+	trace_io_uring_file_get(ctx, req, req->user_data, fd);
 
 	/* we don't allow fixed io_uring files */
 	if (file && unlikely(file->f_op == &io_uring_fops))
@@ -7072,7 +7478,7 @@ static void io_queue_sqe_arm_apoll(struct io_kiocb *req)
 {
 	struct io_kiocb *linked_timeout = io_prep_linked_timeout(req);
 
-	switch (io_arm_poll_handler(req)) {
+	switch (io_arm_poll_handler(req, 0)) {
 	case IO_APOLL_READY:
 		io_req_task_queue(req);
 		break;
@@ -7081,8 +7487,12 @@ static void io_queue_sqe_arm_apoll(struct io_kiocb *req)
 		 * Queued up for async execution, worker will release
 		 * submit reference when the iocb is actually submitted.
 		 */
+		io_kbuf_recycle(req);
 		io_queue_async_work(req, NULL);
 		break;
+	case IO_APOLL_OK:
+		io_kbuf_recycle(req);
+		break;
 	}
 
 	if (linked_timeout)
@@ -7281,7 +7691,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 
 	ret = io_init_req(ctx, req, sqe);
 	if (unlikely(ret)) {
-		trace_io_uring_req_failed(sqe, ret);
+		trace_io_uring_req_failed(sqe, ctx, req, ret);
 
 		/* fail even hard links since we don't submit */
 		if (link->head) {
@@ -7308,7 +7718,7 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	}
 
 	/* don't need @sqe from now on */
-	trace_io_uring_submit_sqe(ctx, req, req->opcode, req->user_data,
+	trace_io_uring_submit_sqe(ctx, req, req->user_data, req->opcode,
 				  req->flags, true,
 				  ctx->flags & IORING_SETUP_SQPOLL);
 
@@ -7451,8 +7861,14 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
 		}
 		/* will complete beyond this point, count as submitted */
 		submitted++;
-		if (io_submit_sqe(ctx, req, sqe))
-			break;
+		if (io_submit_sqe(ctx, req, sqe)) {
+			/*
+			 * Continue submitting even for sqe failure if the
+			 * ring was setup with IORING_SETUP_SUBMIT_ALL
+			 */
+			if (!(ctx->flags & IORING_SETUP_SUBMIT_ALL))
+				break;
+		}
 	} while (submitted < nr);
 
 	if (unlikely(submitted != nr)) {
@@ -7519,7 +7935,13 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
 		    !(ctx->flags & IORING_SETUP_R_DISABLED))
 			ret = io_submit_sqes(ctx, to_submit);
 		mutex_unlock(&ctx->uring_lock);
-
+#ifdef CONFIG_NET_RX_BUSY_POLL
+		spin_lock(&ctx->napi_lock);
+		if (!list_empty(&ctx->napi_list) &&
+		    io_napi_busy_loop(&ctx->napi_list))
+			++ret;
+		spin_unlock(&ctx->napi_lock);
+#endif
 		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
 			wake_up(&ctx->sqo_sq_wait);
 		if (creds)
@@ -7650,6 +8072,9 @@ struct io_wait_queue {
 	struct io_ring_ctx *ctx;
 	unsigned cq_tail;
 	unsigned nr_timeouts;
+#ifdef CONFIG_NET_RX_BUSY_POLL
+	unsigned busy_poll_to;
+#endif
 };
 
 static inline bool io_should_wake(struct io_wait_queue *iowq)
@@ -7684,11 +8109,11 @@ static int io_run_task_work_sig(void)
 {
 	if (io_run_task_work())
 		return 1;
-	if (!signal_pending(current))
-		return 0;
 	if (test_thread_flag(TIF_NOTIFY_SIGNAL))
 		return -ERESTARTSYS;
-	return -EINTR;
+	if (task_sigpending(current))
+		return -EINTR;
+	return 0;
 }
 
 /* when returns >0, the caller should retry */
@@ -7711,6 +8136,87 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
 	return 1;
 }
 
+#ifdef CONFIG_NET_RX_BUSY_POLL
+static void io_adjust_busy_loop_timeout(struct timespec64 *ts,
+					struct io_wait_queue *iowq)
+{
+	unsigned busy_poll_to = READ_ONCE(sysctl_net_busy_poll);
+	struct timespec64 pollto = ns_to_timespec64(1000 * (s64)busy_poll_to);
+
+	if (timespec64_compare(ts, &pollto) > 0) {
+		*ts = timespec64_sub(*ts, pollto);
+		iowq->busy_poll_to = busy_poll_to;
+	} else {
+		u64 to = timespec64_to_ns(ts);
+
+		do_div(to, 1000);
+		iowq->busy_poll_to = to;
+		ts->tv_sec = 0;
+		ts->tv_nsec = 0;
+	}
+}
+
+static inline bool io_busy_loop_timeout(unsigned long start_time,
+					unsigned long bp_usec)
+{
+	if (bp_usec) {
+		unsigned long end_time = start_time + bp_usec;
+		unsigned long now = busy_loop_current_time();
+
+		return time_after(now, end_time);
+	}
+	return true;
+}
+
+static bool io_busy_loop_end(void *p, unsigned long start_time)
+{
+	struct io_wait_queue *iowq = p;
+
+	return signal_pending(current) ||
+	       io_should_wake(iowq) ||
+	       io_busy_loop_timeout(start_time, iowq->busy_poll_to);
+}
+
+static void io_blocking_napi_busy_loop(struct list_head *napi_list,
+				       struct io_wait_queue *iowq)
+{
+	unsigned long start_time =
+		list_is_singular(napi_list) ? 0 :
+		busy_loop_current_time();
+
+	do {
+		if (list_is_singular(napi_list)) {
+			struct napi_entry *ne =
+				list_first_entry(napi_list,
+						 struct napi_entry, list);
+
+			napi_busy_loop(ne->napi_id, io_busy_loop_end, iowq,
+				       true, BUSY_POLL_BUDGET);
+			io_check_napi_entry_timeout(ne);
+			break;
+		}
+	} while (io_napi_busy_loop(napi_list) &&
+		 !io_busy_loop_end(iowq, start_time));
+}
+
+static void io_putback_napi_list(struct io_ring_ctx *ctx,
+				 struct list_head *napi_list)
+{
+	struct napi_entry *cne, *lne;
+
+	spin_lock(&ctx->napi_lock);
+	list_for_each_entry(cne, &ctx->napi_list, list)
+		list_for_each_entry(lne, napi_list, list)
+			if (cne->napi_id == lne->napi_id) {
+				list_del(&lne->list);
+				kfree(lne);
+				break;
+			}
+	list_splice(napi_list, &ctx->napi_list);
+	spin_unlock(&ctx->napi_lock);
+}
+#endif /* CONFIG_NET_RX_BUSY_POLL */
+
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
@@ -7723,6 +8229,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 	struct io_rings *rings = ctx->rings;
 	ktime_t timeout = KTIME_MAX;
 	int ret;
+#ifdef CONFIG_NET_RX_BUSY_POLL
+	LIST_HEAD(local_napi_list);
+#endif
 
 	do {
 		io_cqring_overflow_flush(ctx);
@@ -7732,14 +8241,6 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			break;
 	} while (1);
 
-	if (uts) {
-		struct timespec64 ts;
-
-		if (get_timespec64(&ts, uts))
-			return -EFAULT;
-		timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
-	}
-
 	if (sig) {
 #ifdef CONFIG_COMPAT
 		if (in_compat_syscall())
@@ -7753,6 +8254,30 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			return ret;
 	}
 
+#ifdef CONFIG_NET_RX_BUSY_POLL
+	iowq.busy_poll_to = 0;
+	if (!(ctx->flags & IORING_SETUP_SQPOLL)) {
+		spin_lock(&ctx->napi_lock);
+		list_splice_init(&ctx->napi_list, &local_napi_list);
+		spin_unlock(&ctx->napi_lock);
+	}
+#endif
+	if (uts) {
+		struct timespec64 ts;
+
+		if (get_timespec64(&ts, uts))
+			return -EFAULT;
+#ifdef CONFIG_NET_RX_BUSY_POLL
+		if (!list_empty(&local_napi_list))
+			io_adjust_busy_loop_timeout(&ts, &iowq);
+#endif
+		timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
+	}
+#ifdef CONFIG_NET_RX_BUSY_POLL
+	else if (!list_empty(&local_napi_list))
+		iowq.busy_poll_to = READ_ONCE(sysctl_net_busy_poll);
+#endif
+
 	init_waitqueue_func_entry(&iowq.wq, io_wake_function);
 	iowq.wq.private = current;
 	INIT_LIST_HEAD(&iowq.wq.entry);
@@ -7761,6 +8286,12 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 	iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
 
 	trace_io_uring_cqring_wait(ctx, min_events);
+#ifdef CONFIG_NET_RX_BUSY_POLL
+	if (iowq.busy_poll_to)
+		io_blocking_napi_busy_loop(&local_napi_list, &iowq);
+	if (!list_empty(&local_napi_list))
+		io_putback_napi_list(ctx, &local_napi_list);
+#endif
 	do {
 		/* if we can't even flush overflow, don't wait for more */
 		if (!io_cqring_overflow_flush(ctx)) {
@@ -8749,8 +9280,16 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
 	if (unlikely(!tctx))
 		return -ENOMEM;
 
+	tctx->registered_rings = kcalloc(IO_RINGFD_REG_MAX,
+					 sizeof(struct file *), GFP_KERNEL);
+	if (unlikely(!tctx->registered_rings)) {
+		kfree(tctx);
+		return -ENOMEM;
+	}
+
 	ret = percpu_counter_init(&tctx->inflight, 0, GFP_KERNEL);
 	if (unlikely(ret)) {
+		kfree(tctx->registered_rings);
 		kfree(tctx);
 		return ret;
 	}
@@ -8759,6 +9298,7 @@ static __cold int io_uring_alloc_task_context(struct task_struct *task,
 	if (IS_ERR(tctx->io_wq)) {
 		ret = PTR_ERR(tctx->io_wq);
 		percpu_counter_destroy(&tctx->inflight);
+		kfree(tctx->registered_rings);
 		kfree(tctx);
 		return ret;
 	}
@@ -8783,6 +9323,7 @@ void __io_uring_free(struct task_struct *tsk)
 	WARN_ON_ONCE(tctx->io_wq);
 	WARN_ON_ONCE(tctx->cached_refs);
 
+	kfree(tctx->registered_rings);
 	percpu_counter_destroy(&tctx->inflight);
 	kfree(tctx);
 	tsk->io_uring = NULL;
@@ -9359,33 +9900,55 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
 	return done ? done : err;
 }
 
-static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
+static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
+			       unsigned int eventfd_async)
 {
+	struct io_ev_fd *ev_fd;
 	__s32 __user *fds = arg;
 	int fd;
 
-	if (ctx->cq_ev_fd)
+	ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
+					lockdep_is_held(&ctx->uring_lock));
+	if (ev_fd)
 		return -EBUSY;
 
 	if (copy_from_user(&fd, fds, sizeof(*fds)))
 		return -EFAULT;
 
-	ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
-	if (IS_ERR(ctx->cq_ev_fd)) {
-		int ret = PTR_ERR(ctx->cq_ev_fd);
+	ev_fd = kmalloc(sizeof(*ev_fd), GFP_KERNEL);
+	if (!ev_fd)
+		return -ENOMEM;
 
-		ctx->cq_ev_fd = NULL;
+	ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd);
+	if (IS_ERR(ev_fd->cq_ev_fd)) {
+		int ret = PTR_ERR(ev_fd->cq_ev_fd);
+		kfree(ev_fd);
 		return ret;
 	}
-
+	ev_fd->eventfd_async = eventfd_async;
+	ctx->has_evfd = true;
+	rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
 	return 0;
 }
 
+static void io_eventfd_put(struct rcu_head *rcu)
+{
+	struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
+
+	eventfd_ctx_put(ev_fd->cq_ev_fd);
+	kfree(ev_fd);
+}
+
 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 {
-	if (ctx->cq_ev_fd) {
-		eventfd_ctx_put(ctx->cq_ev_fd);
-		ctx->cq_ev_fd = NULL;
+	struct io_ev_fd *ev_fd;
+
+	ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
+					lockdep_is_held(&ctx->uring_lock));
+	if (ev_fd) {
+		ctx->has_evfd = false;
+		rcu_assign_pointer(ctx->io_ev_fd, NULL);
+		call_rcu(&ev_fd->rcu, io_eventfd_put);
 		return 0;
 	}
 
@@ -9394,11 +9957,28 @@ static int io_eventfd_unregister(struct io_ring_ctx *ctx)
 
 static void io_destroy_buffers(struct io_ring_ctx *ctx)
 {
-	struct io_buffer *buf;
-	unsigned long index;
+	int i;
+
+	for (i = 0; i < (1U << IO_BUFFERS_HASH_BITS); i++) {
+		struct list_head *list = &ctx->io_buffers[i];
 
-	xa_for_each(&ctx->io_buffers, index, buf)
-		__io_remove_buffers(ctx, buf, index, -1U);
+		while (!list_empty(list)) {
+			struct io_buffer_list *bl;
+
+			bl = list_first_entry(list, struct io_buffer_list, list);
+			__io_remove_buffers(ctx, bl, -1U);
+			list_del(&bl->list);
+			kfree(bl);
+		}
+	}
+
+	while (!list_empty(&ctx->io_buffers_pages)) {
+		struct page *page;
+
+		page = list_first_entry(&ctx->io_buffers_pages, struct page, lru);
+		list_del_init(&page->lru);
+		__free_page(page);
+	}
 }
 
 static void io_req_caches_free(struct io_ring_ctx *ctx)
@@ -9429,6 +10009,18 @@ static void io_wait_rsrc_data(struct io_rsrc_data *data)
 		wait_for_completion(&data->done);
 }
 
+static void io_flush_apoll_cache(struct io_ring_ctx *ctx)
+{
+	struct async_poll *apoll;
+
+	while (!list_empty(&ctx->apoll_cache)) {
+		apoll = list_first_entry(&ctx->apoll_cache, struct async_poll,
+						poll.wait.entry);
+		list_del(&apoll->poll.wait.entry);
+		kfree(apoll);
+	}
+}
+
 static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 {
 	io_sq_thread_finish(ctx);
@@ -9450,8 +10042,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 		__io_sqe_files_unregister(ctx);
 	if (ctx->rings)
 		__io_cqring_overflow_flush(ctx, true);
-	mutex_unlock(&ctx->uring_lock);
 	io_eventfd_unregister(ctx);
+	io_flush_apoll_cache(ctx);
+	mutex_unlock(&ctx->uring_lock);
 	io_destroy_buffers(ctx);
 	if (ctx->sq_creds)
 		put_cred(ctx->sq_creds);
@@ -9483,8 +10076,10 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
 	io_req_caches_free(ctx);
 	if (ctx->hash_map)
 		io_wq_put_hash(ctx->hash_map);
+	io_free_napi_list(ctx);
 	kfree(ctx->cancel_hash);
 	kfree(ctx->dummy_ubuf);
+	kfree(ctx->io_buffers);
 	kfree(ctx);
 }
 
@@ -9983,6 +10578,139 @@ void __io_uring_cancel(bool cancel_all)
 	io_uring_cancel_generic(cancel_all, NULL);
 }
 
+void io_uring_unreg_ringfd(void)
+{
+	struct io_uring_task *tctx = current->io_uring;
+	int i;
+
+	for (i = 0; i < IO_RINGFD_REG_MAX; i++) {
+		if (tctx->registered_rings[i]) {
+			fput(tctx->registered_rings[i]);
+			tctx->registered_rings[i] = NULL;
+		}
+	}
+}
+
+static int io_ring_add_registered_fd(struct io_uring_task *tctx, int fd,
+				     int start, int end)
+{
+	struct file *file;
+	int offset;
+
+	for (offset = start; offset < end; offset++) {
+		offset = array_index_nospec(offset, IO_RINGFD_REG_MAX);
+		if (tctx->registered_rings[offset])
+			continue;
+
+		file = fget(fd);
+		if (!file) {
+			return -EBADF;
+		} else if (file->f_op != &io_uring_fops) {
+			fput(file);
+			return -EOPNOTSUPP;
+		}
+		tctx->registered_rings[offset] = file;
+		return offset;
+	}
+
+	return -EBUSY;
+}
+
+/*
+ * Register a ring fd to avoid fdget/fdput for each io_uring_enter()
+ * invocation. User passes in an array of struct io_uring_rsrc_update
+ * with ->data set to the ring_fd, and ->offset given for the desired
+ * index. If no index is desired, application may set ->offset == -1U
+ * and we'll find an available index. Returns number of entries
+ * successfully processed, or < 0 on error if none were processed.
+ */
+static int io_ringfd_register(struct io_ring_ctx *ctx, void __user *__arg,
+			      unsigned nr_args)
+{
+	struct io_uring_rsrc_update __user *arg = __arg;
+	struct io_uring_rsrc_update reg;
+	struct io_uring_task *tctx;
+	int ret, i;
+
+	if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
+		return -EINVAL;
+
+	mutex_unlock(&ctx->uring_lock);
+	ret = io_uring_add_tctx_node(ctx);
+	mutex_lock(&ctx->uring_lock);
+	if (ret)
+		return ret;
+
+	tctx = current->io_uring;
+	for (i = 0; i < nr_args; i++) {
+		int start, end;
+
+		if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
+			ret = -EFAULT;
+			break;
+		}
+
+		if (reg.offset == -1U) {
+			start = 0;
+			end = IO_RINGFD_REG_MAX;
+		} else {
+			if (reg.offset >= IO_RINGFD_REG_MAX) {
+				ret = -EINVAL;
+				break;
+			}
+			start = reg.offset;
+			end = start + 1;
+		}
+
+		ret = io_ring_add_registered_fd(tctx, reg.data, start, end);
+		if (ret < 0)
+			break;
+
+		reg.offset = ret;
+		if (copy_to_user(&arg[i], &reg, sizeof(reg))) {
+			fput(tctx->registered_rings[reg.offset]);
+			tctx->registered_rings[reg.offset] = NULL;
+			ret = -EFAULT;
+			break;
+		}
+	}
+
+	return i ? i : ret;
+}
+
+static int io_ringfd_unregister(struct io_ring_ctx *ctx, void __user *__arg,
+				unsigned nr_args)
+{
+	struct io_uring_rsrc_update __user *arg = __arg;
+	struct io_uring_task *tctx = current->io_uring;
+	struct io_uring_rsrc_update reg;
+	int ret = 0, i;
+
+	if (!nr_args || nr_args > IO_RINGFD_REG_MAX)
+		return -EINVAL;
+	if (!tctx)
+		return 0;
+
+	for (i = 0; i < nr_args; i++) {
+		if (copy_from_user(&reg, &arg[i], sizeof(reg))) {
+			ret = -EFAULT;
+			break;
+		}
+		if (reg.offset >= IO_RINGFD_REG_MAX) {
+			ret = -EINVAL;
+			break;
+		}
+
+		reg.offset = array_index_nospec(reg.offset, IO_RINGFD_REG_MAX);
+		if (tctx->registered_rings[reg.offset]) {
+			fput(tctx->registered_rings[reg.offset]);
+			tctx->registered_rings[reg.offset] = NULL;
+		}
+	}
+
+	return i ? i : ret;
+}
+
 static void *io_uring_validate_mmap_request(struct file *file,
 					    loff_t pgoff, size_t sz)
 {
@@ -10113,12 +10841,28 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 	io_run_task_work();
 
 	if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
-			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG)))
+			       IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
+			       IORING_ENTER_REGISTERED_RING)))
 		return -EINVAL;
 
-	f = fdget(fd);
-	if (unlikely(!f.file))
-		return -EBADF;
+	/*
+	 * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
+	 * need only dereference our task private array to find it.
+	 */
+	if (flags & IORING_ENTER_REGISTERED_RING) {
+		struct io_uring_task *tctx = current->io_uring;
+
+		if (!tctx || fd >= IO_RINGFD_REG_MAX)
+			return -EINVAL;
+		fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
+		f.file = tctx->registered_rings[fd];
+		if (unlikely(!f.file))
+			return -EBADF;
+	} else {
+		f = fdget(fd);
+		if (unlikely(!f.file))
+			return -EBADF;
+	}
 
 	ret = -EOPNOTSUPP;
 	if (unlikely(f.file->f_op != &io_uring_fops))
@@ -10192,7 +10936,8 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
 out:
 	percpu_ref_put(&ctx->refs);
 out_fput:
-	fdput(f);
+	if (!(flags & IORING_ENTER_REGISTERED_RING))
+		fdput(f);
 	return submitted ? submitted : ret;
 }
 
@@ -10610,7 +11355,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
 	if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
 			IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
 			IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
-			IORING_SETUP_R_DISABLED))
+			IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL))
 		return -EINVAL;
 
 	return  io_uring_create(entries, &p, params);
@@ -10960,61 +11705,6 @@ err:
 	return ret;
 }
 
-static bool io_register_op_must_quiesce(int op)
-{
-	switch (op) {
-	case IORING_REGISTER_BUFFERS:
-	case IORING_UNREGISTER_BUFFERS:
-	case IORING_REGISTER_FILES:
-	case IORING_UNREGISTER_FILES:
-	case IORING_REGISTER_FILES_UPDATE:
-	case IORING_REGISTER_PROBE:
-	case IORING_REGISTER_PERSONALITY:
-	case IORING_UNREGISTER_PERSONALITY:
-	case IORING_REGISTER_FILES2:
-	case IORING_REGISTER_FILES_UPDATE2:
-	case IORING_REGISTER_BUFFERS2:
-	case IORING_REGISTER_BUFFERS_UPDATE:
-	case IORING_REGISTER_IOWQ_AFF:
-	case IORING_UNREGISTER_IOWQ_AFF:
-	case IORING_REGISTER_IOWQ_MAX_WORKERS:
-		return false;
-	default:
-		return true;
-	}
-}
-
-static __cold int io_ctx_quiesce(struct io_ring_ctx *ctx)
-{
-	long ret;
-
-	percpu_ref_kill(&ctx->refs);
-
-	/*
-	 * Drop uring mutex before waiting for references to exit. If another
-	 * thread is currently inside io_uring_enter() it might need to grab the
-	 * uring_lock to make progress. If we hold it here across the drain
-	 * wait, then we can deadlock. It's safe to drop the mutex here, since
-	 * no new references will come in after we've killed the percpu ref.
-	 */
-	mutex_unlock(&ctx->uring_lock);
-	do {
-		ret = wait_for_completion_interruptible_timeout(&ctx->ref_comp, HZ);
-		if (ret) {
-			ret = min(0L, ret);
-			break;
-		}
-
-		ret = io_run_task_work_sig();
-		io_req_caches_free(ctx);
-	} while (ret >= 0);
-	mutex_lock(&ctx->uring_lock);
-
-	if (ret)
-		io_refs_resurrect(&ctx->refs, &ctx->ref_comp);
-	return ret;
-}
-
 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			       void __user *arg, unsigned nr_args)
 	__releases(ctx->uring_lock)
@@ -11038,12 +11728,6 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			return -EACCES;
 	}
 
-	if (io_register_op_must_quiesce(opcode)) {
-		ret = io_ctx_quiesce(ctx);
-		if (ret)
-			return ret;
-	}
-
 	switch (opcode) {
 	case IORING_REGISTER_BUFFERS:
 		ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL);
@@ -11067,17 +11751,16 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 		ret = io_register_files_update(ctx, arg, nr_args);
 		break;
 	case IORING_REGISTER_EVENTFD:
-	case IORING_REGISTER_EVENTFD_ASYNC:
 		ret = -EINVAL;
 		if (nr_args != 1)
 			break;
-		ret = io_eventfd_register(ctx, arg);
-		if (ret)
+		ret = io_eventfd_register(ctx, arg, 0);
+		break;
+	case IORING_REGISTER_EVENTFD_ASYNC:
+		ret = -EINVAL;
+		if (nr_args != 1)
 			break;
-		if (opcode == IORING_REGISTER_EVENTFD_ASYNC)
-			ctx->eventfd_async = 1;
-		else
-			ctx->eventfd_async = 0;
+		ret = io_eventfd_register(ctx, arg, 1);
 		break;
 	case IORING_UNREGISTER_EVENTFD:
 		ret = -EINVAL;
@@ -11144,16 +11827,17 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 			break;
 		ret = io_register_iowq_max_workers(ctx, arg);
 		break;
+	case IORING_REGISTER_RING_FDS:
+		ret = io_ringfd_register(ctx, arg, nr_args);
+		break;
+	case IORING_UNREGISTER_RING_FDS:
+		ret = io_ringfd_unregister(ctx, arg, nr_args);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
 	}
 
-	if (io_register_op_must_quiesce(opcode)) {
-		/* bring the ctx back to life */
-		percpu_ref_reinit(&ctx->refs);
-		reinit_completion(&ctx->ref_comp);
-	}
 	return ret;
 }
 
@@ -11179,8 +11863,7 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
 	mutex_lock(&ctx->uring_lock);
 	ret = __io_uring_register(ctx, opcode, arg, nr_args);
 	mutex_unlock(&ctx->uring_lock);
-	trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
-							ctx->cq_ev_fd != NULL, ret);
+	trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
 out_fput:
 	fdput(f);
 	return ret;