summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--ipc/msg.c133
1 files changed, 40 insertions, 93 deletions
diff --git a/ipc/msg.c b/ipc/msg.c
index c6521c205cb4..b1fb06a6a75b 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -51,13 +51,7 @@ struct msg_receiver {
 	long			r_msgtype;
 	long			r_maxsize;
 
-	/*
-	 * Mark r_msg volatile so that the compiler
-	 * does not try to get smart and optimize
-	 * it. We rely on this for the lockless
-	 * receive algorithm.
-	 */
-	struct msg_msg		*volatile r_msg;
+	struct msg_msg		*r_msg;
 };
 
 /* one msg_sender for each sleeping sender */
@@ -183,21 +177,14 @@ static void ss_wakeup(struct list_head *h, int kill)
 	}
 }
 
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+			struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
 	list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
-		msr->r_msg = NULL; /* initialize expunge ordering */
-		wake_up_process(msr->r_tsk);
-		/*
-		 * Ensure that the wakeup is visible before setting r_msg as
-		 * the receiving end depends on it: either spinning on a nil,
-		 * or dealing with -EAGAIN cases. See lockless receive part 1
-		 * and 2 in do_msgrcv().
-		 */
-		smp_wmb(); /* barrier (B) */
-		msr->r_msg = ERR_PTR(res);
+		wake_q_add(wake_q, msr->r_tsk);
+		WRITE_ONCE(msr->r_msg, ERR_PTR(res));
 	}
 }
 
@@ -213,11 +200,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 {
 	struct msg_msg *msg, *t;
 	struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+	WAKE_Q(wake_q);
 
-	expunge_all(msq, -EIDRM);
+	expunge_all(msq, -EIDRM, &wake_q);
 	ss_wakeup(&msq->q_senders, 1);
 	msg_rmid(ns, msq);
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 	rcu_read_unlock();
 
 	list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -342,6 +331,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 	struct kern_ipc_perm *ipcp;
 	struct msqid64_ds uninitialized_var(msqid64);
 	struct msg_queue *msq;
+	WAKE_Q(wake_q);
 	int err;
 
 	if (cmd == IPC_SET) {
@@ -389,7 +379,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 		/* sleeping receivers might be excluded by
 		 * stricter permissions.
 		 */
-		expunge_all(msq, -EAGAIN);
+		expunge_all(msq, -EAGAIN, &wake_q);
 		/* sleeping senders might be able to send
 		 * due to a larger queue size.
 		 */
@@ -402,6 +392,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 out_up:
@@ -566,7 +557,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
 	return 0;
 }
 
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+				 struct wake_q_head *wake_q)
 {
 	struct msg_receiver *msr, *t;
 
@@ -577,27 +569,14 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 
 			list_del(&msr->r_list);
 			if (msr->r_maxsize < msg->m_ts) {
-				/* initialize pipelined send ordering */
-				msr->r_msg = NULL;
-				wake_up_process(msr->r_tsk);
-				/* barrier (B) see barrier comment below */
-				smp_wmb();
-				msr->r_msg = ERR_PTR(-E2BIG);
+				wake_q_add(wake_q, msr->r_tsk);
+				WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));
 			} else {
-				msr->r_msg = NULL;
 				msq->q_lrpid = task_pid_vnr(msr->r_tsk);
 				msq->q_rtime = get_seconds();
-				wake_up_process(msr->r_tsk);
-				/*
-				 * Ensure that the wakeup is visible before
-				 * setting r_msg, as the receiving can otherwise
-				 * exit - once r_msg is set, the receiver can
-				 * continue. See lockless receive part 1 and 2
-				 * in do_msgrcv(). Barrier (B).
-				 */
-				smp_wmb();
-				msr->r_msg = msg;
 
+				wake_q_add(wake_q, msr->r_tsk);
+				WRITE_ONCE(msr->r_msg, msg);
 				return 1;
 			}
 		}
@@ -613,6 +592,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	struct msg_msg *msg;
 	int err;
 	struct ipc_namespace *ns;
+	WAKE_Q(wake_q);
 
 	ns = current->nsproxy->ipc_ns;
 
@@ -686,7 +666,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 			err = -EIDRM;
 			goto out_unlock0;
 		}
