summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--net/rds/ib_rdma.c214
-rw-r--r--net/rds/xlist.h110
2 files changed, 282 insertions, 42 deletions
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 64b5ede037c8..8c40391de5a2 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -36,7 +36,10 @@
 
 #include "rds.h"
 #include "ib.h"
+#include "xlist.h"
 
+static DEFINE_PER_CPU(unsigned long, clean_list_grace);
+#define CLEAN_LIST_BUSY_BIT 0
 
 /*
  * This is stored as mr->r_trans_private.
@@ -45,7 +48,11 @@ struct rds_ib_mr {
 	struct rds_ib_device	*device;
 	struct rds_ib_mr_pool	*pool;
 	struct ib_fmr		*fmr;
-	struct list_head	list;
+
+	struct xlist_head	xlist;
+
+	/* unmap_list is for freeing */
+	struct list_head	unmap_list;
 	unsigned int		remap_count;
 
 	struct scatterlist	*sg;
@@ -61,12 +68,14 @@ struct rds_ib_mr_pool {
 	struct mutex		flush_lock;		/* serialize fmr invalidate */
 	struct work_struct	flush_worker;		/* flush worker */
 
-	spinlock_t		list_lock;		/* protect variables below */
 	atomic_t		item_count;		/* total # of MRs */
 	atomic_t		dirty_count;		/* # dirty of MRs */
-	struct list_head	drop_list;		/* MRs that have reached their max_maps limit */
-	struct list_head	free_list;		/* unused MRs */
-	struct list_head	clean_list;		/* unused & unamapped MRs */
+
+	struct xlist_head	drop_list;		/* MRs that have reached their max_maps limit */
+	struct xlist_head	free_list;		/* unused MRs */
+	struct xlist_head	clean_list;		/* global unused & unamapped MRs */
+	wait_queue_head_t	flush_wait;
+
 	atomic_t		free_pinned;		/* memory pinned by free MRs */
 	unsigned long		max_items;
 	unsigned long		max_items_soft;
@@ -74,7 +83,7 @@ struct rds_ib_mr_pool {
 	struct ib_fmr_attr	fmr_attr;
 };
 
-static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all);
+static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **);
 static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
 static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
 
@@ -212,11 +221,11 @@ struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
 	if (!pool)
 		return ERR_PTR(-ENOMEM);
 
-	INIT_LIST_HEAD(&pool->free_list);
-	INIT_LIST_HEAD(&pool->drop_list);
-	INIT_LIST_HEAD(&pool->clean_list);
+	INIT_XLIST_HEAD(&pool->free_list);
+	INIT_XLIST_HEAD(&pool->drop_list);
+	INIT_XLIST_HEAD(&pool->clean_list);
 	mutex_init(&pool->flush_lock);
-	spin_lock_init(&pool->list_lock);
+	init_waitqueue_head(&pool->flush_wait);
 	INIT_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 
 	pool->fmr_attr.max_pages = fmr_message_size;
@@ -246,27 +255,50 @@ void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
 {
 	cancel_work_sync(&pool->flush_worker);
-	rds_ib_flush_mr_pool(pool, 1);
+	rds_ib_flush_mr_pool(pool, 1, NULL);
 	WARN_ON(atomic_read(&pool->item_count));
 	WARN_ON(atomic_read(&pool->free_pinned));
 	kfree(pool);
 }
 
+static void refill_local(struct rds_ib_mr_pool *pool, struct xlist_head *xl,
+			 struct rds_ib_mr **ibmr_ret)
+{
+	struct xlist_head *ibmr_xl;
+	ibmr_xl = xlist_del_head_fast(xl);
+	*ibmr_ret = list_entry(ibmr_xl, struct rds_ib_mr, xlist);
+}
+
 static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
 {
 	struct rds_ib_mr *ibmr = NULL;
-	unsigned long flags;
+	struct xlist_head *ret;
+	unsigned long *flag;
 
-	spin_lock_irqsave(&pool->list_lock, flags);
-	if (!list_empty(&pool->clean_list)) {
-		ibmr = list_entry(pool->clean_list.next, struct rds_ib_mr, list);
-		list_del_init(&ibmr->list);
-	}
-	spin_unlock_irqrestore(&pool->list_lock, flags);
+	preempt_disable();
+	flag = &__get_cpu_var(clean_list_grace);
+	set_bit(CLEAN_LIST_BUSY_BIT, flag);
+	ret = xlist_del_head(&pool->clean_list);
+	if (ret)
+		ibmr = list_entry(ret, struct rds_ib_mr, xlist);
 
+	clear_bit(CLEAN_LIST_BUSY_BIT, flag);
+	preempt_enable();
 	return ibmr;
 }
 
