summary refs log tree commit diff
path: root/fs/aio.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/aio.c')
-rw-r--r--fs/aio.c134
1 files changed, 51 insertions, 83 deletions
diff --git a/fs/aio.c b/fs/aio.c
index 823efcbb6ccd..08159ed13649 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -80,6 +80,8 @@ struct kioctx {
 	struct percpu_ref	users;
 	atomic_t		dead;
 
+	struct percpu_ref	reqs;
+
 	unsigned long		user_id;
 
 	struct __percpu kioctx_cpu *cpu;
@@ -107,7 +109,6 @@ struct kioctx {
 	struct page		**ring_pages;
 	long			nr_pages;
 
-	struct rcu_head		rcu_head;
 	struct work_struct	free_work;
 
 	struct {
@@ -250,8 +251,10 @@ static void aio_free_ring(struct kioctx *ctx)
 
 	put_aio_ring_file(ctx);
 
-	if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages)
+	if (ctx->ring_pages && ctx->ring_pages != ctx->internal_pages) {
 		kfree(ctx->ring_pages);
+		ctx->ring_pages = NULL;
+	}
 }
 
 static int aio_ring_mmap(struct file *file, struct vm_area_struct *vma)
@@ -463,26 +466,34 @@ static int kiocb_cancel(struct kioctx *ctx, struct kiocb *kiocb)
 	return cancel(kiocb);
 }
 
-static void free_ioctx_rcu(struct rcu_head *head)
+static void free_ioctx(struct work_struct *work)
 {
-	struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+	struct kioctx *ctx = container_of(work, struct kioctx, free_work);
 
+	pr_debug("freeing %p\n", ctx);
+
+	aio_free_ring(ctx);
 	free_percpu(ctx->cpu);
 	kmem_cache_free(kioctx_cachep, ctx);
 }
 
+static void free_ioctx_reqs(struct percpu_ref *ref)
+{
+	struct kioctx *ctx = container_of(ref, struct kioctx, reqs);
+
+	INIT_WORK(&ctx->free_work, free_ioctx);
+	schedule_work(&ctx->free_work);
+}
+
 /*
  * When this function runs, the kioctx has been removed from the "hash table"
  * and ctx->users has dropped to 0, so we know no more kiocbs can be submitted -
  * now it's safe to cancel any that need to be.
  */
-static void free_ioctx(struct work_struct *work)
+static void free_ioctx_users(struct percpu_ref *ref)
 {
-	struct kioctx *ctx = container_of(work, struct kioctx, free_work);
-	struct aio_ring *ring;
+	struct kioctx *ctx = container_of(ref, struct kioctx, users);
 	struct kiocb *req;
-	unsigned cpu, avail;
-	DEFINE_WAIT(wait);
 
 	spin_lock_irq(&ctx->ctx_lock);
 
@@ -496,54 +507,8 @@ static void free_ioctx(struct work_struct *work)
 
 	spin_unlock_irq(&ctx->ctx_lock);
 
-	for_each_possible_cpu(cpu) {
-		struct kioctx_cpu *kcpu = per_cpu_ptr(ctx->cpu, cpu);
-
-		atomic_add(kcpu->reqs_available, &ctx->reqs_available);
-		kcpu->reqs_available = 0;
-	}
-
-	while (1) {
-		prepare_to_wait(&ctx->wait, &wait, TASK_UNINTERRUPTIBLE);
-
-		ring = kmap_atomic(ctx->ring_pages[0]);
-		avail = (ring->head <= ring->tail)
-			 ? ring->tail - ring->head
-			 : ctx->nr_events - ring->head + ring->tail;
-
-		atomic_add(avail, &ctx->reqs_available);
-		ring->head = ring->tail;
-		kunmap_atomic(ring);
-
-		if (atomic_read(&ctx->reqs_available) >= ctx->nr_events - 1)
-			break;
-
-		schedule();
-	}
-	finish_wait(&ctx->wait, &wait);
-
-	WARN_ON(atomic_read(&ctx->reqs_available) > ctx->nr_events - 1);
-
-	aio_free_ring(ctx);
-
-	pr_debug("freeing %p\n", ctx);
-
-	/*
-	 * Here the call_rcu() is between the wait_event() for reqs_active to
-	 * hit 0, and freeing the ioctx.
-	 *
-	 * aio_complete() decrements reqs_active, but it has to touch the ioctx
-	 * after to issue a wakeup so we use rcu.
-	 */
-	call_rcu(&ctx->rcu_head, free_ioctx_rcu);
-}
-
-static void free_ioctx_ref(struct percpu_ref *ref)
-{
-	struct kioctx *ctx = container_of(ref, struct kioctx, users);
-
-	INIT_WORK(&ctx->free_work, free_ioctx);
-	schedule_work(&ctx->free_work);
+	percpu_ref_kill(&ctx->reqs);
+	percpu_ref_put(&ctx->reqs);
 }
 
 static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
