]> rtime.felk.cvut.cz Git - hercules2020/nv-tegra/linux-4.4.git/blob - rt-patches/0075-ipc-msg-Implement-lockless-pipelined-wakeups.patch
Fix memguard and related syscalls
[hercules2020/nv-tegra/linux-4.4.git] / rt-patches / 0075-ipc-msg-Implement-lockless-pipelined-wakeups.patch
1 From bef5f97142e11a2abb8731c6f85f90dc886d04bb Mon Sep 17 00:00:00 2001
2 From: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
3 Date: Fri, 30 Oct 2015 11:59:07 +0100
4 Subject: [PATCH 075/366] ipc/msg: Implement lockless pipelined wakeups
5
6 This patch moves the wakeup_process() invocation so it is not done under
7 the perm->lock by making use of a lockless wake_q. With this change, the
8 waiter is woken up once the message has been assigned and it does not
9 need to loop on SMP if the message points to NULL. In the signal case we
10 still need to check the pointer under the lock to verify the state.
11
12 This change should also avoid the introduction of preempt_disable() in
13 -RT which avoids a busy-loop which pools for the NULL -> !NULL
14 change if the waiter has a higher priority compared to the waker.
15
16 Cc: Davidlohr Bueso <dave@stgolabs.net>
17 Cc: Manfred Spraul <manfred@colorfullife.com>
18 Cc: Andrew Morton <akpm@linux-foundation.org>
19 Cc: George Spelvin <linux@horizon.com>
20 Cc: Thomas Gleixner <tglx@linutronix.de>
21 Cc: Peter Zijlstra <peterz@infradead.org>
22 Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
23 ---
24  ipc/msg.c | 101 +++++++++++++++++---------------------------------------------
25  1 file changed, 28 insertions(+), 73 deletions(-)
26
27 diff --git a/ipc/msg.c b/ipc/msg.c
28 index c6521c2..996d890 100644
29 --- a/ipc/msg.c
30 +++ b/ipc/msg.c
31 @@ -183,20 +183,14 @@ static void ss_wakeup(struct list_head *h, int kill)
32         }
33  }
34  
35 -static void expunge_all(struct msg_queue *msq, int res)
36 +static void expunge_all(struct msg_queue *msq, int res,
37 +                       struct wake_q_head *wake_q)
38  {
39         struct msg_receiver *msr, *t;
40  
41         list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
42 -               msr->r_msg = NULL; /* initialize expunge ordering */
43 -               wake_up_process(msr->r_tsk);
44 -               /*
45 -                * Ensure that the wakeup is visible before setting r_msg as
46 -                * the receiving end depends on it: either spinning on a nil,
47 -                * or dealing with -EAGAIN cases. See lockless receive part 1
48 -                * and 2 in do_msgrcv().
49 -                */
50 -               smp_wmb(); /* barrier (B) */
51 +
52 +               wake_q_add(wake_q, msr->r_tsk);
53                 msr->r_msg = ERR_PTR(res);
54         }
55  }
56 @@ -213,11 +207,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
57  {
58         struct msg_msg *msg, *t;
59         struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
60 +       WAKE_Q(wake_q);
61  
62 -       expunge_all(msq, -EIDRM);
63 +       expunge_all(msq, -EIDRM, &wake_q);
64         ss_wakeup(&msq->q_senders, 1);
65         msg_rmid(ns, msq);
66         ipc_unlock_object(&msq->q_perm);
67 +       wake_up_q(&wake_q);
68         rcu_read_unlock();
69  
70         list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
71 @@ -342,6 +338,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
72         struct kern_ipc_perm *ipcp;
73         struct msqid64_ds uninitialized_var(msqid64);
74         struct msg_queue *msq;
75 +       WAKE_Q(wake_q);
76         int err;
77  
78         if (cmd == IPC_SET) {
79 @@ -389,7 +386,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
80                 /* sleeping receivers might be excluded by
81                  * stricter permissions.
82                  */
83 -               expunge_all(msq, -EAGAIN);
84 +               expunge_all(msq, -EAGAIN, &wake_q);
85                 /* sleeping senders might be able to send
86                  * due to a larger queue size.
87                  */
88 @@ -402,6 +399,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
89  
90  out_unlock0:
91         ipc_unlock_object(&msq->q_perm);
92 +       wake_up_q(&wake_q);
93  out_unlock1:
94         rcu_read_unlock();
95  out_up:
96 @@ -566,7 +564,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
97         return 0;
98  }
99  
100 -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
101 +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
102 +                                struct wake_q_head *wake_q)
103  {
104         struct msg_receiver *msr, *t;
105  
106 @@ -577,27 +576,13 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
107  
108                         list_del(&msr->r_list);
109                         if (msr->r_maxsize < msg->m_ts) {
110 -                               /* initialize pipelined send ordering */
111 -                               msr->r_msg = NULL;
112 -                               wake_up_process(msr->r_tsk);
113 -                               /* barrier (B) see barrier comment below */
114 -                               smp_wmb();
115 +                               wake_q_add(wake_q, msr->r_tsk);
116                                 msr->r_msg = ERR_PTR(-E2BIG);
117                         } else {
118 -                               msr->r_msg = NULL;
119                                 msq->q_lrpid = task_pid_vnr(msr->r_tsk);
120                                 msq->q_rtime = get_seconds();
121 -                               wake_up_process(msr->r_tsk);
122 -                               /*
123 -                                * Ensure that the wakeup is visible before
124 -                                * setting r_msg, as the receiving can otherwise
125 -                                * exit - once r_msg is set, the receiver can
126 -                                * continue. See lockless receive part 1 and 2
127 -                                * in do_msgrcv(). Barrier (B).
128 -                                */
129 -                               smp_wmb();
130 +                               wake_q_add(wake_q, msr->r_tsk);
131                                 msr->r_msg = msg;
132 -
133                                 return 1;
134                         }
135                 }
136 @@ -613,6 +598,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
137         struct msg_msg *msg;
138         int err;
139         struct ipc_namespace *ns;
140 +       WAKE_Q(wake_q);
141  
142         ns = current->nsproxy->ipc_ns;
143  
144 @@ -698,7 +684,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
145         msq->q_lspid = task_tgid_vnr(current);
146         msq->q_stime = get_seconds();
147  
148 -       if (!pipelined_send(msq, msg)) {
149 +       if (!pipelined_send(msq, msg, &wake_q)) {
150                 /* no one is waiting for this message, enqueue it */
151                 list_add_tail(&msg->m_list, &msq->q_messages);
152                 msq->q_cbytes += msgsz;
153 @@ -712,6 +698,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
154  
155  out_unlock0:
156         ipc_unlock_object(&msq->q_perm);
157 +       wake_up_q(&wake_q);
158  out_unlock1:
159         rcu_read_unlock();
160         if (msg != NULL)
161 @@ -932,57 +919,25 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
162                 rcu_read_lock();
163  
164                 /* Lockless receive, part 2:
165 -                * Wait until pipelined_send or expunge_all are outside of
166 -                * wake_up_process(). There is a race with exit(), see
167 -                * ipc/mqueue.c for the details. The correct serialization
168 -                * ensures that a receiver cannot continue without the wakeup
169 -                * being visibible _before_ setting r_msg:
170 +                * The work in pipelined_send() and expunge_all():
171 +                * - Set pointer to message
172 +                * - Queue the receiver task for later wakeup
173 +                * - Wake up the process after the lock is dropped.
174                  *
175 -                * CPU 0                             CPU 1
176 -                * <loop receiver>
177 -                *   smp_rmb(); (A) <-- pair -.      <waker thread>
178 -                *   <load ->r_msg>           |        msr->r_msg = NULL;
179 -                *                            |        wake_up_process();
180 -                * <continue>                 `------> smp_wmb(); (B)
181 -                *                                     msr->r_msg = msg;
182 -                *
183 -                * Where (A) orders the message value read and where (B) orders
184 -                * the write to the r_msg -- done in both pipelined_send and
185 -                * expunge_all.
186 +                * Should the process wake up before this wakeup (due to a
187 +                * signal) it will either see the message and continue …
188                  */
189 -               for (;;) {
190 -                       /*
191 -                        * Pairs with writer barrier in pipelined_send
192 -                        * or expunge_all.
193 -                        */
194 -                       smp_rmb(); /* barrier (A) */
195 -                       msg = (struct msg_msg *)msr_d.r_msg;
196 -                       if (msg)
197 -                               break;
198  
199 -                       /*
200 -                        * The cpu_relax() call is a compiler barrier
201 -                        * which forces everything in this loop to be
202 -                        * re-loaded.
203 -                        */
204 -                       cpu_relax();
205 -               }
206 -
207 -               /* Lockless receive, part 3:
208 -                * If there is a message or an error then accept it without
209 -                * locking.
210 -                */
211 +               msg = (struct msg_msg *)msr_d.r_msg;
212                 if (msg != ERR_PTR(-EAGAIN))
213                         goto out_unlock1;
214  
215 -               /* Lockless receive, part 3:
216 -                * Acquire the queue spinlock.
217 -                */
218 +                /*
219 +                 * … or see -EAGAIN, acquire the lock to check the message
220 +                 * again.
221 +                 */
222                 ipc_lock_object(&msq->q_perm);
223  
224 -               /* Lockless receive, part 4:
225 -                * Repeat test after acquiring the spinlock.
226 -                */
227                 msg = (struct msg_msg *)msr_d.r_msg;
228                 if (msg != ERR_PTR(-EAGAIN))
229                         goto out_unlock0;
230 -- 
231 1.9.1
232