2 * $Id: ORTEDomainApp.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 21 Domain application
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
25 /*****************************************************************************/
// ORTEDomainAppCreate: allocates and sets up an ORTEDomain for an application.
//
// Visible steps, in order: allocate the domain object; initialise the
// object/timer, publication/subscription, pattern and type containers with
// their locks; fill in domain properties and event callbacks; allocate the
// two receive buffers and the send buffer; create and bind three UDP
// sockets; configure multicast when enabled; derive the local GUID; build
// the application's own AppParams and add it as an object entry; initialise
// the built-in CST writers/readers; call appSelfParamChanged() and
// ORTEDomainStart().
//
// Parameters:
//   domain    - domain number (its use is not visible on the lines shown)
//   prop      - caller-supplied domain properties, copied into the domain
//   events    - caller-supplied event callbacks, copied into the domain
//   suspended - not referenced on any visible line
// Returns NULL when the domain object cannot be allocated; the other return
// statements are not visible in this listing.
//
// NOTE(review): this listing has lost lines (local declarations such as d,
// appParams, port, i, sbuff, mreq and guid, plus else branches, closing
// braces and returns). Comments below describe only what is visible.
27 ORTEDomainAppCreate(int domain, ORTEDomainProp *prop,
28 ORTEDomainAppEvents *events,Boolean suspended) {
30 ObjectEntryOID *objectEntryOID;
32 CSTWriterParams cstWriterParams;
33 CSTReaderParams cstReaderParams;
34 char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
35 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
40 debug(21,10) ("ORTEDomainAppCreate: start\n");
41 //Create domainApplication
42 d=MALLOC(sizeof(ORTEDomain));
43 if (!d) return NULL; //no memory
44 //initialize local values
// All three tasks start flagged as terminated; ORTEDomainAppDestroy() only
// wakes and joins a task whose terminate flag is no longer set.
46 d->taskRecvMetatraffic.terminate=ORTE_TRUE;
47 d->taskRecvUserdata.terminate=ORTE_TRUE;
48 d->taskSend.terminate=ORTE_TRUE;
49 d->taskRecvMetatraffic.sock.port=0;
50 d->taskRecvUserdata.sock.port=0;
51 d->taskSend.sock.port=0;
52 //init structure objectEntry
53 ObjectEntryHID_init_root_field(&d->objectEntry);
54 pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
55 htimerRoot_init_queue(&d->objectEntry);
56 pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
57 pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
58 pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
59 d->objectEntry.htimSendCondValue=0;
60 //publication,subscriptions
61 d->publications.counter=d->subscriptions.counter=0;
62 CSTWriter_init_root_field(&d->publications);
63 CSTReader_init_root_field(&d->subscriptions);
64 pthread_rwlock_init(&d->publications.lock,NULL);
65 pthread_rwlock_init(&d->subscriptions.lock,NULL);
66 //publication,subscriptions lists
67 PublicationList_init_root_field(&d->psEntry);
68 pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
69 SubscriptionList_init_root_field(&d->psEntry);
70 pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
// Pattern entry: lock first, then the default check/match callbacks are
// registered, then the pattern list head is initialised.
73 pthread_rwlock_init(&d->patternEntry.lock,NULL);
74 ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
75 Pattern_init_head(&d->patternEntry);
// Domain properties: copied from the caller's prop, or filled in by
// ORTEDomainPropDefaultGet().
// NOTE(review): the condition selecting between the two statements is not
// visible here - presumably a prop!=NULL test; confirm.
79 memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
81 ORTEDomainPropDefaultGet(&d->domainProp);
84 //print local IP addresses
// NOTE(review): strcat() appends to iflocal, whose initialisation is not
// visible in this listing - confirm it holds an empty string before the
// loop. The addresses are concatenated without any separator.
86 if (d->domainProp.IFCount) {
87 for(i=0;i<d->domainProp.IFCount;i++)
88 strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
89 debug(21,2) ("ORTEDomainAppCreate: localIPAddres(es) %s\n",iflocal);
91 debug(21,2) ("ORTEDomainAppCreate: no active interface card\n");
// Event callbacks: copied from the caller's events, or zeroed.
// NOTE(review): the selecting condition is not visible - presumably an
// events!=NULL test; confirm.
96 memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
98 memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
// Message buffers: two receive buffers of recvBuffSize bytes and one send
// buffer of sendBuffSize bytes.
102 d->mbRecvMetatraffic.cdrStream.buffer=
103 (uint8_t*)MALLOC(d->domainProp.recvBuffSize);
104 d->mbRecvUserdata.cdrStream.buffer=
105 (uint8_t*)MALLOC(d->domainProp.recvBuffSize);
106 d->mbSend.cdrStream.buffer=
107 (uint8_t*)MALLOC(d->domainProp.sendBuffSize);
// If any of the three allocations failed, all three pointers are passed to
// FREE().
// NOTE(review): what follows the FREE() calls (releasing d, returning) is
// not visible; confirm the failure path also releases d.
108 if ((!d->mbRecvMetatraffic.cdrStream.buffer) ||
109 (!d->mbRecvUserdata.cdrStream.buffer) ||
110 (!d->mbSend.cdrStream.buffer)) { //no memory
111 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
112 FREE(d->mbRecvUserdata.cdrStream.buffer);
113 FREE(d->mbSend.cdrStream.buffer);
// Point each stream cursor at the start of its buffer with zero length.
117 d->mbRecvMetatraffic.cdrStream.bufferPtr=d->mbRecvMetatraffic.cdrStream.buffer;
118 d->mbRecvMetatraffic.cdrStream.length=0;
119 d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
120 d->mbRecvUserdata.cdrStream.length=0;
121 d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer;
122 d->mbSend.cdrStream.length=0;
// Type entry and its lock.
125 ORTEType_init_root_field(&d->typeEntry);
126 pthread_rwlock_init(&d->typeEntry.lock,NULL);
// One UDP socket per task.
129 sock_init_udp(&d->taskRecvMetatraffic.sock);
130 sock_init_udp(&d->taskRecvUserdata.sock);
131 sock_init_udp(&d->taskSend.sock);
// port is derived from d->domain; the multicast variant is used when
// multicast is enabled.
// NOTE(review): port is passed by name, so Domain2Port* are presumably
// macros that assign to it - confirm. The else between the two calls and
// the assignment of d->domain are not visible in this listing.
132 if (d->domainProp.multicast.enabled) {
133 Domain2PortMulticastMetatraffic(d->domain,port);
135 Domain2Port(d->domain,port);
// Each socket is bound with 0 as the port argument and the resulting
// sock.port is logged.
137 sock_bind(&d->taskRecvMetatraffic.sock,0); //give me receiving port (metatraffic)
138 debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvMetatraffic):%u\n",
139 d->taskRecvMetatraffic.sock.port);
140 sock_bind(&d->taskRecvUserdata.sock,0); //give me receiving port (userdata)
141 debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvUserdata):%u\n",
142 d->taskRecvUserdata.sock.port);
143 sock_bind(&d->taskSend.sock,0); //give me sending port
144 debug(21,2) ("ORTEDomainAppCreate: Bind on port(Send):%u\n",
145 d->taskSend.sock.port);
// A negative fd on any socket is treated as failure: all sockets are
// cleaned up and all message buffers freed.
// NOTE(review): the remainder of this failure path is not visible; confirm
// that d itself is released and NULL is returned.
146 if ((d->taskRecvMetatraffic.sock.fd<0) ||
147 (d->taskRecvUserdata.sock.fd<0) ||
148 (d->taskSend.sock.fd<0)) {
149 debug(21,0) ("Error creating socket(s).\n");
150 sock_cleanup(&d->taskRecvMetatraffic.sock);
151 sock_cleanup(&d->taskRecvUserdata.sock);
152 sock_cleanup(&d->taskSend.sock);
153 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
154 FREE(d->mbRecvUserdata.cdrStream.buffer);
155 FREE(d->mbSend.cdrStream.buffer);
// Multicast: the TTL is set on the sending socket and the group is joined
// on the userdata receiving socket (interface INADDR_ANY). Success of each
// sock_setsockopt() call is logged.
// NOTE(review): no handling of a negative sock_setsockopt() result is
// visible in this listing - confirm failures are not silently ignored.
159 if (d->domainProp.multicast.enabled) {
162 if(sock_setsockopt(&d->taskSend.sock,IP_MULTICAST_TTL,
163 &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
164 debug(21,2) ("ORTEDomainAppCreate: ttl set on: %u\n",
165 d->domainProp.multicast.ttl);
167 // join multicast group
168 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
169 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
170 if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
171 (void *) &mreq, sizeof(mreq))>=0) {
172 debug(21,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
173 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
177 //Generates local GUID
// hid is the first interface address when at least one interface exists;
// the 127.0.0.1 assignment is presumably the else branch (the else itself
// is not visible). aid is the send port shifted left by 8 bits plus
// MANAGEDAPPLICATION.
178 if (d->domainProp.IFCount>0)
179 d->guid.hid=d->domainProp.IFProp[0].ipAddress;
181 d->guid.hid=StringToIPAddress("127.0.0.1");
182 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION;
// NOTE(review): this debug call uses section 29 while every other call in
// this function uses 21 - confirm that is intentional. The assignment of
// d->guid.oid printed here is not visible in this listing.
184 debug(29,2) ("ORTEDomainAppCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
185 d->guid.hid,d->guid.aid,d->guid.oid);
187 //create HEADER of message for sending task
188 RTPSHeaderCreate(d->mbSend.cdrStream.buffer,d->guid.hid,d->guid.aid);
189 d->mbSend.cdrStream.bufferPtr=
190 d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
191 d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;
192 d->mbSend.needSend=ORTE_FALSE;
193 d->mbSend.containsInfoReply=ORTE_FALSE;
195 //Self object data & fellow managers object data
// NOTE(review): the MALLOC() result is handed to AppParamsInit() with no
// NULL check in between.
196 appParams=(AppParams*)MALLOC(sizeof(AppParams));
197 AppParamsInit(appParams);
198 appParams->expirationTime=d->domainProp.baseProp.expirationTime;
199 VENDOR_ID_OCERA(appParams->vendorId);
200 appParams->hostId=d->guid.hid;
201 appParams->appId=d->guid.aid;
202 appParams->metatrafficUnicastPort=d->taskRecvMetatraffic.sock.port;
203 appParams->userdataUnicastPort=d->taskRecvUserdata.sock.port;
// With multicast enabled, every interface address is copied into the
// metatraffic multicast address list.
204 if (d->domainProp.multicast.enabled) {
206 for(i=0;i<d->domainProp.IFCount;i++)
207 appParams->metatrafficMulticastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
208 appParams->metatrafficMulticastIPAddressCount=d->domainProp.IFCount;
// Unicast addresses: every interface address when any interface exists;
// the single 127.0.0.1 entry is presumably the else branch (not visible).
211 if (d->domainProp.IFCount) {
212 for(i=0;i<d->domainProp.IFCount;i++)
213 appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
214 appParams->unicastIPAddressCount=d->domainProp.IFCount;
216 appParams->unicastIPAddressList[0]=StringToIPAddress("127.0.0.1");
217 appParams->unicastIPAddressCount=1;
// Manager keys: without a keys string the list is 127.0.0.1 followed by
// every interface address; the getStringPart() loop converts each
// ':'-separated part of keys with StringToIPAddress() (presumably the else
// branch; the else is not visible).
// NOTE(review): no bound check on the managerKeyList index is visible in
// either path - confirm the array can hold IFCount+1 entries and as many
// keys as the string may contain.
221 if (!d->domainProp.keys) {
222 appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
223 for(i=0;i<d->domainProp.IFCount;i++)
224 appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
225 appParams->managerKeyCount=d->domainProp.IFCount+1;
227 appParams->managerKeyCount=i=0;
228 while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
229 appParams->managerKeyList[appParams->managerKeyCount++]=
230 StringToIPAddress(sbuff);
233 d->appParams=appParams;
234 //insert object, doesn't need to be locked
// NOTE(review): the objectEntryAdd() result is dereferenced on the next
// line without a check - confirm it cannot be NULL.
235 d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
236 d->objectEntryOID->privateCreated=ORTE_TRUE;
239 // writerApplicationSelf (WAS)
240 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
241 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
242 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
243 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
244 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
245 cstWriterParams.fullAcknowledge=ORTE_TRUE;
246 CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
247 OID_WRITE_APPSELF,&cstWriterParams,NULL);
248 // add to WAS remote writer(s)
// When appLocalManager is set and no object with this GUID exists yet, a
// new AppParams is created for it, added as an object entry and attached
// to WAS through CSTWriterAddRemoteReader().
// NOTE(review): the assignment of guid.oid and the tail of the
// CSTWriterAddRemoteReader() call are not visible; the MALLOC() result is
// again used without a NULL check.
249 if (d->domainProp.appLocalManager) {
251 guid.hid=d->domainProp.appLocalManager;
252 guid.aid=AID_UNKNOWN;
254 if (!objectEntryFind(d,&guid)) {
255 appParams=(AppParams*)MALLOC(sizeof(AppParams));
256 AppParamsInit(appParams);
257 appParams->hostId=guid.hid;
258 appParams->appId=guid.aid;
259 appParams->metatrafficUnicastPort=port;
260 appParams->userdataUnicastPort=0; //Manager supports only metatraffic
261 appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
262 appParams->unicastIPAddressCount=1;
263 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
264 CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
266 debug(21,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
267 IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
// readerManagers
// The reader parameter block is refilled with the same baseProp values
// before every CSTReaderInit() call below.
271 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
272 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
273 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
274 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
275 cstReaderParams.fullAcknowledge=ORTE_TRUE;
276 CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
277 OID_READ_MGR,&cstReaderParams,NULL);
278 // readerApplications
279 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
280 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
281 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
282 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
283 cstReaderParams.fullAcknowledge=ORTE_TRUE;
284 CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
285 OID_READ_APP,&cstReaderParams,NULL);
286 // writerPublications
// Unlike WAS, the publication and subscription writers get their
// delayResponceTime from NtpTimeAssembFromMs(...,0,20) rather than zero.
287 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
288 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
289 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
290 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
291 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
292 cstWriterParams.fullAcknowledge=ORTE_TRUE;
293 CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
294 OID_WRITE_PUBL,&cstWriterParams,NULL);
295 // writerSubscriptions
296 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
297 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
298 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
299 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
300 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
301 cstWriterParams.fullAcknowledge=ORTE_TRUE;
302 CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
303 OID_WRITE_SUBS,&cstWriterParams,NULL);
304 // readerPublications
305 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
306 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
307 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
308 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
309 cstReaderParams.fullAcknowledge=ORTE_TRUE;
310 CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
311 OID_READ_PUBL,&cstReaderParams,NULL);
312 // readerSubscriptions
313 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
314 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
315 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
316 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
317 cstReaderParams.fullAcknowledge=ORTE_TRUE;
318 CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
319 OID_READ_SUBS,&cstReaderParams,NULL);
321 //add csChange for WAS
322 appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
// NOTE(review): the suspended parameter is not referenced on any visible
// line; the call below is presumably guarded by it - confirm.
326 ORTEDomainStart(d,ORTE_TRUE,ORTE_TRUE,ORTE_TRUE);
328 debug(21,10) ("ORTEDomainAppCreate: finished\n");
332 /*****************************************************************************/
// ORTEDomainAppDestroy: shuts down and releases an application domain.
//
// Visible order: call appSelfParamChanged() while holding both object-entry
// write locks; stop and join each task whose terminate flag is not yet set;
// clean up the three sockets; destroy the condition variable, mutex and
// rwlocks; destroy registered types and subscription patterns; delete the
// built-in and user CST writers/readers; delete all object entries; free
// the three message buffers.
//
// Returns ORTE_FALSE when d is NULL; the return on the normal path and the
// release of d itself are not visible in this listing.
334 ORTEDomainAppDestroy(ORTEDomain *d) {
335 CSTWriter *cstWriter;
336 CSTReader *cstReader;
338 debug(21,10) ("ORTEDomainAppDestroy: start\n");
339 if (!d) return ORTE_FALSE;
// objRootLock is taken before htimRootLock and released after it.
340 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
341 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
342 appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);
343 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
344 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
// Stop the tasks. For each task still running: set its terminate flag,
// wake the thread, then join it. The receiving threads are woken through
// the send socket, addressed to the receiving socket's own port.
346 if (!d->taskRecvMetatraffic.terminate) {
347 d->taskRecvMetatraffic.terminate=ORTE_TRUE;
348 ORTEDomainWakeUpReceivingThread(d,
349 &d->taskSend.sock,d->taskRecvMetatraffic.sock.port);
350 pthread_join(d->taskRecvMetatraffic.thread,NULL);
352 if (!d->taskRecvUserdata.terminate) {
353 d->taskRecvUserdata.terminate=ORTE_TRUE;
354 ORTEDomainWakeUpReceivingThread(d,
355 &d->taskSend.sock,d->taskRecvUserdata.sock.port);
356 pthread_join(d->taskRecvUserdata.thread,NULL);
358 if (!d->taskSend.terminate) {
359 d->taskSend.terminate=ORTE_TRUE;
360 ORTEDomainWakeUpSendingThread(&d->objectEntry);
361 pthread_join(d->taskSend.thread,NULL);
363 debug(21,3) ("ORTEDomainAppDestroy: threads stoped\n");
// Sockets
366 sock_cleanup(&d->taskRecvMetatraffic.sock);
367 sock_cleanup(&d->taskRecvUserdata.sock);
368 sock_cleanup(&d->taskSend.sock);
// Condition variable and mutex of the sending timer
371 pthread_cond_destroy(&d->objectEntry.htimSendCond);
372 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
// Read/write locks
// NOTE(review): these locks are destroyed before CSTWriterDelete(),
// CSTReaderDelete() and objectEntryDeleteAll() run further down. If any of
// those acquire one of these locks, that is a use of a destroyed lock -
// verify against their implementations.
375 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
376 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
377 pthread_rwlock_destroy(&d->publications.lock);
378 pthread_rwlock_destroy(&d->subscriptions.lock);
379 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
380 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
// Registered types
383 ORTETypeRegisterDestroyAll(d);
384 pthread_rwlock_destroy(&d->typeEntry.lock);
// Subscription patterns
// NOTE(review): the pthread_rwlock_unlock() below targets typeEntry.lock,
// which was destroyed just above and has no matching lock call on any
// visible line. This looks like a defect (probably a stray line) - confirm
// and remove.
387 ORTEDomainAppSubscriptionPatternDestroy(d);
388 pthread_rwlock_unlock(&d->typeEntry.lock);
389 pthread_rwlock_destroy(&d->patternEntry.lock);
391 //CSTReaders and CSTWriters
392 CSTWriterDelete(d,&d->writerApplicationSelf);
393 CSTReaderDelete(d,&d->readerManagers);
394 CSTReaderDelete(d,&d->readerApplications);
395 CSTWriterDelete(d,&d->writerPublications);
396 CSTWriterDelete(d,&d->writerSubscriptions);
397 CSTReaderDelete(d,&d->readerPublications);
398 CSTReaderDelete(d,&d->readerSubscriptions);
// User publications and subscriptions: each one is cut from the head of
// its container and deleted until the container is empty.
// NOTE(review): whether the cut nodes are also freed is not visible here.
400 while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
401 CSTWriterDelete(d,cstWriter);
404 while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
405 CSTReaderDelete(d,cstReader);
409 //objects in objectsEntry
410 objectEntryDeleteAll(d,&d->objectEntry);
// Message buffers allocated in ORTEDomainAppCreate()
412 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
413 FREE(d->mbRecvUserdata.cdrStream.buffer);
414 FREE(d->mbSend.cdrStream.buffer);
416 debug(21,10) ("ORTEDomainAppDestroy: finished\n");
420 /*****************************************************************************/
// ORTEDomainAppSubscriptionPatternAdd: allocates a PatternNode, copies topic
// and type into it, stores the callback and inserts the node into the
// domain's pattern list under the pattern write lock.
//
// Returns ORTE_FALSE when d is NULL; the normal return is not visible here.
// NOTE(review): the end of the parameter list and the local declarations
// are missing from this listing.
422 ORTEDomainAppSubscriptionPatternAdd(ORTEDomain *d,const char *topic,
423 const char *type,ORTESubscriptionPatternCallBack subscriptionCallBack,
427 if (!d) return ORTE_FALSE;
// NOTE(review): the MALLOC() result is written through without a NULL
// check, and both strcpy() calls copy caller-supplied strings with no
// length limit - confirm the size of pnode->topic and pnode->type and
// bound the copies accordingly.
428 pnode=(PatternNode*)MALLOC(sizeof(PatternNode));
429 strcpy(pnode->topic,topic);
430 strcpy(pnode->type,type);
431 pnode->subscriptionCallBack=subscriptionCallBack;
// The list is modified only while the write lock is held.
433 pthread_rwlock_wrlock(&d->patternEntry.lock);
434 Pattern_insert(&d->patternEntry,pnode);
435 pthread_rwlock_unlock(&d->patternEntry.lock);
439 /*****************************************************************************/
// ORTEDomainAppSubscriptionPatternRemove: walks the domain's pattern list
// under the pattern write lock and deletes from the list a node whose topic
// and type both compare equal (strcmp) to the arguments.
//
// Returns ORTE_FALSE when d is NULL; other returns are not visible here.
// NOTE(review): the end of the parameter list and the statements that
// follow Pattern_delete() are missing from this listing. Confirm that the
// removed node is freed, that iteration does not continue from a deleted
// node, and that the write lock is released on every exit path (an early
// return inside the loop would leave it held).
441 ORTEDomainAppSubscriptionPatternRemove(ORTEDomain *d,const char *topic,
445 if (!d) return ORTE_FALSE;
446 pthread_rwlock_wrlock(&d->patternEntry.lock);
447 ul_list_for_each(Pattern,&d->patternEntry,pnode) {
448 if ((strcmp(pnode->topic,topic)==0) &&
449 (strcmp(pnode->type,type)==0)) {
450 Pattern_delete(&d->patternEntry,pnode);
455 pthread_rwlock_unlock(&d->patternEntry.lock);
459 /*****************************************************************************/
// ORTEDomainAppSubscriptionPatternDestroy: empties the domain's pattern
// list by cutting nodes off its head, one at a time, while holding the
// pattern write lock.
//
// Returns ORTE_FALSE when d is NULL; the normal return is not visible here.
// NOTE(review): the loop body is missing from this listing - presumably it
// frees each cut node; confirm.
461 ORTEDomainAppSubscriptionPatternDestroy(ORTEDomain *d) {
464 if (!d) return ORTE_FALSE;
465 pthread_rwlock_wrlock(&d->patternEntry.lock);
466 while((pnode=Pattern_cut_first(&d->patternEntry))) {
469 pthread_rwlock_unlock(&d->patternEntry.lock);