summary refs log tree commit diff
path: root/fs/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/aio.c')
-rw-r--r--fs/aio.c174
1 files changed, 114 insertions, 60 deletions
diff --git a/fs/aio.c b/fs/aio.c
index 93fbcc0f5696..8d217ed04e6e 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -141,6 +141,7 @@ struct kioctx {
 
 	struct {
 		unsigned	tail;
+		unsigned	completed_events;
 		spinlock_t	completion_lock;
 	} ____cacheline_aligned_in_smp;
 
@@ -192,7 +193,6 @@ static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
 	}
 
 	file->f_flags = O_RDWR;
-	file->private_data = ctx;
 	return file;
 }
 
@@ -202,7 +202,7 @@ static struct dentry *aio_mount(struct file_system_type *fs_type,
 	static const struct dentry_operations ops = {
 		.d_dname	= simple_dname,
 	};
-	return mount_pseudo(fs_type, "aio:", NULL, &ops, 0xa10a10a1);
+	return mount_pseudo(fs_type, "aio:", NULL, &ops, AIO_RING_MAGIC);
 }
 
 /* aio_setup
@@ -556,8 +556,7 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 	struct aio_ring *ring;
 
 	spin_lock(&mm->ioctx_lock);
-	rcu_read_lock();
-	table = rcu_dereference(mm->ioctx_table);
+	table = rcu_dereference_raw(mm->ioctx_table);
 
 	while (1) {
 		if (table)
@@ -565,7 +564,6 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 				if (!table->table[i]) {
 					ctx->id = i;
 					table->table[i] = ctx;
-					rcu_read_unlock();
 					spin_unlock(&mm->ioctx_lock);
 
 					/* While kioctx setup is in progress,
@@ -579,8 +577,6 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 				}
 
 		new_nr = (table ? table->nr : 1) * 4;
-
-		rcu_read_unlock();
 		spin_unlock(&mm->ioctx_lock);
 
 		table = kzalloc(sizeof(*table) + sizeof(struct kioctx *) *
@@ -591,8 +587,7 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 		table->nr = new_nr;
 
 		spin_lock(&mm->ioctx_lock);
-		rcu_read_lock();
-		old = rcu_dereference(mm->ioctx_table);
+		old = rcu_dereference_raw(mm->ioctx_table);
 
 		if (!old) {
 			rcu_assign_pointer(mm->ioctx_table, table);
@@ -739,12 +734,9 @@ static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
 
 
 	spin_lock(&mm->ioctx_lock);
-	rcu_read_lock();
-	table = rcu_dereference(mm->ioctx_table);
-
+	table = rcu_dereference_raw(mm->ioctx_table);
 	WARN_ON(ctx != table->table[ctx->id]);
 	table->table[ctx->id] = NULL;
-	rcu_read_unlock();
 	spin_unlock(&mm->ioctx_lock);
 
 	/* percpu_ref_kill() will do the necessary call_rcu() */
@@ -793,40 +785,35 @@ EXPORT_SYMBOL(wait_on_sync_kiocb);
  */
 void exit_aio(struct mm_struct *mm)
 {
-	struct kioctx_table *table;
-	struct kioctx *ctx;
-	unsigned i = 0;
-
-	while (1) {
-		rcu_read_lock();
-		table = rcu_dereference(mm->ioctx_table);
-
-		do {
-			if (!table || i >= table->nr) {
-				rcu_read_unlock();
-				rcu_assign_pointer(mm->ioctx_table, NULL);
-				if (table)
-					kfree(table);
-				return;
-			}
+	struct kioctx_table *table = rcu_dereference_raw(mm->ioctx_table);
+	int i;
 
-			ctx = table->table[i++];
-		} while (!ctx);
+	if (!table)
+		return;
 
-		rcu_read_unlock();
+	for (i = 0; i < table->nr; ++i) {
+		struct kioctx *ctx = table->table[i];
+		struct completion requests_done =
+			COMPLETION_INITIALIZER_ONSTACK(requests_done);
 
+		if (!ctx)
+			continue;
 		/*
-		 * We don't need to bother with munmap() here -
-		 * exit_mmap(mm) is coming and it'll unmap everything.
-		 * Since aio_free_ring() uses non-zero ->mmap_size
-		 * as indicator that it needs to unmap the area,
-		 * just set it to 0; aio_free_ring() is the only
-		 * place that uses ->mmap_size, so it's safe.
+		 * We don't need to bother with munmap() here - exit_mmap(mm)
+		 * is coming and it'll unmap everything. And we simply can't,
+		 * this is not necessarily our ->mm.
+		 * Since kill_ioctx() uses non-zero ->mmap_size as indicator
+		 * that it needs to unmap the area, just set it to 0.
 		 */
 		ctx->mmap_size = 0;
+		kill_ioctx(mm, ctx, &requests_done);
 
-		kill_ioctx(mm, ctx, NULL);
+		/* Wait until all IO for the context are done. */
+		wait_for_completion(&requests_done);
 	}
+
+	RCU_INIT_POINTER(mm->ioctx_table, NULL);
+	kfree(table);
 }
 
 static void put_reqs_available(struct kioctx *ctx, unsigned nr)
@@ -834,10 +821,8 @@ static void put_reqs_available(struct kioctx *ctx, unsigned nr)
 	struct kioctx_cpu *kcpu;
 	unsigned long flags;
 
-	preempt_disable();
-	kcpu = this_cpu_ptr(ctx->cpu);
-
 	local_irq_save(flags);
+	kcpu = this_cpu_ptr(ctx->cpu);
 	kcpu->reqs_available += nr;
 
 	while (kcpu->reqs_available >= ctx->req_batch * 2) {
@@ -846,7 +831,6 @@ static void put_reqs_available(struct kioctx *ctx, unsigned nr)
 	}
 
 	local_irq_restore(flags);
-	preempt_enable();
 }
 
 static bool get_reqs_available(struct kioctx *ctx)