-
 		ss_del(&s);
 
 		if (signal_pending(current)) {
@@ -698,7 +677,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 	msq->q_lspid = task_tgid_vnr(current);
 	msq->q_stime = get_seconds();
 
-	if (!pipelined_send(msq, msg)) {
+	if (!pipelined_send(msq, msg, &wake_q)) {
 		/* no one is waiting for this message, enqueue it */
 		list_add_tail(&msg->m_list, &msq->q_messages);
 		msq->q_cbytes += msgsz;
@@ -712,6 +691,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 out_unlock0:
 	ipc_unlock_object(&msq->q_perm);
+	wake_up_q(&wake_q);
 out_unlock1:
 	rcu_read_unlock();
 	if (msg != NULL)
@@ -919,71 +899,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 		rcu_read_unlock();
 		schedule();
 
-		/* Lockless receive, part 1:
-		 * Disable preemption.  We don't hold a reference to the queue
-		 * and getting a reference would defeat the idea of a lockless
-		 * operation, thus the code relies on rcu to guarantee the
-		 * existence of msq:
+		/*
+		 * Lockless receive, part 1:
+		 * We don't hold a reference to the queue and getting a
+		 * reference would defeat the idea of a lockless operation,
+		 * thus the code relies on rcu to guarantee the existence of
+		 * msq:
 		 * Prior to destruction, expunge_all(-EIRDM) changes r_msg.
 		 * Thus if r_msg is -EAGAIN, then the queue not yet destroyed.
-		 * rcu_read_lock() prevents preemption between reading r_msg
-		 * and acquiring the q_perm.lock in ipc_lock_object().
 		 */
 		rcu_read_lock();
 
-		/* Lockless receive, part 2:
-		 * Wait until pipelined_send or expunge_all are outside of
-		 * wake_up_process(). There is a race with exit(), see
-		 * ipc/mqueue.c for the details. The correct serialization
-		 * ensures that a receiver cannot continue without the wakeup
-		 * being visibible _before_ setting r_msg:
-		 *
-		 * CPU 0                             CPU 1
-		 * <loop receiver>
-		 *   smp_rmb(); (A) <-- pair -.      <waker thread>
-		 *   <load ->r_msg>           |        msr->r_msg = NULL;
-		 *                            |        wake_up_process();
-		 * <continue>                 `------> smp_wmb(); (B)
-		 *                                     msr->r_msg = msg;
+		/*
+		 * Lockless receive, part 2:
+		 * The work in pipelined_send() and expunge_all():
+		 * - Set pointer to message
+		 * - Queue the receiver task for later wakeup
+		 * - Wake up the process after the lock is dropped.
 		 *
-		 * Where (A) orders the message value read and where (B) orders
-		 * the write to the r_msg -- done in both pipelined_send and
-		 * expunge_all.
-		 */
-		for (;;) {
-			/*
-			 * Pairs with writer barrier in pipelined_send
-			 * or expunge_all.
-			 */
-			smp_rmb(); /* barrier (A) */
-			msg = (struct msg_msg *)msr_d.r_msg;
-			if (msg)
-				break;
-
-			/*
-			 * The cpu_relax() call is a compiler barrier
-			 * which forces everything in this loop to be
-			 * re-loaded.
-			 */
-			cpu_relax();
-		}
-
-		/* Lockless receive, part 3:
-		 * If there is a message or an error then accept it without
-		 * locking.
+		 * Should the process wake up before this wakeup (due to a
+		 * signal) it will either see the message and continue ...
 		 */
+		msg = READ_ONCE(msr_d.r_msg);
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock1;
 
-		/* Lockless receive, part 3:
-		 * Acquire the queue spinlock.
-		 */
+		 /*
+		  * ... or see -EAGAIN, acquire the lock to check the message
+		  * again.
+		  */
 		ipc_lock_object(&msq->q_perm);
 
-		/* Lockless receive, part 4:
-		 * Repeat test after acquiring the spinlock.
-		 */
-		msg = (struct msg_msg *)msr_d.r_msg;
+		msg = msr_d.r_msg;
 		if (msg != ERR_PTR(-EAGAIN))
 			goto out_unlock0;