2 * $Id: ORTEDomainApp.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 21 Domain application
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
25 /*****************************************************************************/
/*
 * ORTEDomainAppCreate - allocate an ORTEDomain object and initialise it as an
 * application domain: locks, list roots, message buffers, three UDP sockets,
 * the local GUID, the application's own AppParams object and the built-in
 * CST readers/writers.
 *
 * Parameters (as visible in the signature):
 *   domain    - domain number; presumably stored in d->domain (the assignment
 *               is not visible in this listing) and later used to derive the
 *               manager port.
 *   prop      - domain properties, copied into d->domainProp with memcpy.
 *   events    - event callbacks, copied into d->domainEvents with memcpy.
 *   suspended - NOTE(review): no use of this flag is visible in this listing;
 *               presumably it decides whether ORTEDomainStart() is called at
 *               the end - confirm against the complete source.
 *
 * Returns NULL when the ORTEDomain allocation itself fails. The other return
 * statements are not visible here.
 *
 * NOTE(review): the embedded line numbers skip values (26, 29, 31, 36-38, 44,
 * 84, 89, ...). This listing therefore omits lines of the function - at least
 * some declarations, else-branches, closing braces and return statements.
 * Comments below describe only what the visible lines do.
 */
27 ORTEDomainAppCreate(int domain, ORTEDomainProp *prop,
28 ORTEDomainAppEvents *events,Boolean suspended) {
30 ObjectEntryOID *objectEntryOID;
32 CSTWriterParams cstWriterParams;
33 CSTReaderParams cstReaderParams;
// iflocal collects the textual form of all local interface addresses for the
// debug message; sIPAddress is scratch space for one converted address.
34 char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
35 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
39 debug(21,10) ("ORTEDomainAppCreate: start\n");
40 //Create domainApplication
41 d=MALLOC(sizeof(ORTEDomain));
42 if (!d) return NULL; //no memory
43 //initialization local values
// All three tasks start flagged as terminated with port 0.
// ORTEDomainAppDestroy only wakes and joins a task whose terminate flag is
// not set, so a domain that was never started is torn down without joins.
45 d->taskRecvMetatraffic.terminate=ORTE_TRUE;
46 d->taskRecvUserdata.terminate=ORTE_TRUE;
47 d->taskSend.terminate=ORTE_TRUE;
48 d->taskRecvMetatraffic.sock.port=0;
49 d->taskRecvUserdata.sock.port=0;
50 d->taskSend.sock.port=0;
51 PublParamsInit(&d->publPropDefault);
52 SubsParamsInit(&d->subsPropDefault);
53 //init structure objectEntry
54 ObjectEntryHID_init_root_field(&d->objectEntry);
55 pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
56 htimerRoot_init_queue(&d->objectEntry);
57 pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
58 pthread_mutex_init(&d->objectEntry.htimSendMutex, NULL);
59 //publication,subscriptions
60 d->publications.counter=d->subscriptions.counter=0;
61 CSTWriter_init_root_field(&d->publications);
62 CSTReader_init_root_field(&d->subscriptions);
63 pthread_rwlock_init(&d->publications.lock,NULL);
64 pthread_rwlock_init(&d->subscriptions.lock,NULL);
65 //publication,subscriptions lists
66 PublicationList_init_root_field(&d->psEntry);
67 pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
68 SubscriptionList_init_root_field(&d->psEntry);
69 pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
// Pattern entry: lock, default check/match callbacks, then the list head.
72 pthread_rwlock_init(&d->patternEntry.lock,NULL);
73 ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
74 Pattern_init_head(&d->patternEntry);
// NOTE(review): the lines that choose between copying the caller's
// properties and loading defaults are missing from this listing. Read
// literally, both statements run and the second would act on the copy just
// made - confirm against the complete source.
78 memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
80 ORTEDomainPropDefaultGet(&d->domainProp);
83 //print local IP addresses
// NOTE(review): strcat needs iflocal to already hold a terminated string; no
// initialisation of iflocal is visible here (embedded line 84 is absent) -
// confirm. No separator is appended between the addresses.
85 if (d->domainProp.IFCount) {
86 for(i=0;i<d->domainProp.IFCount;i++)
87 strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
88 debug(21,2) ("ORTEDomainAppCreate: localIPAddres(es) %s\n",iflocal);
90 debug(21,2) ("ORTEDomainAppCreate: no activ interface card\n");
// NOTE(review): as above, the condition selecting between copying the
// caller's events and zeroing the structure is not visible in this listing.
95 memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
97 memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
// Allocate the two receive buffers and the send buffer, then test all three
// together. On failure every buffer is passed to FREE, including any that are
// NULL (relies on FREE accepting NULL - confirm). The rest of the failure
// path is not visible here.
101 d->mbRecvMetatraffic.cdrStream.buffer=
102 (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
103 d->mbRecvUserdata.cdrStream.buffer=
104 (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
105 d->mbSend.cdrStream.buffer=
106 (u_int8_t*)MALLOC(d->domainProp.sendBuffSize);
107 if ((!d->mbRecvMetatraffic.cdrStream.buffer) ||
108 (!d->mbRecvUserdata.cdrStream.buffer) ||
109 (!d->mbSend.cdrStream.buffer)) { //no memory
110 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
111 FREE(d->mbRecvUserdata.cdrStream.buffer);
112 FREE(d->mbSend.cdrStream.buffer);
// Each stream starts empty with its cursor at the start of its buffer.
116 d->mbRecvMetatraffic.cdrStream.bufferPtr=d->mbRecvMetatraffic.cdrStream.buffer;
117 d->mbRecvMetatraffic.cdrStream.length=0;
118 d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
119 d->mbRecvUserdata.cdrStream.length=0;
120 d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer;
121 d->mbSend.cdrStream.length=0;
// Type register root and its lock.
124 ORTEType_init_root_field(&d->typeEntry);
125 pthread_rwlock_init(&d->typeEntry.lock,NULL);
// Create the three UDP sockets and derive `port` from the domain number.
// `port` is used further down as the metatraffic port of the local manager.
// Its declaration and the else keyword between the two Domain2Port* calls are
// not visible in this listing.
128 sock_init_udp(&d->taskRecvMetatraffic.sock);
129 sock_init_udp(&d->taskRecvUserdata.sock);
130 sock_init_udp(&d->taskSend.sock);
131 if (d->domainProp.multicast.enabled) {
132 Domain2PortMulticastMetatraffic(d->domain,port);
134 Domain2Port(d->domain,port);
// Bind each socket with port argument 0. The sock.port field of each socket
// is logged right after its bind. The sock_bind return values are not
// inspected; failure is detected below through the socket descriptors.
136 sock_bind(&d->taskRecvMetatraffic.sock,0); //give me receiving port (metatraffic)
137 debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvMetatraffic):%u\n",
138 d->taskRecvMetatraffic.sock.port);
139 sock_bind(&d->taskRecvUserdata.sock,0); //give me receiving port (userdata)
140 debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvUserdata):%u\n",
141 d->taskRecvUserdata.sock.port);
142 sock_bind(&d->taskSend.sock,0); //give me sending port
143 debug(21,2) ("ORTEDomainAppCreate: Bind on port(Send):%u\n",
144 d->taskSend.sock.port);
// Any negative descriptor aborts creation: close all sockets and release the
// message buffers. The remainder of this error path is not visible here.
145 if ((d->taskRecvMetatraffic.sock.fd<0) ||
146 (d->taskRecvUserdata.sock.fd<0) ||
147 (d->taskSend.sock.fd<0)) {
148 debug(21,0) ("Error creating socket(s).\n");
149 sock_cleanup(&d->taskRecvMetatraffic.sock);
150 sock_cleanup(&d->taskRecvUserdata.sock);
151 sock_cleanup(&d->taskSend.sock);
152 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
153 FREE(d->mbRecvUserdata.cdrStream.buffer);
154 FREE(d->mbSend.cdrStream.buffer);
// Multicast setup: set the TTL on the sending socket and join the configured
// group on the userdata receiving socket. In the visible lines a setsockopt
// result only controls whether the debug message is printed.
158 if (d->domainProp.multicast.enabled) {
161 if(sock_setsockopt(&d->taskSend.sock,IP_MULTICAST_TTL,
162 &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
163 debug(21,2) ("ORTEDomainAppCreate: ttl set on: %u\n",
164 d->domainProp.multicast.ttl);
166 // join multicast group
167 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
168 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
169 if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
170 (void *) &mreq, sizeof(mreq))>=0) {
171 debug(21,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
172 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
176 //Generates local GUID
// hid: address of the first interface, or 127.0.0.1 (the else keyword is not
// visible in this listing).
// aid: sending port shifted left by 8 bits plus MANAGEDAPPLICATION.
177 if (d->domainProp.IFCount>0)
178 d->guid.hid=d->domainProp.IFProp[0].ipAddress;
180 d->guid.hid=StringToIPAddress("127.0.0.1");
181 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION;
// NOTE(review): this message uses debug section 29 while the rest of the
// function uses 21 - confirm that is intended. The assignment of d->guid.oid
// printed here is not visible in this listing (embedded line 182 is absent).
183 debug(29,2) ("ORTEDomainAppCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
184 d->guid.hid,d->guid.aid,d->guid.oid);
186 //create HEADER of message for sending task
// The header is written once at the start of the send buffer; the cursor and
// length are then set to RTPS_HEADER_LENGTH so later data follows it.
187 RTPSHeaderCreate(d->mbSend.cdrStream.buffer,d->guid.hid,d->guid.aid);
188 d->mbSend.cdrStream.bufferPtr=
189 d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
190 d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;
191 d->mbSend.needSend=ORTE_FALSE;
192 d->mbSend.containsInfoReply=ORTE_FALSE;
194 //Self object data & fellow managers object data
// NOTE(review): the MALLOC result is passed to AppParamsInit and dereferenced
// without a NULL check, unlike the allocations earlier in this function.
195 appParams=(AppParams*)MALLOC(sizeof(AppParams));
196 AppParamsInit(appParams);
197 appParams->expirationTime=d->domainProp.baseProp.expirationTime;
198 VENDOR_ID_OCERA(appParams->vendorId);
199 appParams->hostId=d->guid.hid;
200 appParams->appId=d->guid.aid;
201 appParams->metatrafficUnicastPort=d->taskRecvMetatraffic.sock.port;
202 appParams->userdataUnicastPort=d->taskRecvUserdata.sock.port;
// With multicast enabled, every interface address is also published in the
// metatraffic multicast address list.
203 if (d->domainProp.multicast.enabled) {
205 for(i=0;i<d->domainProp.IFCount;i++)
206 appParams->metatrafficMulticastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
207 appParams->metatrafficMulticastIPAddressCount=d->domainProp.IFCount;
// Unicast address list: all interface addresses, or the single address
// 127.0.0.1 when there is no interface (the else keyword is not visible).
210 if (d->domainProp.IFCount) {
211 for(i=0;i<d->domainProp.IFCount;i++)
212 appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
213 appParams->unicastIPAddressCount=d->domainProp.IFCount;
215 appParams->unicastIPAddressList[0]=StringToIPAddress("127.0.0.1");
216 appParams->unicastIPAddressCount=1;
// Manager key list: 127.0.0.1 in slot 0, then one entry per interface, so
// indices 0..IFCount are written. NOTE(review): the capacity of
// managerKeyList is not visible here - confirm it holds IFCount+1 entries.
220 appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
221 for(i=0;i<d->domainProp.IFCount;i++)
222 appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
223 appParams->managerKeyCount=d->domainProp.IFCount+1;
224 d->appParams=appParams;
225 //insert object, doesn't need to be locked
226 d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
227 d->objectEntryOID->private=ORTE_TRUE;
230 // writerApplicationSelf (WAS)
231 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
232 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
233 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
234 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
235 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
236 cstWriterParams.fullAcknowledge=ORTE_TRUE;
237 CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
238 OID_WRITE_APPSELF,&cstWriterParams,NULL);
239 // add to WAS remote writer(s)
// When a local manager address is configured and no object with that GUID
// exists yet, create an AppParams object for it and register it as a remote
// reader of the WAS writer. NOTE(review): the declaration of `guid`, the
// assignment of guid.oid and the last argument line of
// CSTWriterAddRemoteReader are not visible in this listing. The MALLOC result
// is again used without a NULL check.
240 if (d->domainProp.appLocalManager) {
242 guid.hid=d->domainProp.appLocalManager;
243 guid.aid=AID_UNKNOWN;
245 if (!objectEntryFind(d,&guid)) {
246 appParams=(AppParams*)MALLOC(sizeof(AppParams));
247 AppParamsInit(appParams);
248 appParams->hostId=guid.hid;
249 appParams->appId=guid.aid;
250 appParams->metatrafficUnicastPort=port;
251 appParams->userdataUnicastPort=0; //Manager support only metatraffic
252 appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
253 appParams->unicastIPAddressCount=1;
254 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
255 CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
257 debug(21,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
258 IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
// readerManagers: reader parameters are taken from the domain base
// properties. The same five assignments are repeated before every
// CSTReaderInit call below.
262 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
263 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
264 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
265 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
266 cstReaderParams.fullAcknowledge=ORTE_TRUE;
267 CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
268 OID_READ_MGR,&cstReaderParams,NULL);
269 // readerApplications
270 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
271 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
272 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
273 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
274 cstReaderParams.fullAcknowledge=ORTE_TRUE;
275 CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
276 OID_READ_APP,&cstReaderParams,NULL);
277 // writerPublications
// Unlike the WAS writer, which zeroes delayResponceTime, the publication and
// subscription writers build it with NtpTimeAssembFromMs(...,0,20).
278 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
279 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
280 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
281 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
282 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
283 cstWriterParams.fullAcknowledge=ORTE_TRUE;
284 CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
285 OID_WRITE_PUBL,&cstWriterParams,NULL);
286 // writerSubscriptions
287 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
288 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
289 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
290 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
291 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
292 cstWriterParams.fullAcknowledge=ORTE_TRUE;
293 CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
294 OID_WRITE_SUBS,&cstWriterParams,NULL);
295 // readerPublications
296 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
297 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
298 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
299 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
300 cstReaderParams.fullAcknowledge=ORTE_TRUE;
301 CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
302 OID_READ_PUBL,&cstReaderParams,NULL);
303 // readerSubscriptions
304 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
305 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
306 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
307 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
308 cstReaderParams.fullAcknowledge=ORTE_TRUE;
309 CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
310 OID_READ_SUBS,&cstReaderParams,NULL);
312 //add csChange for WAS
313 appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE);
// NOTE(review): the condition guarding this call is not visible; presumably
// the tasks are started only when `suspended` is false - confirm.
317 ORTEDomainStart(d,ORTE_TRUE,ORTE_TRUE,ORTE_TRUE);
319 debug(21,10) ("ORTEDomainAppCreate: finished\n");
323 /*****************************************************************************/
/*
 * ORTEDomainAppDestroy - tear down an application domain.
 *
 * Visible steps, in order: stop and join the three tasks, close the sockets,
 * destroy the mutex and rwlocks, destroy registered types and subscription
 * patterns, delete the built-in and user CST readers/writers, delete all
 * objects, and free the three message buffers.
 *
 * Returns ORTE_FALSE when d is NULL. The final return statement and any
 * release of d itself are not visible in this listing.
 *
 * NOTE(review): the embedded line numbers skip values, so closing braces and
 * possibly other statements are missing from this listing.
 */
325 ORTEDomainAppDestroy(ORTEDomain *d) {
326 CSTWriter *cstWriter;
327 CSTReader *cstReader;
329 debug(21,10) ("ORTEDomainAppDestroy: start\n");
330 if (!d) return ORTE_FALSE;
// Each task still running (terminate flag not set) is flagged, woken up and
// joined. The receiving tasks are woken through the send socket using their
// own receive port.
332 if (!d->taskRecvMetatraffic.terminate) {
333 d->taskRecvMetatraffic.terminate=ORTE_TRUE;
334 ORTEDomainWakeUpReceivingThread(d,
335 &d->taskSend.sock,d->taskRecvMetatraffic.sock.port);
336 pthread_join(d->taskRecvMetatraffic.thread,NULL);
338 if (!d->taskRecvUserdata.terminate) {
339 d->taskRecvUserdata.terminate=ORTE_TRUE;
340 ORTEDomainWakeUpReceivingThread(d,
341 &d->taskSend.sock,d->taskRecvUserdata.sock.port);
342 pthread_join(d->taskRecvUserdata.thread,NULL);
344 if (!d->taskSend.terminate) {
345 d->taskSend.terminate=ORTE_TRUE;
346 ORTEDomainWakeUpSendingThread(&d->objectEntry);
347 pthread_join(d->taskSend.thread,NULL);
349 debug(21,3) ("ORTEDomainAppDestroy: threads stoped\n");
// Close the three sockets.
352 sock_cleanup(&d->taskRecvMetatraffic.sock);
353 sock_cleanup(&d->taskRecvUserdata.sock);
354 sock_cleanup(&d->taskSend.sock);
// Destroy the synchronisation objects created by ORTEDomainAppCreate.
// NOTE(review): these locks are destroyed before CSTWriterDelete,
// CSTReaderDelete and objectEntryDeleteAll run below. If any of those helpers
// takes one of these locks, this order is unsafe - confirm.
357 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
360 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
361 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
362 pthread_rwlock_destroy(&d->publications.lock);
363 pthread_rwlock_destroy(&d->subscriptions.lock);
364 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
365 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
// Registered types, then the lock protecting them.
368 ORTETypeRegisterDestroyAll(d);
369 pthread_rwlock_destroy(&d->typeEntry.lock);
// Subscription patterns, then the pattern lock.
// NOTE(review): the unlock below targets typeEntry.lock, which was destroyed
// a few lines above and is never locked in this function. This looks like a
// defect (a stray or mistyped call) - confirm and most likely remove it.
372 ORTEDomainAppSubscriptionPatternDestroyAll(d);
373 pthread_rwlock_unlock(&d->typeEntry.lock);
374 pthread_rwlock_destroy(&d->patternEntry.lock);
376 //CSTReaders and CSTWriters
377 CSTWriterDelete(d,&d->writerApplicationSelf);
378 CSTReaderDelete(d,&d->readerManagers);
379 CSTReaderDelete(d,&d->readerApplications);
380 CSTWriterDelete(d,&d->writerPublications);
381 CSTWriterDelete(d,&d->writerSubscriptions);
382 CSTReaderDelete(d,&d->readerPublications);
383 CSTReaderDelete(d,&d->readerSubscriptions);
// Drain the user publication and subscription lists: detach the first entry
// and delete it until each list is empty. Anything else done per entry is not
// visible in this listing.
385 while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
386 CSTWriterDelete(d,cstWriter);
389 while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
390 CSTReaderDelete(d,cstReader);
394 //objects in objectsEntry
395 objectEntryDeleteAll(d,&d->objectEntry);
// Release the message buffers allocated in ORTEDomainAppCreate.
397 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
398 FREE(d->mbRecvUserdata.cdrStream.buffer);
399 FREE(d->mbSend.cdrStream.buffer);
401 debug(21,10) ("ORTEDomainAppDestroy: finished\n");
405 /*****************************************************************************/
/*
 * ORTEDomainAppSubscriptionPatternAdd - register a subscription pattern.
 *
 * Allocates a PatternNode, copies `topic` and `type` into it, stores the
 * callback and inserts the node into d->patternEntry while holding the
 * pattern write lock.
 *
 * Returns ORTE_FALSE when d is NULL. The success return is not visible in
 * this listing.
 *
 * NOTE(review): the embedded line numbers skip 409-411, 417 and 421-423, so
 * the rest of the parameter list, the pnode declaration and possibly other
 * statements are missing from this listing.
 */
407 ORTEDomainAppSubscriptionPatternAdd(ORTEDomain *d,const char *topic,
408 const char *type,ORTESubscriptionPatternCallBack subscriptionCallBack,
412 if (!d) return ORTE_FALSE;
// NOTE(review): the MALLOC result is dereferenced without a NULL check.
// The strcpy calls are unbounded, and the sizes of pnode->topic and
// pnode->type are not visible here, so an over-long topic or type may
// overflow - confirm the field sizes and consider a bounded copy.
413 pnode=(PatternNode*)MALLOC(sizeof(PatternNode));
414 strcpy(pnode->topic,topic);
415 strcpy(pnode->type,type);
416 pnode->subscriptionCallBack=subscriptionCallBack;
// The list is modified only under the write lock.
418 pthread_rwlock_wrlock(&d->patternEntry.lock);
419 Pattern_insert(&d->patternEntry,pnode);
420 pthread_rwlock_unlock(&d->patternEntry.lock);
424 /*****************************************************************************/
/*
 * ORTEDomainAppSubscriptionPatternRemove - remove a subscription pattern.
 *
 * Under the pattern write lock, walks d->patternEntry and calls
 * Pattern_delete on a node whose topic and type both compare equal (strcmp)
 * to the given strings.
 *
 * Returns ORTE_FALSE when d is NULL. The other return statements are not
 * visible in this listing.
 *
 * NOTE(review): the embedded line numbers skip 427-429 and 436-439, so the
 * rest of the parameter list, the pnode declaration and whatever follows the
 * deletion are missing from this listing.
 */
426 ORTEDomainAppSubscriptionPatternRemove(ORTEDomain *d,const char *topic,
430 if (!d) return ORTE_FALSE;
431 pthread_rwlock_wrlock(&d->patternEntry.lock);
// NOTE(review): the node is deleted from the list while ul_list_for_each is
// iterating over it. This is only safe if the missing lines leave the loop,
// or otherwise avoid advancing from the removed node - confirm. Whether the
// node is freed is also not visible here.
432 ul_list_for_each(Pattern,&d->patternEntry,pnode) {
433 if ((strcmp(pnode->topic,topic)==0) &&
434 (strcmp(pnode->type,type)==0)) {
435 Pattern_delete(&d->patternEntry,pnode);
440 pthread_rwlock_unlock(&d->patternEntry.lock);
444 /*****************************************************************************/
/*
 * ORTEDomainAppSubscriptionPatternDestroyAll - empty the pattern list.
 *
 * Under the pattern write lock, repeatedly detaches the first node of
 * d->patternEntry until Pattern_cut_first returns NULL.
 *
 * Returns ORTE_FALSE when d is NULL. The success return is not visible in
 * this listing.
 *
 * NOTE(review): the loop body (embedded lines 452-453) and the end of the
 * function are missing from this listing. Presumably each detached node is
 * freed there - confirm against the complete source.
 */
446 ORTEDomainAppSubscriptionPatternDestroyAll(ORTEDomain *d) {
449 if (!d) return ORTE_FALSE;
450 pthread_rwlock_wrlock(&d->patternEntry.lock);
451 while((pnode=Pattern_cut_first(&d->patternEntry))) {
454 pthread_rwlock_unlock(&d->patternEntry.lock);