+static inline void wait_clean_list_grace(void)
+{
+	int cpu;
+	unsigned long *flag;
+
+	for_each_online_cpu(cpu) {
+		flag = &per_cpu(clean_list_grace, cpu);
+		while (test_bit(CLEAN_LIST_BUSY_BIT, flag))
+			cpu_relax();
+	}
+}
+
 static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 {
 	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
@@ -299,7 +331,9 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 
 		/* We do have some empty MRs. Flush them out. */
 		rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
-		rds_ib_flush_mr_pool(pool, 0);
+		rds_ib_flush_mr_pool(pool, 0, &ibmr);
+		if (ibmr)
+			return ibmr;
 	}
 
 	ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL, rdsibdev_to_node(rds_ibdev));
@@ -494,33 +528,109 @@ static inline unsigned int rds_ib_flush_goal(struct rds_ib_mr_pool *pool, int fr
 }
 
 /*
+ * given an xlist of mrs, put them all into the list_head for more processing
+ */
+static void xlist_append_to_list(struct xlist_head *xlist, struct list_head *list)
+{
+	struct rds_ib_mr *ibmr;
+	struct xlist_head splice;
+	struct xlist_head *cur;
+	struct xlist_head *next;
+
+	splice.next = NULL;
+	xlist_splice(xlist, &splice);
+	cur = splice.next;
+	while (cur) {
+		next = cur->next;
+		ibmr = list_entry(cur, struct rds_ib_mr, xlist);
+		list_add_tail(&ibmr->unmap_list, list);
+		cur = next;
+	}
+}
+
+/*
+ * this takes a list head of mrs and turns it into an xlist of clusters.
+ * each cluster has an xlist of MR_CLUSTER_SIZE mrs that are ready for
+ * reuse.
+ */
+static void list_append_to_xlist(struct rds_ib_mr_pool *pool,
+				struct list_head *list, struct xlist_head *xlist,
+				struct xlist_head **tail_ret)
+{
+	struct rds_ib_mr *ibmr;
+	struct xlist_head *cur_mr = xlist;
+	struct xlist_head *tail_mr = NULL;
+
+	list_for_each_entry(ibmr, list, unmap_list) {
+		tail_mr = &ibmr->xlist;
+		tail_mr->next = NULL;
+		cur_mr->next = tail_mr;
+		cur_mr = tail_mr;
+	}
+	*tail_ret = tail_mr;
+}
+
+/*
  * Flush our pool of MRs.
  * At a minimum, all currently unused MRs are unmapped.
  * If the number of MRs allocated exceeds the limit, we also try
  * to free as many MRs as needed to get back to this limit.
  */
