]> rtime.felk.cvut.cz Git - lincan.git/blob - lincan/src/can_queue.c
Added support for local message processing and some cleanups.
[lincan.git] / lincan / src / can_queue.c
1 /* can_queue.c - CAN message queues
2  * Linux CAN-bus device driver.
3  * New CAN queues by Pavel Pisa - OCERA team member
4  * email:pisa@cmp.felk.cvut.cz
5  * This software is released under the GPL-License.
6  * Version lincan-0.2  9 Jul 2003
7  */
8
9 #define __NO_VERSION__
10 #include <linux/module.h>
11 #include <linux/version.h>
12 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,4,0))
13 #include <linux/malloc.h>
14 #else
15 #include <linux/slab.h>
16 #endif
17 #include <linux/wait.h>
18 #include "../include/can.h"
19 #include "../include/can_queue.h"
20
21 /* 
22  * Modifies Tx message processing 
23  *  0 .. local message processing disabled
24  *  1 .. local messages disabled by default but can be enabled by canque_set_filt
25  *  2 .. local messages enabled by default, can be disabled by canque_set_filt
26  */
27 extern int processlocal;
28
29 /*#define CAN_DEBUG*/
30
31 #ifdef CAN_DEBUG
32         #define DEBUGQUE(fmt,args...) printk(KERN_ERR "can_queue (debug): " fmt,\
33         ##args)
34         
35   atomic_t edge_num_cnt;
36 #else
37         #define DEBUGQUE(fmt,args...)
38 #endif
39
40 #define CANQUE_ROUNDROB 1
41
42 /**
43  * canque_fifo_flush_slots - free all ready slots from the FIFO
44  * @fifo: pointer to the FIFO structure
45  *
46  * The caller should be prepared to handle situations, when some
47  * slots are held by input or output side slots processing.
48  * These slots cannot be flushed or their processing interrupted.
49  *
50  * Return Value: The nonzero value indicates, that queue has not been
51  *      empty before the function call.
52  */
53 int canque_fifo_flush_slots(struct canque_fifo_t *fifo)
54 {
55         int ret;
56         unsigned long flags;
57         struct canque_slot_t *slot;
58         spin_lock_irqsave(&fifo->fifo_lock, flags);
59         slot=fifo->head;
60         if(slot){
61                 *fifo->tail=fifo->flist;
62                 fifo->flist=slot;
63                 fifo->head=NULL;
64                 fifo->tail=&fifo->head;
65         }
66         canque_fifo_clear_fl(fifo,FULL);
67         ret=canque_fifo_test_and_set_fl(fifo,EMPTY)?0:1;
68         spin_unlock_irqrestore(&fifo->fifo_lock, flags);
69         return ret;
70 }
71
72
73 /**
74  * canque_fifo_init_slots - initialize one CAN FIFO
75  * @fifo: pointer to the FIFO structure
76  * @slotsnr: number of requested slots
77  *
78  * Return Value: The negative value indicates, that there is no memory
79  *      to allocate space for the requested number of the slots.
80  */
81 int canque_fifo_init_slots(struct canque_fifo_t *fifo, int slotsnr)
82 {
83         int size;
84         struct canque_slot_t *slot;
85         if(!slotsnr) slotsnr=MAX_BUF_LENGTH;
86         size=sizeof(struct canque_slot_t)*slotsnr;
87         fifo->entry=kmalloc(size,GFP_KERNEL);
88         if(!fifo->entry) return -1;
89         slot=fifo->entry;
90         fifo->flist=slot;
91         while(--slotsnr){
92                 slot->next=slot+1;
93                 slot++;
94         }
95         slot->next=NULL;
96         fifo->head=NULL;
97         fifo->tail=&fifo->head;
98         canque_fifo_set_fl(fifo,EMPTY);
99         return 1;
100 }
101
102 /**
103  * canque_fifo_done - frees slots allocated for CAN FIFO
104  * @fifo: pointer to the FIFO structure
105  */
106 int canque_fifo_done(struct canque_fifo_t *fifo)
107 {
108         if(fifo->entry)
109                 kfree(fifo->entry);
110         fifo->entry=NULL;
111         return 1;
112 }
113
114 /* atomic_dec_and_test(&qedge->edge_used);
115  void atomic_inc(&qedge->edge_used);
116  list_add_tail(struct list_head *new, struct list_head *head)
117  list_for_each(edge,qends->inlist);
118  list_entry(ptr, type, member)
119 */
120
121 /**
122  * canque_get_inslot - finds one outgoing edge and allocates slot from it
123  * @qends: ends structure belonging to calling communication object
124  * @qedgep: place to store pointer to found edge
125  * @slotp: place to store pointer to  allocated slot
126  * @cmd: command type for slot
127  *
128  * Function looks for the first non-blocked outgoing edge in @qends structure
129  * and tries to allocate slot from it.
130  * Return Value: If there is no usable edge or there is no free slot in edge
131  *      negative value is returned.
132  */
133 int canque_get_inslot(struct canque_ends_t *qends,
134         struct canque_edge_t **qedgep, struct canque_slot_t **slotp, int cmd)
135 {
136         int ret=-2;
137         unsigned long flags;
138         struct canque_edge_t *edge;
139         
140         spin_lock_irqsave(&qends->ends_lock, flags);
141         if(!list_empty(&qends->inlist)){
142                 edge=list_entry(qends->inlist.next,struct canque_edge_t,inpeers);
143                 if(!canque_fifo_test_fl(&edge->fifo,BLOCK)&&!canque_fifo_test_fl(&edge->fifo,DEAD)){
144                         atomic_inc(&edge->edge_used);
145                         spin_unlock_irqrestore(&qends->ends_lock, flags);
146                         ret=canque_fifo_get_inslot(&edge->fifo, slotp, cmd);
147                         if(ret>0){
148                                 *qedgep=edge;
149                                 DEBUGQUE("canque_get_inslot cmd=%d found edge %d\n",cmd,edge->edge_num);
150                                 return ret;
151
152                         }
153                         spin_lock_irqsave(&qends->ends_lock, flags);
154                         if(atomic_dec_and_test(&edge->edge_used))
155                                 canque_notify_bothends(edge,CANQUEUE_NOTIFY_NOUSR);
156                 }
157         }
158         spin_unlock_irqrestore(&qends->ends_lock, flags);
159         *qedgep=NULL;
160         DEBUGQUE("canque_get_inslot cmd=%d failed\n",cmd);
161         return ret;
162 }
163
164 /**
165  * canque_get_inslot4id - finds best outgoing edge and slot for given ID
166  * @qends: ends structure belonging to calling communication object
167  * @qedgep: place to store pointer to found edge
168  * @slotp: place to store pointer to  allocated slot
169  * @cmd: command type for slot
170  * @id: communication ID of message to send into edge
171  * @prio: optional priority of message
172  *
173  * Function looks for the non-blocked outgoing edge accepting messages
174  * with given ID. If edge is found, slot is allocated from that edge.
175  * The edges with non-zero mask are preferred over edges open to all messages.
176  * If more edges with mask accepts given message ID, the edge with
177  * highest priority below or equal to required priority is selected.
178  * Return Value: If there is no usable edge or there is no free slot in edge
179  *      negative value is returned.
180  */
181 int canque_get_inslot4id(struct canque_ends_t *qends,
182         struct canque_edge_t **qedgep, struct canque_slot_t **slotp,
183         int cmd, unsigned long id, int prio)
184 {
185         int ret=-2;
186         unsigned long flags;
187         struct canque_edge_t *edge, *bestedge=NULL;
188         struct list_head *entry;
189         
190         spin_lock_irqsave(&qends->ends_lock, flags);
191         list_for_each(entry,&qends->inlist){
192                 edge=list_entry(entry,struct canque_edge_t,inpeers);
193                 if(canque_fifo_test_fl(&edge->fifo,BLOCK)||canque_fifo_test_fl(&edge->fifo,DEAD))
194                         continue;
195                 if((id^edge->filtid)&edge->filtmask)
196                         continue;
197                 if(bestedge){
198                         if(bestedge->filtmask){
199                                 if (!edge->filtmask) continue;
200                         } else {
201                                 if(edge->filtmask){
202                                         bestedge=edge;
203                                         continue;
204                                 }
205                         }
206                         if(bestedge->edge_prio<edge->edge_prio){
207                                 if(edge->edge_prio>prio) continue;
208                         } else {
209                                 if(bestedge->edge_prio<=prio) continue;
210                         }
211                 }
212                 bestedge=edge;
213         }
214         if((edge=bestedge)!=NULL){
215                 atomic_inc(&edge->edge_used);
216                 spin_unlock_irqrestore(&qends->ends_lock, flags);
217                 ret=canque_fifo_get_inslot(&edge->fifo, slotp, cmd);
218                 if(ret>0){
219                         *qedgep=edge;
220                         DEBUGQUE("canque_get_inslot4id cmd=%d id=%ld prio=%d found edge %d\n",cmd,id,prio,edge->edge_num);
221                         return ret;
222                 }
223                 spin_lock_irqsave(&qends->ends_lock, flags);
224                 if(atomic_dec_and_test(&edge->edge_used))
225                         canque_notify_bothends(edge,CANQUEUE_NOTIFY_NOUSR);
226         }
227         spin_unlock_irqrestore(&qends->ends_lock, flags);
228         *qedgep=NULL;
229         DEBUGQUE("canque_get_inslot4id cmd=%d id=%ld prio=%d failed\n",cmd,id,prio);
230         return ret;
231 }
232
233
234 /**
235  * canque_put_inslot - schedules filled slot for processing
236  * @qends: ends structure belonging to calling communication object
237  * @qedge: edge slot belong to
238  * @slot: pointer to the prepared slot
239  *
240  * Puts slot previously acquired by canque_get_inslot() or canque_get_inslot4id()
241  * function call into FIFO queue and activates edge processing if needed.
242  * Return Value: Positive value informs, that activation of output end
243  *      has been necessary
244  */
245 int canque_put_inslot(struct canque_ends_t *qends,
246         struct canque_edge_t *qedge, struct canque_slot_t *slot)
247 {
248         int ret;
249         unsigned long flags;
250         ret=canque_fifo_put_inslot(&qedge->fifo,slot);
251         if(ret) {
252                 canque_activate_edge(qends,qedge);
253                 canque_notify_outends(qedge,CANQUEUE_NOTIFY_PROC);
254         }
255         spin_lock_irqsave(&qends->ends_lock, flags);
256         if(atomic_dec_and_test(&qedge->edge_used))
257                 canque_notify_bothends(qedge,CANQUEUE_NOTIFY_NOUSR);
258         spin_unlock_irqrestore(&qends->ends_lock, flags);
259         DEBUGQUE("canque_put_inslot for edge %d returned %d\n",qedge->edge_num,ret);
260         return ret;
261 }
262
263 /**
264  * canque_abort_inslot - aborts preparation of the message in the slot
265  * @qends: ends structure belonging to calling communication object
266  * @qedge: edge slot belong to
267  * @slot: pointer to the previously allocated slot
268  *
269  * Frees slot previously acquired by canque_get_inslot() or canque_get_inslot4id()
270  * function call. Used when message copying into slot fails.
271  * Return Value: Positive value informs, that queue full state has been negated.
272  */
273 int canque_abort_inslot(struct canque_ends_t *qends,
274         struct canque_edge_t *qedge, struct canque_slot_t *slot)
275 {
276         int ret;
277         unsigned long flags;
278         ret=canque_fifo_abort_inslot(&qedge->fifo,slot);
279         if(ret) {
280                 canque_notify_outends(qedge,CANQUEUE_NOTIFY_SPACE);
281         }
282         spin_lock_irqsave(&qends->ends_lock, flags);
283         if(atomic_dec_and_test(&qedge->edge_used))
284                 canque_notify_bothends(qedge,CANQUEUE_NOTIFY_NOUSR);
285         spin_unlock_irqrestore(&qends->ends_lock, flags);
286         DEBUGQUE("canque_abort_inslot for edge %d returned %d\n",qedge->edge_num,ret);
287         return ret;
288 }
289
290 /**
291  * canque_filter_msg2edges - sends message into all edges which accept its ID
292  * @qends: ends structure belonging to calling communication object
293  * @msg: pointer to CAN message
294  *
295  * Sends message to all outgoing edges connected to the given ends, which accepts
296  * message communication ID.
297  * Return Value: Returns number of edges message has been send to
298  */
299 int canque_filter_msg2edges(struct canque_ends_t *qends, struct canmsg_t *msg)
300 {
301         int destnr=0;
302         int ret;
303         unsigned long flags;
304         unsigned long msgid;
305         struct canque_edge_t *edge;
306         struct list_head *entry;
307         struct canque_slot_t *slot;
308         
309         DEBUGQUE("canque_filter_msg2edges for msg ID 0x%08lx and flags 0x%02x\n",
310                         msg->id, msg->flags);
311         msgid = canque_filtid2internal(msg->id, msg->flags);
312
313         spin_lock_irqsave(&qends->ends_lock, flags);
314         list_for_each(entry,&qends->inlist){
315                 edge=list_entry(entry,struct canque_edge_t,inpeers);
316                 if(canque_fifo_test_fl(&edge->fifo,BLOCK)||canque_fifo_test_fl(&edge->fifo,DEAD))
317                         continue;
318                 /* FIXME: the next comparison should be outside of ends lock */
319                 if((msgid^edge->filtid)&edge->filtmask)
320                         continue;
321                 atomic_inc(&edge->edge_used);
322                 spin_unlock_irqrestore(&qends->ends_lock, flags);
323                 ret=canque_fifo_get_inslot(&edge->fifo, &slot, 0);
324                 if(ret>0){
325                         slot->msg=*msg;
326                         destnr++;
327                         ret=canque_fifo_put_inslot(&edge->fifo,slot);
328                         if(ret) {
329                                 canque_activate_edge(qends,edge);
330                                 canque_notify_outends(edge,CANQUEUE_NOTIFY_PROC);
331                         }
332
333                 }
334                 spin_lock_irqsave(&qends->ends_lock, flags);
335                 if(atomic_dec_and_test(&edge->edge_used))
336                         canque_notify_bothends(edge,CANQUEUE_NOTIFY_NOUSR);
337         }
338         spin_unlock_irqrestore(&qends->ends_lock, flags);
339         DEBUGQUE("canque_filter_msg2edges sent msg ID %ld to %d edges\n",msg->id,destnr);
340         return destnr;
341 }
342
343 /**
344  * canque_test_outslot - test and retrieve ready slot for given ends
345  * @qends: ends structure belonging to calling communication object
346  * @qedgep: place to store pointer to found edge
347  * @slotp: place to store pointer to received slot
348  *
349  * Function takes highest priority active incoming edge and retrieves
350  * oldest ready slot from it.
351  * Return Value: Negative value informs, that there is no ready output
352  *      slot for given ends. Positive value is equal to the command
353  *      slot has been allocated by the input side.
354  */
355 int canque_test_outslot(struct canque_ends_t *qends,
356         struct canque_edge_t **qedgep, struct canque_slot_t **slotp)
357 {
358         unsigned long flags;
359         int prio;
360         struct canque_edge_t *edge;
361         
362         spin_lock_irqsave(&qends->ends_lock, flags);
363         for(prio=CANQUEUE_PRIO_NR;--prio>=0;){
364                 if(!list_empty(&qends->active[prio])){
365                         edge=list_entry(qends->active[prio].next,struct canque_edge_t,outpeers);
366                         atomic_inc(&edge->edge_used);
367                         spin_unlock_irqrestore(&qends->ends_lock, flags);
368                         *qedgep=edge;
369                         DEBUGQUE("canque_test_outslot found edge %d\n",edge->edge_num);
370                         return canque_fifo_test_outslot(&edge->fifo, slotp);
371                 }
372         }
373         spin_unlock_irqrestore(&qends->ends_lock, flags);
374         *qedgep=NULL;
375         DEBUGQUE("canque_test_outslot no ready slot\n");
376         return -1;
377 }
378
379 /**
380  * canque_free_outslot - frees processed output slot
381  * @qends: ends structure belonging to calling communication object
382  * @qedge: edge slot belong to
383  * @slot: pointer to the processed slot
384  *
385  * Function releases processed slot previously acquired by canque_test_outslot()
386  * function call.
387  * Return Value: Return value informs if input side has been notified
388  *      to know about change of edge state
389  */
390 int canque_free_outslot(struct canque_ends_t *qends,
391         struct canque_edge_t *qedge, struct canque_slot_t *slot)
392 {
393         int ret;
394         unsigned long flags;
395         ret=canque_fifo_free_outslot(&qedge->fifo, slot);
396         if(ret&CAN_FIFOF_EMPTY){
397                 canque_notify_inends(qedge,CANQUEUE_NOTIFY_EMPTY);
398         }
399         if(ret&CAN_FIFOF_FULL)
400                 canque_notify_inends(qedge,CANQUEUE_NOTIFY_SPACE);
401         spin_lock_irqsave(&qends->ends_lock, flags);
402         if((ret&CAN_FIFOF_EMPTY) || CANQUE_ROUNDROB){
403                 spin_lock(&qedge->fifo.fifo_lock);
404                 if(canque_fifo_test_fl(&qedge->fifo,EMPTY)){
405                         list_del(&qedge->outpeers);
406                         list_add(&qedge->outpeers,&qends->idle);
407                 }
408             #if CANQUE_ROUNDROB
409                 else{
410                         list_del(&qedge->outpeers);
411                         list_add_tail(&qedge->outpeers,&qends->active[qedge->edge_prio]);
412                 }
413             #endif /*CANQUE_ROUNDROB*/
414                 spin_unlock(&qedge->fifo.fifo_lock);
415         }
416         if(atomic_dec_and_test(&qedge->edge_used))
417                 canque_notify_bothends(qedge,CANQUEUE_NOTIFY_NOUSR);
418         spin_unlock_irqrestore(&qends->ends_lock, flags);
419         DEBUGQUE("canque_free_outslot for edge %d returned %d\n",qedge->edge_num,ret);
420         return ret;
421 }
422
423 /**
424  * canque_again_outslot - reschedule output slot to process it again later
425  * @qends: ends structure belonging to calling communication object
426  * @qedge: edge slot belong to
427  * @slot: pointer to the slot for re-processing
428  *
429  * Function reschedules slot previously acquired by canque_test_outslot()
430  * function call for second time processing.
431  * Return Value: Function cannot fail.
432  */
433 int canque_again_outslot(struct canque_ends_t *qends,
434         struct canque_edge_t *qedge, struct canque_slot_t *slot)
435 {
436         int ret;
437         unsigned long flags;
438         ret=canque_fifo_again_outslot(&qedge->fifo, slot);
439         spin_lock_irqsave(&qends->ends_lock, flags);
440         if(atomic_dec_and_test(&qedge->edge_used))
441                 canque_notify_bothends(qedge,CANQUEUE_NOTIFY_NOUSR);
442         spin_unlock_irqrestore(&qends->ends_lock, flags);
443         DEBUGQUE("canque_again_outslot for edge %d returned %d\n",qedge->edge_num,ret);
444         return ret;
445 }
446
447 /**
448  * canque_set_filt - sets filter for specified edge
449  * @qedge: pointer to the edge
450  * @filtid: ID to set for the edge
451  * @filtmask: mask used for ID match check
452  * @filtflags: required filer flags
453  *
454  * Return Value: Negative value is returned if edge is in the process of delete.
455  */
456 int canque_set_filt(struct canque_edge_t *qedge,
457         unsigned long filtid, unsigned long filtmask, int filtflags)
458 {
459         int ret;
460         unsigned long flags;
461
462         spin_lock_irqsave(&qedge->fifo.fifo_lock,flags);
463         
464         if(!(filtflags&MSG_PROCESSLOCAL) && (processlocal<2))
465                 filtflags |= MSG_LOCAL_MASK;
466         
467         qedge->filtid=canque_filtid2internal(filtid, filtflags);
468         qedge->filtmask=canque_filtid2internal(filtmask, filtflags>>MSG_FILT_MASK_SHIFT);
469         
470         if(canque_fifo_test_fl(&qedge->fifo,DEAD)) ret=-1;
471         else ret=canque_fifo_test_and_set_fl(&qedge->fifo,BLOCK)?1:0;
472
473         spin_unlock_irqrestore(&qedge->fifo.fifo_lock,flags);
474         if(ret>=0){
475                 canque_notify_bothends(qedge,CANQUEUE_NOTIFY_FILTCH);
476         }
477         spin_lock_irqsave(&qedge->fifo.fifo_lock,flags);
478         if(!ret)canque_fifo_clear_fl(&qedge->fifo,BLOCK);
479         spin_unlock_irqrestore(&qedge->fifo.fifo_lock,flags);
480         
481         DEBUGQUE("canque_set_filt for edge %d, ID %ld, mask %ld, flags %d returned %d\n",
482                   qedge->edge_num,filtid,filtmask,filtflags,ret);
483         return ret;
484 }
485
486 /**
487  * canque_flush - fluesh all ready slots in the edge
488  * @qedge: pointer to the edge
489  *
490  * Tries to flush all allocated slots from the edge, but there could
491  * exist some slots associated to edge which are processed by input
492  * or output side and cannot be flushed at this moment.
493  * Return Value: The nonzero value indicates, that queue has not been
494  *      empty before the function call.
495  */
496 int canque_flush(struct canque_edge_t *qedge)
497 {
498         int ret;
499         unsigned long flags;
500
501         ret=canque_fifo_flush_slots(&qedge->fifo);
502         if(ret){
503                 canque_notify_inends(qedge,CANQUEUE_NOTIFY_EMPTY);
504                 canque_notify_inends(qedge,CANQUEUE_NOTIFY_SPACE);
505                 spin_lock_irqsave(&qedge->outends->ends_lock, flags);
506                 spin_lock(&qedge->fifo.fifo_lock);
507                 if(canque_fifo_test_fl(&qedge->fifo,EMPTY)){
508                         list_del(&qedge->outpeers);
509                         list_add(&qedge->outpeers,&qedge->outends->idle);
510                 }
511                 spin_unlock(&qedge->fifo.fifo_lock);
512                 spin_unlock_irqrestore(&qedge->outends->ends_lock, flags);
513         }
514         DEBUGQUE("canque_flush for edge %d returned %d\n",qedge->edge_num,ret);
515         return ret;
516 }
517
518 /**
519  * canqueue_ends_init_gen - subsystem independent routine to initialize ends state
520  * @qends: pointer to the ends structure
521  *
522  * Return Value: Cannot fail.
523  */
524 int canqueue_ends_init_gen(struct canque_ends_t *qends)
525 {
526         int i;
527         for(i=CANQUEUE_PRIO_NR;--i>=0;){
528                 INIT_LIST_HEAD(&qends->active[i]);
529         }
530         INIT_LIST_HEAD(&qends->idle);
531         INIT_LIST_HEAD(&qends->inlist);
532         spin_lock_init(&qends->ends_lock);
533         return 0;
534 }
535
536
537 /**
538  * canqueue_notify_kern - notification callback handler for Linux userspace clients
539  * @qends: pointer to the callback side ends structure
540  * @qedge: edge which invoked notification 
541  * @what: notification type
542  */
543 void canqueue_notify_kern(struct canque_ends_t *qends, struct canque_edge_t *qedge, int what)
544 {
545         DEBUGQUE("canqueue_notify_kern for edge %d and event %d\n",qedge->edge_num,what);
546         switch(what){
547                 case CANQUEUE_NOTIFY_EMPTY:
548                         wake_up_interruptible(&qends->endinfo.fileinfo.emptyq);
549                         break;
550                 case CANQUEUE_NOTIFY_SPACE:
551                         wake_up_interruptible(&qends->endinfo.fileinfo.writeq);
552                         break;
553                 case CANQUEUE_NOTIFY_PROC:
554                         wake_up_interruptible(&qends->endinfo.fileinfo.readq);
555                         break;
556                 case CANQUEUE_NOTIFY_NOUSR:
557                         wake_up_interruptible(&qends->endinfo.fileinfo.readq);
558                         wake_up_interruptible(&qends->endinfo.fileinfo.writeq);
559                         wake_up_interruptible(&qends->endinfo.fileinfo.emptyq);
560                         break;
561                 case CANQUEUE_NOTIFY_DEAD:
562                         if(atomic_read(&qedge->edge_used)>0)
563                                 atomic_dec(&qedge->edge_used);
564                         break;
565                 case CANQUEUE_NOTIFY_ATACH:
566                         atomic_inc(&qedge->edge_used);
567                         break;
568         }
569 }
570
571 /**
572  * canqueue_ends_init_kern - Linux userspace clients specific ends initialization
573  * @qends: pointer to the callback side ends structure
574  */
575 int canqueue_ends_init_kern(struct canque_ends_t *qends)
576 {
577         canqueue_ends_init_gen(qends);
578         qends->context=NULL;
579         init_waitqueue_head(&qends->endinfo.fileinfo.readq);
580         init_waitqueue_head(&qends->endinfo.fileinfo.writeq);
581         init_waitqueue_head(&qends->endinfo.fileinfo.emptyq);
582         qends->notify=canqueue_notify_kern;
583         DEBUGQUE("canqueue_ends_init_kern\n");
584         return 0;
585 }
586
587
588 /**
589  * canque_get_inslot4id_wait_kern - find or wait for best outgoing edge and slot for given ID
590  * @qends: ends structure belonging to calling communication object
591  * @qedgep: place to store pointer to found edge
592  * @slotp: place to store pointer to  allocated slot
593  * @cmd: command type for slot
594  * @id: communication ID of message to send into edge
595  * @prio: optional priority of message
596  *
597  * Same as canque_get_inslot4id(), except, that it waits for free slot
598  * in case, that queue is full. Function is specific for Linux userspace clients.
599  * Return Value: If there is no usable edge negative value is returned.
600  */
601 int canque_get_inslot4id_wait_kern(struct canque_ends_t *qends,
602         struct canque_edge_t **qedgep, struct canque_slot_t **slotp,
603         int cmd, unsigned long id, int prio)
604 {
605         int ret=-1;
606         DEBUGQUE("canque_get_inslot4id_wait_kern for cmd %d, id %ld, prio %d\n",cmd,id,prio);
607         wait_event_interruptible((qends->endinfo.fileinfo.writeq), 
608                 (ret=canque_get_inslot4id(qends,qedgep,slotp,cmd,id,prio))!=-1);
609         return ret;
610 }
611
612 /**
613  * canque_get_outslot_wait_kern - receive or wait for ready slot for given ends
614  * @qends: ends structure belonging to calling communication object
615  * @qedgep: place to store pointer to found edge
616  * @slotp: place to store pointer to received slot
617  *
618  * The same as canque_test_outslot(), except it waits in the case, that there is
619  * no ready slot for given ends. Function is specific for Linux userspace clients.
620  * Return Value: Negative value informs, that there is no ready output
621  *      slot for given ends. Positive value is equal to the command
622  *      slot has been allocated by the input side.
623  */
624 int canque_get_outslot_wait_kern(struct canque_ends_t *qends,
625         struct canque_edge_t **qedgep, struct canque_slot_t **slotp)
626 {
627         int ret=-1;
628         DEBUGQUE("canque_get_outslot_wait_kern\n");
629         wait_event_interruptible((qends->endinfo.fileinfo.readq), 
630                 (ret=canque_test_outslot(qends,qedgep,slotp))!=-1);
631         return ret;
632 }
633
634 /**
635  * canque_sync_wait_kern - wait for all slots processing
636  * @qends: ends structure belonging to calling communication object
637  * @qedge: pointer to edge
638  *
639  * Functions waits for ends transition into empty state.
640  * Return Value: Positive value indicates, that edge empty state has been reached.
641  *      Negative or zero value informs about interrupted wait or other problem.
642  */
643 int canque_sync_wait_kern(struct canque_ends_t *qends, struct canque_edge_t *qedge)
644 {
645         int ret=-1;
646         DEBUGQUE("canque_sync_wait_kern\n");
647         wait_event_interruptible((qends->endinfo.fileinfo.emptyq), 
648                 (ret=canque_fifo_test_fl(&qedge->fifo,EMPTY)?1:0));
649         return ret;
650 }
651
652
653 /**
654  * canque_new_edge_kern - allocate new edge structure in the Linux kernel context
655  * @slotsnr: required number of slots in the newly allocated edge structure
656  *
657  * Return Value: Returns pointer to allocated slot structure or %NULL if
658  *      there is not enough memory to process operation.
659  */
660 struct canque_edge_t *canque_new_edge_kern(int slotsnr)
661 {
662         struct canque_edge_t *qedge;
663         qedge = (struct canque_edge_t *)kmalloc(sizeof(struct canque_edge_t), GFP_KERNEL);
664         if(qedge == NULL) return NULL;
665
666         memset(qedge,0,sizeof(struct canque_edge_t));
667         spin_lock_init(&qedge->fifo.fifo_lock);
668         if(canque_fifo_init_slots(&qedge->fifo, slotsnr)<0){
669                 kfree(qedge);
670                 DEBUGQUE("canque_new_edge_kern failed\n");
671                 return NULL;
672         }
673         atomic_set(&qedge->edge_used,0);
674         qedge->filtid = 0;
675         qedge->filtmask = canque_filtid2internal(0l, (processlocal<2)? MSG_LOCAL:0);
676         qedge->edge_prio = 0;
677     #ifdef CAN_DEBUG
678         /* not exactly clean, but enough for debugging */
679         atomic_inc(&edge_num_cnt);
680         qedge->edge_num=atomic_read(&edge_num_cnt);
681     #endif /* CAN_DEBUG */
682         DEBUGQUE("canque_new_edge_kern %d\n",qedge->edge_num);
683         return qedge;
684 }
685
686 /**
687  * canqueue_connect_edge - connect edge between two communication entities
688  * @qedge: pointer to edge
689  * @inends: pointer to ends the input of the edge should be connected to
690  * @outends: pointer to ends the output of the edge should be connected to
691  *
692  * Return Value: Negative value informs about failed operation.
693  */
694 int canqueue_connect_edge(struct canque_edge_t *qedge, struct canque_ends_t *inends, struct canque_ends_t *outends)
695 {
696         unsigned long flags;
697         if(qedge == NULL) return -1;
698         DEBUGQUE("canqueue_connect_edge %d\n",qedge->edge_num);
699         atomic_inc(&qedge->edge_used);
700         spin_lock_irqsave(&inends->ends_lock, flags);
701         spin_lock(&outends->ends_lock);
702         spin_lock(&qedge->fifo.fifo_lock);
703         qedge->inends=inends;
704         list_add(&qedge->inpeers,&inends->inlist);
705         qedge->outends=outends;
706         list_add(&qedge->outpeers,&outends->idle);
707         spin_unlock(&qedge->fifo.fifo_lock);
708         spin_unlock(&outends->ends_lock);
709         spin_unlock_irqrestore(&inends->ends_lock, flags);
710         canque_notify_bothends(qedge, CANQUEUE_NOTIFY_ATACH);
711         
712         spin_lock_irqsave(&qedge->fifo.fifo_lock, flags);
713         if(atomic_dec_and_test(&qedge->edge_used))
714                 canque_notify_bothends(qedge,CANQUEUE_NOTIFY_NOUSR);
715         spin_unlock_irqrestore(&qedge->fifo.fifo_lock, flags);
716         return 0;
717 }
718
719 /**
720  * canqueue_disconnect_edge - disconnect edge from communicating entities
721  * @qedge: pointer to edge
722  *
723  * Return Value: Negative value means, that edge is used and cannot
724  *      be disconnected. Operation has to be delayed.
725  */
726 int canqueue_disconnect_edge(struct canque_edge_t *qedge)
727 {
728         int ret;
729         unsigned long flags;
730         struct canque_ends_t *inends, *outends;
731
732         inends=qedge->inends;
733         if(inends) spin_lock_irqsave(&inends->ends_lock,flags);
734         outends=qedge->outends;
735         if(outends) spin_lock(&outends->ends_lock);
736         spin_lock(&qedge->fifo.fifo_lock);
737         if(atomic_read(&qedge->edge_used)==0) {
738                 if(qedge->outends){
739                         list_del(&qedge->outpeers);
740                         qedge->outends=NULL;
741                 }
742                 if(qedge->inends){
743                         list_del(&qedge->inpeers);
744                         qedge->inends=NULL;
745                 }
746                 ret=1;
747         } else ret=-1;
748         spin_unlock(&qedge->fifo.fifo_lock);
749         if(outends) spin_unlock(&outends->ends_lock);
750         if(inends) spin_unlock_irqrestore(&inends->ends_lock,flags);
751         DEBUGQUE("canqueue_disconnect_edge %d returned %d\n",qedge->edge_num,ret);
752         return ret;
753 }
754
755 /**
756  * canqueue_disconnect_edge_kern - disconnect edge from communicating entities with wait
757  * @qends: ends structure belonging to calling communication object
758  * @qedge: pointer to edge
759  *
760  * Same as canqueue_disconnect_edge(), but tries to wait for state with zero
761  * use counter.
762  * Return Value: Negative value means, that edge is used and cannot
763  *      be disconnected yet. Operation has to be delayed.
764  */
765 int canqueue_disconnect_edge_kern(struct canque_ends_t *qends, struct canque_edge_t *qedge)
766 {
767         canque_fifo_set_fl(&qedge->fifo,BLOCK);
768         DEBUGQUE("canqueue_disconnect_edge_kern %d called\n",qedge->edge_num);
769         if(!canque_fifo_test_and_set_fl(&qedge->fifo,DEAD)){
770                 canque_notify_bothends(qedge, CANQUEUE_NOTIFY_DEAD);
771                 if(atomic_read(&qedge->edge_used)>0)
772                         atomic_dec(&qedge->edge_used);
773                 DEBUGQUE("canqueue_disconnect_edge_kern %d waiting\n",qedge->edge_num);
774                 wait_event_interruptible((qends->endinfo.fileinfo.emptyq), 
775                         (canqueue_disconnect_edge(qedge)>=0));
776                 return 0;
777         } else {
778                 DEBUGQUE("canqueue_disconnect_edge_kern failed\n");
779                 return -1;
780         }
781 }
782
783
784 int canqueue_disconnect_list_kern(struct canque_ends_t *qends, struct list_head *list)
785 {
786         struct canque_edge_t *edge;
787         unsigned long flags;
788         for(;;){
789                 spin_lock_irqsave(&qends->ends_lock,flags);
790                 if(list_empty(list)){
791                         spin_unlock_irqrestore(&qends->ends_lock,flags);
792                         return 0;
793                 }
794                 if(list == &qends->inlist)
795                         edge=list_entry(list->next,struct canque_edge_t,inpeers);
796                 else
797                         edge=list_entry(list->next,struct canque_edge_t,outpeers);
798                 atomic_inc(&edge->edge_used);
799                 spin_unlock_irqrestore(&qends->ends_lock,flags);
800                 if(canqueue_disconnect_edge_kern(qends, edge)>=0) {
801                         /* Free edge memory */
802                         canque_fifo_done(&edge->fifo);
803                         kfree(edge);
804                 }else{
805                         DEBUGQUE("canqueue_disconnect_list_kern in troubles\n");
806                         DEBUGQUE("the edge %d has usage count %d and flags %ld\n",edge->edge_num,atomic_read(&edge->edge_used),edge->fifo.fifo_flags);
807                         return -1;
808                 }
809         }
810 }
811
812 void canqueue_block_list(struct canque_ends_t *qends, struct list_head *list)
813 {
814         struct canque_edge_t *edge;
815         struct list_head *entry;
816         
817         /* has to be called with qends->ends_lock already locked */
818         list_for_each(entry,&qends->inlist){
819                 if(list == &qends->inlist)
820                         edge=list_entry(list->next,struct canque_edge_t,inpeers);
821                 else
822                         edge=list_entry(list->next,struct canque_edge_t,outpeers);
823                 canque_fifo_set_fl(&edge->fifo,BLOCK);
824         }
825 }
826
827
828 /**
829  * canqueue_ends_done_kern - finalizing of the ends structure for Linux kernel clients
830  * @qends: pointer to ends structure
831  * @sync: flag indicating, that user wants to wait for processing of all remaining
832  *      messages
833  *
834  * Return Value: Function should be designed such way to not fail.
835  */
836 int canqueue_ends_done_kern(struct canque_ends_t *qends, int sync)
837 {
838         unsigned long flags;
839         int i;
840
841         DEBUGQUE("canqueue_ends_done_kern\n");
842         spin_lock_irqsave(&qends->ends_lock,flags);
843         canqueue_block_list(qends, &qends->idle);
844         for(i=CANQUEUE_PRIO_NR;--i>=0;){
845                 canqueue_block_list(qends, &qends->active[i]);
846         }
847         canqueue_block_list(qends, &qends->idle);
848         canqueue_block_list(qends, &qends->inlist);
849         spin_unlock_irqrestore(&qends->ends_lock,flags);
850
851         for(i=CANQUEUE_PRIO_NR;--i>=0;){
852                 canqueue_disconnect_list_kern(qends, &qends->active[i]);
853         }
854         canqueue_disconnect_list_kern(qends, &qends->idle);
855         canqueue_disconnect_list_kern(qends, &qends->inlist);
856
857         wake_up_interruptible(&qends->endinfo.fileinfo.readq);
858         wake_up_interruptible(&qends->endinfo.fileinfo.writeq);
859         wake_up_interruptible(&qends->endinfo.fileinfo.emptyq);
860         
861
862         return 0;
863 }
864