@@ -602,6 +567,16 @@ static int ioctx_add_table(struct kioctx *ctx, struct mm_struct *mm)
 	}
 }
 
+static void aio_nr_sub(unsigned nr)
+{
+	spin_lock(&aio_nr_lock);
+	if (WARN_ON(aio_nr - nr > aio_nr))
+		aio_nr = 0;
+	else
+		aio_nr -= nr;
+	spin_unlock(&aio_nr_lock);
+}
+
 /* ioctx_alloc
  *	Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
  */
@@ -639,8 +614,11 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	ctx->max_reqs = nr_events;
 
-	if (percpu_ref_init(&ctx->users, free_ioctx_ref))
-		goto out_freectx;
+	if (percpu_ref_init(&ctx->users, free_ioctx_users))
+		goto err;
+
+	if (percpu_ref_init(&ctx->reqs, free_ioctx_reqs))
+		goto err;
 
 	spin_lock_init(&ctx->ctx_lock);
 	spin_lock_init(&ctx->completion_lock);
@@ -651,10 +629,10 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	ctx->cpu = alloc_percpu(struct kioctx_cpu);
 	if (!ctx->cpu)
-		goto out_freeref;
+		goto err;
 
 	if (aio_setup_ring(ctx) < 0)
-		goto out_freepcpu;
+		goto err;
 
 	atomic_set(&ctx->reqs_available, ctx->nr_events - 1);
 	ctx->req_batch = (ctx->nr_events - 1) / (num_possible_cpus() * 4);
@@ -666,7 +644,8 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 	if (aio_nr + nr_events > (aio_max_nr * 2UL) ||
 	    aio_nr + nr_events < aio_nr) {
 		spin_unlock(&aio_nr_lock);
-		goto out_cleanup;
+		err = -EAGAIN;
+		goto err;
 	}
 	aio_nr += ctx->max_reqs;
 	spin_unlock(&aio_nr_lock);
@@ -675,23 +654,18 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 
 	err = ioctx_add_table(ctx, mm);
 	if (err)
-		goto out_cleanup_put;
+		goto err_cleanup;
 
 	pr_debug("allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
 		 ctx, ctx->user_id, mm, ctx->nr_events);
 	return ctx;
 
-out_cleanup_put:
-	percpu_ref_put(&ctx->users);
-out_cleanup:
-	err = -EAGAIN;
-	aio_free_ring(ctx);
-out_freepcpu:
+err_cleanup:
+	aio_nr_sub(ctx->max_reqs);
+err:
 	free_percpu(ctx->cpu);
-out_freeref:
+	free_percpu(ctx->reqs.pcpu_count);
 	free_percpu(ctx->users.pcpu_count);
-out_freectx:
-	put_aio_ring_file(ctx);
 	kmem_cache_free(kioctx_cachep, ctx);
 	pr_debug("error allocating ioctx %d\n", err);
 	return ERR_PTR(err);
@@ -726,10 +700,7 @@ static void kill_ioctx(struct mm_struct *mm, struct kioctx *ctx)
 		 * -EAGAIN with no ioctxs actually in use (as far as userspace
 		 *  could tell).
 		 */
-		spin_lock(&aio_nr_lock);
-		BUG_ON(aio_nr - ctx->max_reqs > aio_nr);
-		aio_nr -= ctx->max_reqs;
-		spin_unlock(&aio_nr_lock);
+		aio_nr_sub(ctx->max_reqs);
 
 		if (ctx->mmap_size)
 			vm_munmap(ctx->mmap_base, ctx->mmap_size);
@@ -861,6 +832,8 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx)
 	if (unlikely(!req))
 		goto out_put;
 
+	percpu_ref_get(&ctx->reqs);
+
 	req->ki_ctx = ctx;
 	return req;
 out_put:
@@ -930,12 +903,6 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 		return;
 	}
 
-	/*
-	 * Take rcu_read_lock() in case the kioctx is being destroyed, as we
-	 * need to issue a wakeup after incrementing reqs_available.
-	 */
-	rcu_read_lock();
-
 	if (iocb->ki_list.next) {
 		unsigned long flags;
 
@@ -1010,7 +977,7 @@ void aio_complete(struct kiocb *iocb, long res, long res2)
 	if (waitqueue_active(&ctx->wait))
 		wake_up(&ctx->wait);
 
-	rcu_read_unlock();
+	percpu_ref_put(&ctx->reqs);
 }
 EXPORT_SYMBOL(aio_complete);
 
@@ -1421,6 +1388,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 	return 0;
 out_put_req:
 	put_reqs_available(ctx, 1);
+	percpu_ref_put(&ctx->reqs);
 	kiocb_free(req);
 	return ret;
 }