]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEDomainApp.c
2049991ffe7d91ab049e17c02b2820a9be07f24a
[orte.git] / orte / liborte / ORTEDomainApp.c
1 /*
2  *  $Id: ORTEDomainApp.c,v 0.0.0.1      2003/08/21 
3  *
4  *  DEBUG:  section 21                  Domain application
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 ORTEDomain * 
26 ORTEDomainAppCreate(int domain, ORTEDomainProp *prop,
27                     ORTEDomainAppEvents *events) {
28   ORTEDomain        *d;
29   ObjectEntryOID    *objectEntryOID;
30   AppParams         *appParams;
31   CSTWriterParams   cstWriterParams;
32   CSTReaderParams   cstReaderParams;
33   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
34   int               i;
35   u_int16_t         port=0;
36
37   debug(21,10) ("ORTEDomainAppCreate: start\n");
38   //Create domainApplication
39   d=MALLOC(sizeof(ORTEDomain));
40   if (!d) return NULL;  //no memory
41   //initialization local values
42   d->domain=domain;
43   d->taskRecvMetatraffic.terminate=ORTE_FALSE;
44   d->taskRecvUserdata.terminate=ORTE_FALSE;
45   d->taskSend.terminate=ORTE_FALSE;
46   d->taskRecvMetatraffic.sock.port=0;
47   d->taskRecvUserdata.sock.port=0;
48   d->taskSend.sock.port=0;
49   PublParamsInit(&d->publPropDefault);
50   SubsParamsInit(&d->subsPropDefault);
51   //init structure objectEntry
52   ObjectEntryHID_init_root_field(&d->objectEntry);
53   pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
54   htimerRoot_init_queue(&d->objectEntry);
55   pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
56   pthread_mutex_init(&d->objectEntry.htimSendMutex, NULL);
57   //publication,subscriptions
58   d->publications.counter=d->subscriptions.counter=0;
59   CSTWriter_init_root_field(&d->publications);
60   CSTReader_init_root_field(&d->subscriptions);
61   pthread_rwlock_init(&d->publications.lock,NULL);
62   pthread_rwlock_init(&d->subscriptions.lock,NULL);
63   //publication,subscriptions lists
64   PublicationList_init_root_field(&d->psEntry);
65   pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
66   SubscriptionList_init_root_field(&d->psEntry);
67   pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
68   
69   //pattern
70   ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
71   SubscriptionPattern_init_head(&d->patternEntry);
72     
73   //create domainProp 
74   if (prop!=NULL) {
75     memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
76   } else {
77     ORTEDomainPropDefaultGet(&d->domainProp);
78   }
79   
80   //print local IP addresses
81   iflocal[0]=0;
82   if (d->domainProp.IFCount) {
83     for(i=0;i<d->domainProp.IFCount;i++)
84       strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress));
85     debug(21,2) ("ORTEDomainAppCreate: localIPAddres(es) %s\n",iflocal);
86   } else{
87     debug(21,2) ("ORTEDomainAppCreate: no activ interface card\n");
88   }
89
90   //DomainEvents
91   if (events!=NULL) {
92     memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
93   } else {
94     memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
95   }
96
97   //local buffers
98   d->mbRecvMetatraffic.cdrStream.buffer=
99       (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
100   d->mbRecvUserdata.cdrStream.buffer=
101       (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
102   d->mbSend.cdrStream.buffer=
103       (u_int8_t*)MALLOC(d->domainProp.sendBuffSize);
104   if ((!d->mbRecvMetatraffic.cdrStream.buffer) || 
105       (!d->mbRecvUserdata.cdrStream.buffer) || 
106       (!d->mbSend.cdrStream.buffer)) {    //no memory
107     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
108     FREE(d->mbRecvUserdata.cdrStream.buffer);
109     FREE(d->mbSend.cdrStream.buffer);
110     FREE(d);
111     return NULL;
112   }
113   d->mbRecvMetatraffic.cdrStream.bufferPtr=d->mbRecvMetatraffic.cdrStream.buffer;
114   d->mbRecvMetatraffic.cdrStream.length=0;
115   d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
116   d->mbRecvUserdata.cdrStream.length=0;
117   d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer;
118   d->mbSend.cdrStream.length=0;
119
120   //TypeRegister
121   ORTEType_init_root_field(&d->typeEntry);
122   pthread_rwlock_init(&d->typeEntry.lock,NULL);
123
124   //Sockets
125   sock_init_udp(&d->taskRecvMetatraffic.sock);
126   sock_init_udp(&d->taskRecvUserdata.sock);
127   sock_init_udp(&d->taskSend.sock);
128   if (d->domainProp.multicast.enabled) {
129     Domain2PortMulticastMetatraffic(d->domain,port);   
130   } else {
131     Domain2Port(d->domain,port);
132   }
133   sock_bind(&d->taskRecvMetatraffic.sock,0); //give me receiving port (metatraffic)
134   debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvMetatraffic):%u\n",
135                d->taskRecvMetatraffic.sock.port);
136   sock_bind(&d->taskRecvUserdata.sock,0); //give me receiving port (userdata)
137   debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvUserdata):%u\n",
138                d->taskRecvUserdata.sock.port);
139   sock_bind(&d->taskSend.sock,0);           //give me sending port
140   debug(21,2) ("ORTEDomainAppCreate: Bind on port(Send):%u\n",
141                d->taskSend.sock.port);
142   if ((d->taskRecvMetatraffic.sock.fd<0) || 
143       (d->taskRecvUserdata.sock.fd<0) ||
144       (d->taskSend.sock.fd<0)) {
145     debug(21,0) ("Error creating socket(s).\n");
146     sock_cleanup(&d->taskRecvMetatraffic.sock);
147     sock_cleanup(&d->taskRecvUserdata.sock);
148     sock_cleanup(&d->taskSend.sock);
149     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
150     FREE(d->mbRecvUserdata.cdrStream.buffer);
151     FREE(d->mbSend.cdrStream.buffer);
152     FREE(d);
153     return NULL;
154   }
155   if (d->domainProp.multicast.enabled) {
156     struct ip_mreq mreq;
157     //ttl
158     if(sock_setsockopt(&d->taskSend.sock,IP_MULTICAST_TTL, 
159         &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
160       debug(21,2) ("ORTEDomainAppCreate: ttl set on: %u\n",
161            d->domainProp.multicast.ttl);
162     } 
163     // join multicast group
164     mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
165     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
166     if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
167         (void *) &mreq, sizeof(mreq))>=0) {
168       debug(21,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
169            IPAddressToString(d->domainProp.multicast.ipAddress));
170     }
171   }
172
173   //Generates local GUID
174   if (d->domainProp.IFCount>0) 
175     d->guid.hid=d->domainProp.IFProp[0].ipAddress;
176   else
177     d->guid.hid=StringToIPAddress("127.0.0.1");
178   d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION; 
179   d->guid.oid=OID_APP;
180   debug(29,2) ("ORTEDomainAppCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
181                d->guid.hid,d->guid.aid,d->guid.oid); 
182
183   //create HEADER of message for sending task
184   RTPSHeaderCreate(d->mbSend.cdrStream.buffer,d->guid.hid,d->guid.aid);
185   d->mbSend.cdrStream.bufferPtr=
186       d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
187   d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;    
188   d->mbSend.needSend=ORTE_FALSE;
189   d->mbSend.containsInfoReply=ORTE_FALSE;  
190   
191   //Self object data & fellow managers object data
192   appParams=(AppParams*)MALLOC(sizeof(AppParams));
193   AppParamsInit(appParams);
194   appParams->expirationTime=d->domainProp.baseProp.expirationTime;
195   VENDOR_ID_OCERA(appParams->vendorId);
196   appParams->hostId=d->guid.hid;
197   appParams->appId=d->guid.aid;
198   appParams->metatrafficUnicastPort=d->taskRecvMetatraffic.sock.port;
199   appParams->userdataUnicastPort=d->taskRecvUserdata.sock.port;  
200   if (d->domainProp.multicast.enabled) {
201     //multicast
202     for(i=0;i<d->domainProp.IFCount;i++)
203       appParams->metatrafficMulticastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
204     appParams->metatrafficMulticastIPAddressCount=d->domainProp.IFCount;
205   } else {
206     //unicast
207     if (d->domainProp.IFCount) {
208       for(i=0;i<d->domainProp.IFCount;i++)
209         appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
210       appParams->unicastIPAddressCount=d->domainProp.IFCount;
211     } else {
212       appParams->unicastIPAddressList[0]=StringToIPAddress("127.0.0.1");
213       appParams->unicastIPAddressCount=1;
214     }
215   }
216   //ApplicatonKeyList
217   appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
218   for(i=0;i<d->domainProp.IFCount;i++)
219     appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
220   appParams->managerKeyCount=d->domainProp.IFCount+1;
221   d->appParams=appParams;
222   //insert object, doesn't need to be locked
223   d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
224   d->objectEntryOID->private=ORTE_TRUE;
225
226   //CST objects
227   //  writerApplicationSelf (WAS)
228   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
229   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
230   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
231   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
232   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
233   cstWriterParams.fullAcknowledge=ORTE_TRUE;
234   CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
235       OID_WRITE_APPSELF,&cstWriterParams,NULL);
236   //  add to WAS remote writer(s)
237   if (d->domainProp.appLocalManager) {
238     GUID_RTPS guid;
239     guid.hid=d->domainProp.appLocalManager;
240     guid.aid=AID_UNKNOWN;
241     guid.oid=OID_APP;
242     if (!objectEntryFind(d,&guid)) {
243       appParams=(AppParams*)MALLOC(sizeof(AppParams));
244       AppParamsInit(appParams);
245       appParams->hostId=guid.hid;
246       appParams->appId=guid.aid;
247       appParams->metatrafficUnicastPort=port;
248       appParams->userdataUnicastPort=0;  //Manager support only metatraffic
249       appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
250       appParams->unicastIPAddressCount=1;
251       objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
252       CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
253           OID_READ_MGR);
254       debug(21,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
255                   IPAddressToString(d->domainProp.appLocalManager));
256     }
257   }
258   //  readerManagers
259   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
260   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
261   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
262   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
263   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
264   CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
265       OID_READ_MGR,&cstReaderParams,NULL);
266   //  readerApplications
267   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
268   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
269   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
270   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
271   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
272   CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
273       OID_READ_APP,&cstReaderParams,NULL);
274   //  writerPublications
275   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
276   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
277   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
278   NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
279   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
280   cstWriterParams.fullAcknowledge=ORTE_TRUE;
281   CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
282       OID_WRITE_PUBL,&cstWriterParams,NULL);
283   //  writerSubscriptions
284   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
285   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
286   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
287   NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
288   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
289   cstWriterParams.fullAcknowledge=ORTE_TRUE;
290   CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
291       OID_WRITE_SUBS,&cstWriterParams,NULL);
292   //  readerPublications
293   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
294   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
295   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
296   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
297   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
298   CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
299       OID_READ_PUBL,&cstReaderParams,NULL);
300   //  readerSubscriptions
301   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
302   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
303   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
304   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
305   cstReaderParams.fullAcknowledge=ORTE_TRUE;      
306   CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
307       OID_READ_SUBS,&cstReaderParams,NULL);
308   
309   //add csChange for WAS
310   appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE);
311   
312   //Start threads
313   pthread_mutex_lock(&d->objectEntry.htimSendMutex);
314   pthread_create(&d->taskRecvMetatraffic.thread, NULL,
315                  (void*)&ORTEAppRecvMetatrafficThread, (void *)d); 
316   pthread_create(&d->taskRecvUserdata.thread, NULL,
317                  (void*)&ORTEAppRecvUserdataThread, (void *)d); 
318   pthread_create(&d->taskSend.thread, NULL,
319                  (void*)&ORTEAppSendThread, (void *)d); 
320
321   debug(21,10) ("ORTEDomainAppCreate: finished\n");
322   return d;
323 }
324
325 /*****************************************************************************/
326 Boolean
327 ORTEDomainAppDestroy(ORTEDomain *d) {
328   CSTWriter             *cstWriter;
329   CSTReader             *cstReader;
330
331   debug(21,10) ("ORTEDomainAppDestroy: start\n");
332   if (!d) return ORTE_FALSE;
333   //Stoping threads
334   d->taskRecvMetatraffic.terminate=ORTE_TRUE;
335   d->taskRecvUserdata.terminate=ORTE_TRUE;
336   d->taskSend.terminate=ORTE_TRUE;
337   ORTEDomainWakeUpReceivingThread(d,
338       &d->taskSend.sock,d->taskRecvMetatraffic.sock.port); 
339   pthread_join(d->taskRecvMetatraffic.thread,NULL); 
340   ORTEDomainWakeUpReceivingThread(d,
341       &d->taskSend.sock,d->taskRecvUserdata.sock.port); 
342   pthread_join(d->taskRecvUserdata.thread,NULL); 
343   ORTEDomainWakeUpSendingThread(&d->objectEntry); 
344   pthread_join(d->taskSend.thread,NULL); 
345   debug(21,3) ("ORTEDomainAppDestroy: threads stoped\n");
346   
347   //Sockets
348   sock_cleanup(&d->taskRecvMetatraffic.sock);
349   sock_cleanup(&d->taskRecvUserdata.sock);
350   sock_cleanup(&d->taskSend.sock);
351
352   //Mutex(es)
353   pthread_mutex_destroy(&d->objectEntry.htimSendMutex); 
354
355   //rwLocks
356   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
357   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
358   pthread_rwlock_destroy(&d->publications.lock);
359   pthread_rwlock_destroy(&d->subscriptions.lock);
360   pthread_rwlock_destroy(&d->psEntry.publicationsLock);
361   pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
362
363   //TypeRegister
364   ORTETypeRegisterDestroyAll(d);
365   pthread_rwlock_destroy(&d->typeEntry.lock);
366   
367   //Pattern
368   ORTEDomainAppSubscriptionPatternDestroyAll(d);
369   
370   //CSTReaders and CSTWriters
371   CSTWriterDelete(d,&d->writerApplicationSelf);
372   CSTReaderDelete(d,&d->readerManagers);
373   CSTReaderDelete(d,&d->readerApplications);
374   CSTWriterDelete(d,&d->writerPublications);
375   CSTWriterDelete(d,&d->writerSubscriptions);
376   CSTReaderDelete(d,&d->readerPublications);
377   CSTReaderDelete(d,&d->readerSubscriptions);
378   gavl_cust_for_each(CSTWriter,&d->publications,cstWriter) {
379     CSTWriterDelete(d,cstWriter);
380     FREE(cstWriter);
381   }  
382   gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
383     CSTReaderDelete(d,cstReader);
384     FREE(cstReader);
385   }  
386     
387   //objects in objectsEntry
388   objectEntryDeleteAll(d,&d->objectEntry);
389   
390   FREE(d->mbRecvMetatraffic.cdrStream.buffer);
391   FREE(d->mbRecvUserdata.cdrStream.buffer);
392   FREE(d->mbSend.cdrStream.buffer);
393   FREE(d);
394   debug(21,10) ("ORTEDomainAppDestroy: finished\n");
395   return ORTE_TRUE;
396 }
397
398 /*****************************************************************************/
399 Boolean 
400 ORTEDomainAppSubscriptionPatternAdd(ORTEDomain *d,const char *topic,
401     const char *type,ORTESubscriptionPatternCallBack subscriptionCallBack, 
402     void *param) {
403   SubscriptionPatternNode *psnode;
404   
405   if (!d) return ORTE_FALSE;
406   psnode=(SubscriptionPatternNode*)MALLOC(sizeof(SubscriptionPatternNode));
407   strcpy(psnode->topic,topic);
408   strcpy(psnode->type,type);
409   psnode->subscriptionCallBack=subscriptionCallBack;
410   psnode->param=param;
411   SubscriptionPattern_insert(&d->patternEntry,psnode);
412   return ORTE_TRUE;
413 }
414
415 /*****************************************************************************/
416 Boolean 
417 ORTEDomainAppSubscriptionPatternRemove(ORTEDomain *d,const char *topic,
418     const char *type) {
419   SubscriptionPatternNode *psnode;
420   
421   if (!d) return ORTE_FALSE;
422   ul_list_for_each(SubscriptionPattern,&d->patternEntry,psnode) {
423     if ((strcmp(psnode->topic,topic)==0) &&
424         (strcmp(psnode->type,type)==0)) {
425       SubscriptionPattern_delete(&d->patternEntry,psnode);
426       FREE(psnode);
427       return ORTE_TRUE;
428     }
429   }
430   return ORTE_FALSE;
431 }
432
433 /*****************************************************************************/
434 Boolean 
435 ORTEDomainAppSubscriptionPatternDestroyAll(ORTEDomain *d) {
436   SubscriptionPatternNode *psnode;
437   
438   if (!d) return ORTE_FALSE;
439   while((psnode=SubscriptionPattern_cut_first(&d->patternEntry))) {
440     FREE(psnode);
441   }
442   return ORTE_TRUE;
443 }