1 From bef5f97142e11a2abb8731c6f85f90dc886d04bb Mon Sep 17 00:00:00 2001
2 From: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
3 Date: Fri, 30 Oct 2015 11:59:07 +0100
4 Subject: [PATCH 075/366] ipc/msg: Implement lockless pipelined wakeups
6 This patch moves the wakeup_process() invocation so it is not done under
7 the perm->lock by making use of a lockless wake_q. With this change, the
8 waiter is woken up once the message has been assigned and it does not
9 need to loop on SMP if the message points to NULL. In the signal case we
10 still need to check the pointer under the lock to verify the state.
12 This change should also avoid the introduction of preempt_disable() in
13 -RT which avoids a busy-loop which polls for the NULL -> !NULL
14 change if the waiter has a higher priority compared to the waker.
16 Cc: Davidlohr Bueso <dave@stgolabs.net>
17 Cc: Manfred Spraul <manfred@colorfullife.com>
18 Cc: Andrew Morton <akpm@linux-foundation.org>
19 Cc: George Spelvin <linux@horizon.com>
20 Cc: Thomas Gleixner <tglx@linutronix.de>
21 Cc: Peter Zijlstra <peterz@infradead.org>
22 Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
24 ipc/msg.c | 101 +++++++++++++++++---------------------------------------------
25 1 file changed, 28 insertions(+), 73 deletions(-)
27 diff --git a/ipc/msg.c b/ipc/msg.c
28 index c6521c2..996d890 100644
31 @@ -183,20 +183,14 @@ static void ss_wakeup(struct list_head *h, int kill)
35 -static void expunge_all(struct msg_queue *msq, int res)
36 +static void expunge_all(struct msg_queue *msq, int res,
37 + struct wake_q_head *wake_q)
39 struct msg_receiver *msr, *t;
41 list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
42 - msr->r_msg = NULL; /* initialize expunge ordering */
43 - wake_up_process(msr->r_tsk);
45 - * Ensure that the wakeup is visible before setting r_msg as
46 - * the receiving end depends on it: either spinning on a nil,
47 - * or dealing with -EAGAIN cases. See lockless receive part 1
48 - * and 2 in do_msgrcv().
50 - smp_wmb(); /* barrier (B) */
52 + wake_q_add(wake_q, msr->r_tsk);
53 msr->r_msg = ERR_PTR(res);
56 @@ -213,11 +207,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
58 struct msg_msg *msg, *t;
59 struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
62 - expunge_all(msq, -EIDRM);
63 + expunge_all(msq, -EIDRM, &wake_q);
64 ss_wakeup(&msq->q_senders, 1);
66 ipc_unlock_object(&msq->q_perm);
70 list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
71 @@ -342,6 +338,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
72 struct kern_ipc_perm *ipcp;
73 struct msqid64_ds uninitialized_var(msqid64);
74 struct msg_queue *msq;
79 @@ -389,7 +386,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
80 /* sleeping receivers might be excluded by
81 * stricter permissions.
83 - expunge_all(msq, -EAGAIN);
84 + expunge_all(msq, -EAGAIN, &wake_q);
85 /* sleeping senders might be able to send
86 * due to a larger queue size.
88 @@ -402,6 +399,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
91 ipc_unlock_object(&msq->q_perm);
96 @@ -566,7 +564,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
100 -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
101 +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
102 + struct wake_q_head *wake_q)
104 struct msg_receiver *msr, *t;
106 @@ -577,27 +576,13 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
108 list_del(&msr->r_list);
109 if (msr->r_maxsize < msg->m_ts) {
110 - /* initialize pipelined send ordering */
112 - wake_up_process(msr->r_tsk);
113 - /* barrier (B) see barrier comment below */
115 + wake_q_add(wake_q, msr->r_tsk);
116 msr->r_msg = ERR_PTR(-E2BIG);
119 msq->q_lrpid = task_pid_vnr(msr->r_tsk);
120 msq->q_rtime = get_seconds();
121 - wake_up_process(msr->r_tsk);
123 - * Ensure that the wakeup is visible before
124 - * setting r_msg, as the receiving can otherwise
125 - * exit - once r_msg is set, the receiver can
126 - * continue. See lockless receive part 1 and 2
127 - * in do_msgrcv(). Barrier (B).
130 + wake_q_add(wake_q, msr->r_tsk);
136 @@ -613,6 +598,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
139 struct ipc_namespace *ns;
142 ns = current->nsproxy->ipc_ns;
144 @@ -698,7 +684,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
145 msq->q_lspid = task_tgid_vnr(current);
146 msq->q_stime = get_seconds();
148 - if (!pipelined_send(msq, msg)) {
149 + if (!pipelined_send(msq, msg, &wake_q)) {
150 /* no one is waiting for this message, enqueue it */
151 list_add_tail(&msg->m_list, &msq->q_messages);
152 msq->q_cbytes += msgsz;
153 @@ -712,6 +698,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
156 ipc_unlock_object(&msq->q_perm);
157 + wake_up_q(&wake_q);
161 @@ -932,57 +919,25 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
164 /* Lockless receive, part 2:
165 - * Wait until pipelined_send or expunge_all are outside of
166 - * wake_up_process(). There is a race with exit(), see
167 - * ipc/mqueue.c for the details. The correct serialization
168 - * ensures that a receiver cannot continue without the wakeup
169 - * being visibible _before_ setting r_msg:
170 + * The work in pipelined_send() and expunge_all():
171 + * - Set pointer to message
172 + * - Queue the receiver task for later wakeup
173 + * - Wake up the process after the lock is dropped.
177 - * smp_rmb(); (A) <-- pair -. <waker thread>
178 - * <load ->r_msg> | msr->r_msg = NULL;
179 - * | wake_up_process();
180 - * <continue> `------> smp_wmb(); (B)
181 - * msr->r_msg = msg;
183 - * Where (A) orders the message value read and where (B) orders
184 - * the write to the r_msg -- done in both pipelined_send and
186 + * Should the process wake up before this wakeup (due to a
187 + * signal) it will either see the message and continue …
191 - * Pairs with writer barrier in pipelined_send
194 - smp_rmb(); /* barrier (A) */
195 - msg = (struct msg_msg *)msr_d.r_msg;
200 - * The cpu_relax() call is a compiler barrier
201 - * which forces everything in this loop to be
207 - /* Lockless receive, part 3:
208 - * If there is a message or an error then accept it without
211 + msg = (struct msg_msg *)msr_d.r_msg;
212 if (msg != ERR_PTR(-EAGAIN))
215 - /* Lockless receive, part 3:
216 - * Acquire the queue spinlock.
219 + * … or see -EAGAIN, acquire the lock to check the message
222 ipc_lock_object(&msq->q_perm);
224 - /* Lockless receive, part 4:
225 - * Repeat test after acquiring the spinlock.
227 msg = (struct msg_msg *)msr_d.r_msg;
228 if (msg != ERR_PTR(-EAGAIN))