@@ -855,10 +839,8 @@ static bool get_reqs_available(struct kioctx *ctx)
 	bool ret = false;
 	unsigned long flags;
 
-	preempt_disable();
-	kcpu = this_cpu_ptr(ctx->cpu);
-
 	local_irq_save(flags);
+	kcpu = this_cpu_ptr(ctx->cpu);
 	if (!kcpu->reqs_available) {
 		int old, avail = atomic_read(&ctx->reqs_available);
 
@@ -878,10 +860,71 @@ static bool get_reqs_available(struct kioctx *ctx)
 	kcpu->reqs_available--;
 out:
 	local_irq_restore(flags);
-	preempt_enable();
 	return ret;
 }
 
+/* refill_reqs_available
+ *	Updates the reqs_available reference counts used for tracking the
+ *	number of free slots in the completion ring.  This can be called
+ *	from aio_complete() (to optimistically update reqs_available) or
+ *	from aio_get_req() (the we're out of events case).  It must be
+ *	called holding ctx->completion_lock.
+ */
+static void refill_reqs_available(struct kioctx *ctx, unsigned head,
+                                  unsigned tail)
+{
+	unsigned events_in_ring, completed;
+
+	/* Clamp head since userland can write to it. */
+	head %= ctx->nr_events;
+	if (head <= tail)
+		events_in_ring = tail - head;
+	else
+		events_in_ring = ctx->nr_events - (head - tail);
+
+	completed = ctx->completed_events;
+	if (events_in_ring < completed)
+		completed -= events_in_ring;
+	else
+		completed = 0;
+
+	if (!completed)
+		return;
+
+	ctx->completed_events -= completed;
+	put_reqs_available(ctx, completed);
+}
+
+/* user_refill_reqs_available
+ *	Called to refill reqs_available when aio_get_req() encounters an
+ *	out of space in the completion ring.
+ */
+static void user_refill_reqs_available(struct kioctx *ctx)
+{
+	spin_lock_irq(&ctx->completion_lock);
+	if (ctx->completed_events) {
+		struct aio_ring *ring;
+		unsigned head;
+
+		/* Access of ring->head may race with aio_read_events_ring()
+		 * here, but that's okay since whether we read the old version
+		 * or the new version, and either will be valid.  The important
+		 * part is that head cannot pass tail since we prevent
+		 * aio_complete() from updating tail by holding
+		 * ctx->completion_lock.  Even if head is invalid, the check
+		 * against ctx->completed_events below will make sure we do the
+		 * safe/right thing.
+		 */
+		ring = kmap_atomic(ctx->ring_pages[0]);
+		head = ring->head;
+		kunmap_atomic(ring);
+
+		refill_reqs_available(ctx, head, ctx->tail);
+	}
+
+	spin_unlock_irq(&ctx->completion_lock);
+}
+
 /* aio_get_req
  *	Allocate a slot for an aio request.
  * Returns NULL if no requests are free.
@@ -890,8 +933,11 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 {
 	struct kiocb *req;
 
-	if (!get_reqs_available(ctx))
-		return NULL;
+	if (!get_reqs_available(ctx)) {
+		user_refill_reqs_available(ctx);
+		if (!get_reqs_available(ctx))
+			return NULL;
+	}
 
 	req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL|__GFP_ZERO);
 	if (unlikely(!req))
@@ -950,8 +996,8 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	struct kioctx	*ctx = iocb->ki_ctx;
 	struct aio_ring	*ring;
 	struct io_event	*ev_page, *event;
+	unsigned tail, pos, head;
 	unsigned long	flags;
-	unsigned tail, pos;
 
 	/*
 	 * Special case handling for sync iocbs:
@@ -1012,10 +1058,14 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	ctx->tail = tail;
 
 	ring = kmap_atomic(ctx->ring_pages[0]);
+	head = ring->head;
 	ring->tail = tail;
 	kunmap_atomic(ring);
 	flush_dcache_page(ctx->ring_pages[0]);
 
+	ctx->completed_events++;
+	if (ctx->completed_events > 1)
+		refill_reqs_available(ctx, head, tail);
 	spin_unlock_irqrestore(&ctx->completion_lock, flags);
 
 	pr_debug("added to ring %p at [%u]\n", iocb, tail);
@@ -1030,7 +1080,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 
 	/* everything turned out well, dispose of the aiocb. */
 	kiocb_free(iocb);