-static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
+static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
+			        int free_all, struct rds_ib_mr **ibmr_ret)
 {
 	struct rds_ib_mr *ibmr, *next;
+	struct xlist_head clean_xlist;
+	struct xlist_head *clean_tail;
 	LIST_HEAD(unmap_list);
 	LIST_HEAD(fmr_list);
 	unsigned long unpinned = 0;
-	unsigned long flags;
 	unsigned int nfreed = 0, ncleaned = 0, free_goal;
 	int ret = 0;
 
 	rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
 
-	mutex_lock(&pool->flush_lock);
+	if (ibmr_ret) {
+		DEFINE_WAIT(wait);
+		while(!mutex_trylock(&pool->flush_lock)) {
+			ibmr = rds_ib_reuse_fmr(pool);
+			if (ibmr) {
+				*ibmr_ret = ibmr;
+				finish_wait(&pool->flush_wait, &wait);
+				goto out_nolock;
+			}
+
+			prepare_to_wait(&pool->flush_wait, &wait,
+					TASK_UNINTERRUPTIBLE);
+			if (xlist_empty(&pool->clean_list))
+				schedule();
+
+			ibmr = rds_ib_reuse_fmr(pool);
+			if (ibmr) {
+				*ibmr_ret = ibmr;
+				finish_wait(&pool->flush_wait, &wait);
+				goto out_nolock;
+			}
+		}
+		finish_wait(&pool->flush_wait, &wait);
+	} else
+		mutex_lock(&pool->flush_lock);
+
+	if (ibmr_ret) {
+		ibmr = rds_ib_reuse_fmr(pool);
+		if (ibmr) {
+			*ibmr_ret = ibmr;
+			goto out;
+		}
+	}
 
-	spin_lock_irqsave(&pool->list_lock, flags);
 	/* Get the list of all MRs to be dropped. Ordering matters -
-	 * we want to put drop_list ahead of free_list. */
-	list_splice_init(&pool->free_list, &unmap_list);
-	list_splice_init(&pool->drop_list, &unmap_list);
+	 * we want to put drop_list ahead of free_list.
+	 */
+	xlist_append_to_list(&pool->drop_list, &unmap_list);
+	xlist_append_to_list(&pool->free_list, &unmap_list);
 	if (free_all)
-		list_splice_init(&pool->clean_list, &unmap_list);
-	spin_unlock_irqrestore(&pool->list_lock, flags);
+		xlist_append_to_list(&pool->clean_list, &unmap_list);
 
 	free_goal = rds_ib_flush_goal(pool, free_all);
 
@@ -528,19 +638,20 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
 		goto out;
 
 	/* String all ib_mr's onto one list and hand them to ib_unmap_fmr */
-	list_for_each_entry(ibmr, &unmap_list, list)
+	list_for_each_entry(ibmr, &unmap_list, unmap_list)
 		list_add(&ibmr->fmr->list, &fmr_list);
+
 	ret = ib_unmap_fmr(&fmr_list);
 	if (ret)
 		printk(KERN_WARNING "RDS/IB: ib_unmap_fmr failed (err=%d)\n", ret);
 
 	/* Now we can destroy the DMA mapping and unpin any pages */
-	list_for_each_entry_safe(ibmr, next, &unmap_list, list) {
+	list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
 		unpinned += ibmr->sg_len;
 		__rds_ib_teardown_mr(ibmr);
 		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
 			rds_ib_stats_inc(s_ib_rdma_mr_free);
-			list_del(&ibmr->list);
+			list_del(&ibmr->unmap_list);
 			ib_dealloc_fmr(ibmr->fmr);
 			kfree(ibmr);
 			nfreed++;
@@ -548,9 +659,27 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
 		ncleaned++;
 	}
 
-	spin_lock_irqsave(&pool->list_lock, flags);
-	list_splice(&unmap_list, &pool->clean_list);
-	spin_unlock_irqrestore(&pool->list_lock, flags);
+	if (!list_empty(&unmap_list)) {
+		/* we have to make sure that none of the things we're about
+		 * to put on the clean list would race with other cpus trying
+		 * to pull items off.  The xlist would explode if we managed to
+		 * remove something from the clean list and then add it back again
+		 * while another CPU was spinning on that same item in xlist_del_head.
+		 *
+		 * This is pretty unlikely, but just in case  wait for an xlist grace period
+		 * here before adding anything back into the clean list.
+		 */
+		wait_clean_list_grace();
+
+		list_append_to_xlist(pool, &unmap_list, &clean_xlist, &clean_tail);
+		if (ibmr_ret)
+			refill_local(pool, &clean_xlist, ibmr_ret);
+
+		/* refill_local may have emptied our list */
+		if (!xlist_empty(&clean_xlist))
+			xlist_add(clean_xlist.next, clean_tail, &pool->clean_list);
+
+	}
 
 	atomic_sub(unpinned, &pool->free_pinned);
 	atomic_sub(ncleaned, &pool->dirty_count);
@@ -558,6 +687,9 @@ static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all)
 
 out:
 	mutex_unlock(&pool->flush_lock);
+	if (waitqueue_active(&pool->flush_wait))
+		wake_up(&pool->flush_wait);
+out_nolock:
 	return ret;
 }
 
@@ -565,7 +697,7 @@ static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
 {
 	struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker);
 
-	rds_ib_flush_mr_pool(pool, 0);
+	rds_ib_flush_mr_pool(pool, 0, NULL);
 }
 
 void rds_ib_free_mr(void *trans_private, int invalidate)
@@ -573,20 +705,17 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
 	struct rds_ib_mr *ibmr = trans_private;
 	struct rds_ib_device *rds_ibdev = ibmr->device;
 	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
-	unsigned long flags;
 
 	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);
 
 	/* Return it to the pool's free list */
-	spin_lock_irqsave(&pool->list_lock, flags);
 	if (ibmr->remap_count >= pool->fmr_attr.max_maps)
-		list_add(&ibmr->list, &pool->drop_list);
+		xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->drop_list);
 	else