-	put_reqs_available(ctx, 1);
 
 	/*
 	 * We have to order our ring_info tail store above and test
@@ -1047,7 +1096,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 }
 EXPORT_SYMBOL(aio_complete);
 
-/* aio_read_events
+/* aio_read_events_ring
  *	Pull an event off of the ioctx's event ring.  Returns the number of
  *	events fetched
  */
@@ -1067,6 +1116,12 @@ static long aio_read_events_ring(struct kioctx *ctx,
 	tail = ring->tail;
 	kunmap_atomic(ring);
 
+	/*
+	 * Ensure that once we've read the current tail pointer, that
+	 * we also see the events that were stored up to the tail.
+	 */
+	smp_rmb();
+
 	pr_debug("h%u t%u m%u\n", head, tail, ctx->nr_events);
 
 	if (head == tail)
@@ -1270,12 +1325,12 @@ static ssize_t aio_setup_vectored_rw(struct kiocb *kiocb,
 	if (compat)
 		ret = compat_rw_copy_check_uvector(rw,
 				(struct compat_iovec __user *)buf,
-				*nr_segs, 1, *iovec, iovec);
+				*nr_segs, UIO_FASTIOV, *iovec, iovec);
 	else
 #endif
 		ret = rw_copy_check_uvector(rw,
 				(struct iovec __user *)buf,
-				*nr_segs, 1, *iovec, iovec);
+				*nr_segs, UIO_FASTIOV, *iovec, iovec);
 	if (ret < 0)
 		return ret;
 
@@ -1299,9 +1354,8 @@ static ssize_t aio_setup_single_vector(struct kiocb *kiocb,
 }
 
 /*
- * aio_setup_iocb:
- *	Performs the initial checks and aio retry method
- *	setup for the kiocb at the time of io submission.
+ * aio_run_iocb:
+ *	Performs the initial checks and io submission.
  */
 static ssize_t aio_run_iocb(struct kiocb *req, unsigned opcode,
 			    char __user *buf, bool compat)
@@ -1313,7 +1367,7 @@ static ssize_t aio_run_iocb(struct kiocb *req, unsigned opcode,
 	fmode_t mode;
 	aio_rw_op *rw_op;
 	rw_iter_op *iter_op;
-	struct iovec inline_vec, *iovec = &inline_vec;
+	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
 	struct iov_iter iter;
 
 	switch (opcode) {
@@ -1348,7 +1402,7 @@ rw_common:
 		if (!ret)
 			ret = rw_verify_area(rw, file, &req->ki_pos, req->ki_nbytes);
 		if (ret < 0) {
-			if (iovec != &inline_vec)
+			if (iovec != inline_vecs)
 				kfree(iovec);
 			return ret;
 		}
@@ -1395,7 +1449,7 @@ rw_common:
 		return -EINVAL;
 	}
 
-	if (iovec != &inline_vec)
+	if (iovec != inline_vecs)
 		kfree(iovec);
 
 	if (ret != -EIOCBQUEUED) {