-		list_add(&ibmr->list, &pool->free_list);
+		xlist_add(&ibmr->xlist, &ibmr->xlist, &pool->free_list);
 
 	atomic_add(ibmr->sg_len, &pool->free_pinned);
 	atomic_inc(&pool->dirty_count);
-	spin_unlock_irqrestore(&pool->list_lock, flags);
 
 	/* If we've pinned too many pages, request a flush */
 	if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
@@ -595,7 +724,7 @@ void rds_ib_free_mr(void *trans_private, int invalidate)
 
 	if (invalidate) {
 		if (likely(!in_interrupt())) {
-			rds_ib_flush_mr_pool(pool, 0);
+			rds_ib_flush_mr_pool(pool, 0, NULL);
 		} else {
 			/* We get here if the user created a MR marked
 			 * as use_once and invalidate at the same time. */
@@ -614,7 +743,7 @@ void rds_ib_flush_mrs(void)
 		struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
 
 		if (pool)
-			rds_ib_flush_mr_pool(pool, 0);
+			rds_ib_flush_mr_pool(pool, 0, NULL);
 	}
 }
 
@@ -659,3 +788,4 @@ void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 		rds_ib_dev_put(rds_ibdev);
 	return ibmr;
 }
+
diff --git a/net/rds/xlist.h b/net/rds/xlist.h
new file mode 100644
index 000000000000..8c21aca49d50
--- /dev/null
+++ b/net/rds/xlist.h
@@ -0,0 +1,110 @@
+#ifndef _LINUX_XLIST_H
+#define _LINUX_XLIST_H
+
+#include <linux/stddef.h>
+#include <linux/poison.h>
+#include <linux/prefetch.h>
+#include <asm/system.h>
+
+struct xlist_head {
+	struct xlist_head *next;
+};
+
+/*
+ * XLIST_PTR_TAIL can be used to prevent double insertion.  See
+ * xlist_protect()
+ */
+#define XLIST_PTR_TAIL ((struct xlist_head *)0x1)
+
+static inline void xlist_add(struct xlist_head *new, struct xlist_head *tail, struct xlist_head *head)
+{
+	struct xlist_head *cur;
+	struct xlist_head *check;
+
+	while (1) {
+		cur = head->next;
+		tail->next = cur;
+		check = cmpxchg(&head->next, cur, new);
+		if (check == cur)
+			break;
+	}
+}
+
+/*
+ * To avoid duplicate insertion by two CPUs of the same xlist item
+ * you can call xlist_protect.  It will stuff XLIST_PTR_TAIL
+ * into the entry->next pointer with xchg, and only return 1
+ * if there was a NULL there before.
+ *
+ * if xlist_protect returns zero, someone else is busy working
+ * on this entry.  Getting a NULL into the entry in a race
+ * free manner is the caller's job.
+ */
+static inline int xlist_protect(struct xlist_head *entry)
+{
+	struct xlist_head *val;
+
+	val = xchg(&entry->next, XLIST_PTR_TAIL);
+	if (val == NULL)
+		return 1;
+	return 0;
+}
+
+static inline struct xlist_head *xlist_del_head(struct xlist_head *head)
+{
+	struct xlist_head *cur;
+	struct xlist_head *check;
+	struct xlist_head *next;
+
+	while (1) {
+		cur = head->next;
+		if (!cur)
+			goto out;
+
+		if (cur == XLIST_PTR_TAIL) {
+			cur = NULL;
+			goto out;
+		}
+
+		next = cur->next;
+		check = cmpxchg(&head->next, cur, next);
+		if (check == cur)
+			goto out;
+	}
+out:
+	return cur;
+}
+
+static inline struct xlist_head *xlist_del_head_fast(struct xlist_head *head)
+{
+	struct xlist_head *cur;
+
+	cur = head->next;
+	if (!cur || cur == XLIST_PTR_TAIL)
+		return NULL;
+
+	head->next = cur->next;
+	return cur;
+}
+
+static inline void xlist_splice(struct xlist_head *list,
+				struct xlist_head *head)
+{
+	struct xlist_head *cur;
+
+	WARN_ON(head->next);
+	cur = xchg(&list->next, NULL);
+	head->next = cur;
+}
+
+static inline void INIT_XLIST_HEAD(struct xlist_head *list)
+{
+	list->next = NULL;
+}
+
+static inline int xlist_empty(struct xlist_head *head)
+{
+	return head->next == NULL || head->next == XLIST_PTR_TAIL;
+}
+
+#endif