2 * $Id: ORTEDomain.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 30 Domain functions
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /*****************************************************************************/
/*
 * ORTEDomainRecvThreadStart - start the receiving thread of one task.
 *
 * tp: task descriptor; tp->terminate is cleared and tp->thread receives the
 *     handle of a new thread running ORTEAppRecvThread with tp as argument.
 *
 * The result of pthread_create() is not checked in the visible code.
 * NOTE(review): the embedded original line numbers skip (26 -> 29, 31 -> 35),
 * so the return type, the braces and possibly a guard around the body are
 * not visible in this listing - confirm against the complete file.
 */
26 ORTEDomainRecvThreadStart(TaskProp *tp)
29 tp->terminate=ORTE_FALSE;
30 pthread_create(&(tp->thread), NULL,
31 (void*)&ORTEAppRecvThread, (void *)tp);
35 /*****************************************************************************/
/*
 * ORTEDomainSendThreadStart - start the sending thread of one task.
 *
 * tp: task descriptor; tp->terminate is cleared and tp->thread receives the
 *     handle of a new thread running ORTEAppSendThread with tp as argument.
 *
 * The result of pthread_create() is not checked in the visible code.
 * NOTE(review): the embedded original line numbers skip (37 -> 40, 42 -> 46),
 * so the return type, the braces and possibly a guard around the body are
 * not visible in this listing - confirm against the complete file.
 */
37 ORTEDomainSendThreadStart(TaskProp *tp)
40 tp->terminate=ORTE_FALSE;
41 pthread_create(&(tp->thread), NULL,
42 (void*)&ORTEAppSendThread, (void *)tp);
46 /*****************************************************************************/
/*
 * ORTEDomainRecvThreadStop - ask a receiving thread to finish and wait for it.
 *
 * tp: task descriptor of the receiving thread.
 *
 * Sets tp->terminate, calls ORTEDomainWakeUpReceivingThread() with the
 * sending task's socket and the port of tp's socket, then joins tp->thread.
 * NOTE(review): the declaration and assignment of `d` (original lines 49-52)
 * are not visible in this listing; presumably d is taken from tp->d, and a
 * guard on tp->terminate may exist - confirm against the complete file.
 */
48 ORTEDomainRecvThreadStop(TaskProp *tp)
53 tp->terminate=ORTE_TRUE;
54 ORTEDomainWakeUpReceivingThread(d,
55 &d->taskSend.sock,tp->sock.port);
56 pthread_join(tp->thread,NULL);
60 /*****************************************************************************/
/*
 * ORTEDomainSendThreadStop - ask the sending thread to finish and wait for it.
 *
 * tp: task descriptor of the sending thread.
 *
 * Sets tp->terminate, calls ORTEDomainWakeUpSendingThread() on the domain's
 * objectEntry, then joins tp->thread.
 * NOTE(review): the declaration and assignment of `d` (original lines 63-66)
 * are not visible in this listing; presumably d is taken from tp->d, and a
 * guard on tp->terminate may exist - confirm against the complete file.
 */
62 ORTEDomainSendThreadStop(TaskProp *tp)
67 tp->terminate=ORTE_TRUE;
68 ORTEDomainWakeUpSendingThread(&d->objectEntry);
69 pthread_join(tp->thread,NULL);
73 /*****************************************************************************/
/*
 * ORTEDomainStart - start the worker threads of a domain.
 *
 * d:                              domain whose tasks are started
 * recvUnicastMetatrafficThread:   start d->taskRecvUnicastMetatraffic when set
 * recvMulticastMetatrafficThread: start d->taskRecvMulticastMetatraffic when set
 * recvUnicastUserdataThread:      start d->taskRecvUnicastUserdata when set
 * recvMulticastUserdataThread:    start d->taskRecvMulticastUserdata when set
 *
 * NOTE(review): the end of the parameter list (original lines 80-83) and the
 * line before the send-thread start (original line 96) are not visible in
 * this listing; presumably a further flag selects the sending thread -
 * confirm against the complete file.
 */
75 ORTEDomainStart(ORTEDomain *d,
76 Boolean recvUnicastMetatrafficThread,
77 Boolean recvMulticastMetatrafficThread,
78 Boolean recvUnicastUserdataThread,
79 Boolean recvMulticastUserdataThread,
// each receiving thread is started only when its flag is set
84 if (recvUnicastMetatrafficThread)
85 ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
87 if (recvMulticastMetatrafficThread)
88 ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
90 if (recvUnicastUserdataThread)
91 ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
93 if (recvMulticastUserdataThread)
94 ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
// sending thread
97 ORTEDomainSendThreadStart(&d->taskSend);
100 /*****************************************************************************/
/*
 * ORTEDomainPropDefaultGet - fill *prop with the default domain properties.
 *
 * prop: structure to initialise; it is zeroed first and then every default
 *       below is written into it.
 *
 * NOTE(review): the declaration of the local `sock`, the statements between
 * sock_init_udp() and sock_get_local_interfaces(), the continuation lines of
 * the sprintf() call and the return statement are not visible in this
 * listing - confirm against the complete file.
 */
102 ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
// start from an all-zero structure
105 memset(prop, 0, sizeof(*prop));
// multicast: disabled by default, ttl 1, loopback enabled
107 prop->multicast.enabled=ORTE_FALSE;
108 prop->multicast.ttl=1;
109 prop->multicast.loopBackEnabled=ORTE_TRUE;
// a UDP socket is used to query the local interfaces into IFProp/IFCount
112 sock_init_udp(&sock);
114 sock_get_local_interfaces(&sock,prop->IFProp,&prop->IFCount);
117 prop->mgrs=NULL; //only from localhost
118 prop->appLocalManager=StringToIPAddress("127.0.0.1");
119 prop->keys=NULL; //are assigned by orte
// version string; the remaining arguments are on lines not visible here
120 sprintf(prop->version,ORTE_PACKAGE_STRING\
126 prop->recvBuffSize=0x4000;
127 prop->sendBuffSize=0x4000;
128 prop->wireProp.metaBytesPerPacket=1500;
129 prop->wireProp.metaBytesPerFastPacket=1000; //not used
130 prop->wireProp.metabitsPerACKBitmap=32; //not used
131 prop->wireProp.userBytesPerPacket=0x3000;
// base (protocol timing / retry) properties
134 prop->baseProp.registrationMgrRetries=0;
135 NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod,0);//0s
136 prop->baseProp.registrationAppRetries=3;
137 NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod,0,500);//500ms
138 NTPTIME_BUILD(prop->baseProp.expirationTime,180); //180s
139 NTPTIME_BUILD(prop->baseProp.refreshPeriod,72); //72s - refresh self parameters
140 NTPTIME_BUILD(prop->baseProp.purgeTime,60); //60s - purge time of parameters
141 NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime,72);//72s - announcement by HB
142 NTPTIME_BUILD(prop->baseProp.repeatActiveQueryTime,72);//72s - announcement by ACK
143 NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMin,0,10);//10ms - delay before send ACK
144 NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMax,1,0);//1s
145 NtpTimeAssembFromMs(prop->baseProp.maxBlockTime,20,0);//20s
146 prop->baseProp.ACKMaxRetries=10;
147 prop->baseProp.HBMaxRetries=10;
// default publication / subscription parameters
149 PublParamsInit(&prop->publPropDefault);
150 SubsParamsInit(&prop->subsPropDefault);
155 /*****************************************************************************/
/*
 * ORTEDomainInitEvents - reset an application events structure.
 *
 * events: structure that is overwritten with zero bytes.
 *
 * NOTE(review): the return type and the rest of the body (original lines
 * 156, 159-162) are not visible in this listing.
 */
157 ORTEDomainInitEvents(ORTEDomainAppEvents *events) {
158 memset(events,0,sizeof(ORTEDomainAppEvents));
163 /*****************************************************************************/
/*
 * ORTEDomainCreate - allocate and initialise a domain object.
 *
 * domain:  domain number (used through d->domain when ports are derived)
 * prop:    domain properties to copy, see the note at the memcpy below
 * events:  application event callbacks to copy, see the note at the memcpy below
 * manager: selects manager vs. managed-application behaviour in the visible
 *          multicast metatraffic and buffer tests
 *
 * Returns NULL when the ORTEDomain structure cannot be allocated; the other
 * return paths are on lines not visible in this listing.
 *
 * NOTE(review): the embedded original line numbers skip throughout this
 * function, so several declarations (d, i, port, mport, sbuff, reuse, loop,
 * mreq, guid, iNtpTime), braces and if/else guards are not visible here.
 * Comments below that mention a missing guard are assumptions to be
 * confirmed against the complete file.
 */
165 ORTEDomainCreate(int domain, ORTEDomainProp *prop,
166 ORTEDomainAppEvents *events,Boolean manager) {
168 ObjectEntryOID *objectEntryOID;
169 AppParams *appParams;
170 CSTWriterParams cstWriterParams;
171 CSTReaderParams cstReaderParams;
172 char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
173 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
177 Boolean error=ORTE_FALSE;
179 debug(30,2) ("ORTEDomainCreate: %s compiled: %s,%s\n",
180 ORTE_PACKAGE_STRING,__DATE__,__TIME__);
182 debug(30,10) ("ORTEDomainCreate: start\n");
183 //Create domainApplication
184 d=MALLOC(sizeof(ORTEDomain));
185 if (!d) return NULL; //no memory
186 //initialize local values
// every task points back to its domain and starts in the "terminated" state
188 d->taskRecvUnicastMetatraffic.d=d;
189 d->taskRecvUnicastMetatraffic.terminate=ORTE_TRUE;
190 d->taskRecvMulticastMetatraffic.d=d;
191 d->taskRecvMulticastMetatraffic.terminate=ORTE_TRUE;
192 d->taskRecvUnicastUserdata.d=d;
193 d->taskRecvUnicastUserdata.terminate=ORTE_TRUE;
194 d->taskRecvMulticastUserdata.d=d;
195 d->taskRecvMulticastUserdata.terminate=ORTE_TRUE;
197 d->taskSend.terminate=ORTE_TRUE;
198 d->taskRecvUnicastMetatraffic.sock.port=0;
199 d->taskRecvMulticastMetatraffic.sock.port=0;
200 d->taskRecvUnicastUserdata.sock.port=0;
201 d->taskRecvMulticastUserdata.sock.port=0;
202 d->taskSend.sock.port=0;
203 //init structure objectEntry
204 ObjectEntryHID_init_root_field(&d->objectEntry);
205 pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
206 htimerRoot_init_queue(&d->objectEntry);
207 pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
208 pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
209 pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
210 d->objectEntry.htimSendCondValue=0;
211 //publication,subscriptions
212 d->publications.counter=d->subscriptions.counter=0;
213 CSTWriter_init_root_field(&d->publications);
214 CSTReader_init_root_field(&d->subscriptions);
215 pthread_rwlock_init(&d->publications.lock,NULL);
216 pthread_rwlock_init(&d->subscriptions.lock,NULL);
217 //publication,subscriptions lists
218 PublicationList_init_root_field(&d->psEntry);
219 pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
220 SubscriptionList_init_root_field(&d->psEntry);
221 pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
// pattern entry
// NOTE(review): ORTEPatternRegister() is called before Pattern_init_head()
// initialises d->patternEntry - confirm this order is intended.
224 pthread_rwlock_init(&d->patternEntry.lock,NULL);
225 ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
226 Pattern_init_head(&d->patternEntry);
// domain properties: copy of the caller's prop, or the defaults.
// NOTE(review): the guard choosing between the two statements is not visible
// here; presumably the copy is used when prop is non-NULL - confirm.
230 memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
232 ORTEDomainPropDefaultGet(&d->domainProp);
235 //print local IP addresses
// NOTE(review): iflocal is appended to with strcat(); its initialisation and
// any separator handling are not visible in this listing - confirm.
237 if (d->domainProp.IFCount) {
238 for(i=0;i<d->domainProp.IFCount;i++)
239 strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
240 debug(30,2) ("ORTEDomainCreate: localIPAddres(es) %s\n",iflocal);
242 debug(30,2) ("ORTEDomainCreate: no active interface card\n");
243 if (d->domainProp.multicast.enabled) {
244 debug(30,0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
// domain events: copy of the caller's events, or all zero.
// NOTE(review): the guard choosing between the two statements is not visible
// here; presumably the copy is used when events is non-NULL - confirm.
252 memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
254 memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
// CDR codecs of all tasks; buffers sized from recvBuffSize / sendBuffSize
258 CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
259 CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
260 CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
261 CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
262 CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
263 CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
264 d->domainProp.recvBuffSize);
265 CDR_buffer_init(&d->taskSend.mb.cdrCodec,
266 d->domainProp.sendBuffSize);
// the send codec's write limit is set to metaBytesPerPacket
267 d->taskSend.mb.cdrCodec.wptr_max=d->domainProp.wireProp.metaBytesPerPacket;
269 CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
270 d->domainProp.recvBuffSize);
// multicast receive buffers are only allocated when multicast is enabled
271 if (d->domainProp.multicast.enabled) {
272 CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
273 d->domainProp.recvBuffSize);
274 CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
275 d->domainProp.recvBuffSize);
278 d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
// type register
281 ORTEType_init_root_field(&d->typeEntry);
282 pthread_rwlock_init(&d->typeEntry.lock,NULL);
// create the UDP sockets of all tasks
285 sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
286 sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
287 sock_init_udp(&d->taskRecvUnicastUserdata.sock);
288 sock_init_udp(&d->taskRecvMulticastUserdata.sock);
289 sock_init_udp(&d->taskSend.sock);
291 /************************************************************************/
292 /* UnicastMetatraffic */
// NOTE(review): two binds follow (on `port` and on 0); the guard selecting
// between them is not visible here, presumably `manager` - confirm.
293 Domain2Port(d->domain,port);
295 if (d->domainProp.multicast.enabled) {
296 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
301 sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET,
302 SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
303 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
307 sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP,
308 IP_MULTICAST_LOOP, (char*)&loop,
310 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
313 //join the multicast group
314 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
315 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
316 if(sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock,IPPROTO_IP,
317 IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
318 debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
319 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
322 sock_bind(&d->taskRecvUnicastMetatraffic.sock,port);
324 /* give me receiving port (metatraffic) */
325 sock_bind(&d->taskRecvUnicastMetatraffic.sock,0);
327 debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
328 d->taskRecvUnicastMetatraffic.sock.port);
330 /************************************************************************/
331 /* MulticastMetatraffic */
// only for a non-manager with multicast enabled
332 if (d->domainProp.multicast.enabled && !manager) {
333 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
339 sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET,
340 SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
341 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
345 sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
346 IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled,
347 sizeof(d->domainProp.multicast.loopBackEnabled));
348 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
349 d->domainProp.multicast.loopBackEnabled);
351 //join the multicast group
352 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
353 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
354 if(sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock,IPPROTO_IP,
355 IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
356 debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
357 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
360 /* receiving multicast port (metatraffic) */
361 Domain2PortMulticastMetatraffic(d->domain,mport);
362 sock_bind(&d->taskRecvMulticastMetatraffic.sock,(uint16_t)mport);
363 debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
364 d->taskRecvMulticastMetatraffic.sock.port);
367 /************************************************************************/
370 /* give me receiving port (userdata) */
371 sock_bind(&d->taskRecvUnicastUserdata.sock,0);
372 debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
373 d->taskRecvUnicastUserdata.sock.port);
// multicast userdata socket
375 if (d->domainProp.multicast.enabled) {
380 sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET,
381 SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
382 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
386 sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP,
387 IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled,
388 sizeof(d->domainProp.multicast.loopBackEnabled));
389 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
390 d->domainProp.multicast.loopBackEnabled);
392 /* receiving multicast port (userdata) */
393 Domain2PortMulticastUserdata(d->domain,mport);
394 sock_bind(&d->taskRecvMulticastUserdata.sock,(uint16_t)mport);
395 debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
396 d->taskRecvMulticastUserdata.sock.port);
400 /************************************************************************/
402 /* give me sending port */
403 sock_bind(&d->taskSend.sock,0);
404 debug(30,2) ("ORTEDomainCreate: bind on port(Send): %u\n",
405 d->taskSend.sock.port);
// outgoing multicast TTL
406 if (d->domainProp.multicast.enabled) {
408 if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL,
409 &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
410 debug(30,2) ("ORTEDomainCreate: ttl set on: %u\n",
411 d->domainProp.multicast.ttl);
415 /************************************************************************/
416 /* tests for valid resources */
// NOTE(review): unlike the buffer test below, the multicast socket checks are
// not qualified by !manager, and the unicast userdata socket is only checked
// when multicast is enabled - confirm this is intended.
417 if ((d->taskRecvUnicastMetatraffic.sock.fd<0) ||
418 (d->taskSend.sock.fd<0) ||
419 (d->domainProp.multicast.enabled &&
420 (d->taskRecvUnicastUserdata.sock.fd<0)) ||
421 (d->domainProp.multicast.enabled &&
422 (d->taskRecvMulticastUserdata.sock.fd<0)) ||
423 (d->domainProp.multicast.enabled &&
424 (d->taskRecvMulticastMetatraffic.sock.fd<0))) {
425 debug(30,0) ("ORTEDomainCreate: Error creating socket(s).\n");
429 if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) ||
430 (!d->taskSend.mb.cdrCodec.buffer) ||
431 (d->domainProp.multicast.enabled && !manager &&
432 !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) ||
433 (d->domainProp.multicast.enabled && !manager &&
434 !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) ||
435 (d->domainProp.multicast.enabled && !manager &&
436 !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) { //out of memory
437 debug(30,0) ("ORTEDomainCreate: Error creating buffer(s).\n");
440 /* a problem occurred with resources */
// cleanup path: close all sockets and release all codec buffers.
// NOTE(review): the guard (presumably on `error`) and the statements that
// follow the cleanup are not visible in this listing - confirm.
442 sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
443 sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
444 sock_cleanup(&d->taskRecvUnicastUserdata.sock);
445 sock_cleanup(&d->taskRecvMulticastUserdata.sock);
446 sock_cleanup(&d->taskSend.sock);
447 CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
448 CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
449 CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
450 CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
451 CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
455 /************************************************************************/
456 //Generates local GUID
// hid: address of the first local interface, or 127.0.0.1 when there is none
457 if (d->domainProp.IFCount>0)
458 d->guid.hid=d->domainProp.IFProp[0].ipAddress;
460 d->guid.hid=StringToIPAddress("127.0.0.1");
// aid: send port shifted left by 8 plus the application kind.
// NOTE(review): the guard choosing MANAGER vs. MANAGEDAPPLICATION is not
// visible here, presumably `manager` - confirm.
462 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER;
464 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION;
467 debug(30,2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
468 GUID_PRINTF(d->guid));
470 //create HEADER of message for sending task
471 RTPSHeaderCreate(&d->taskSend.mb.cdrCodec,d->guid.hid,d->guid.aid);
472 d->taskSend.mb.needSend=ORTE_FALSE;
473 d->taskSend.mb.containsInfoReply=ORTE_FALSE;
474 d->taskSend.mb.cdrCodecDirect=NULL;
476 //Self object data & fellow managers object data
// NOTE(review): no NULL check of this MALLOC result is visible before use.
477 appParams=(AppParams*)MALLOC(sizeof(AppParams));
478 AppParamsInit(appParams);
479 appParams->expirationTime=d->domainProp.baseProp.expirationTime;
480 VENDOR_ID_OCERA(appParams->vendorId);
481 appParams->hostId=d->guid.hid;
482 appParams->appId=d->guid.aid;
483 appParams->metatrafficUnicastPort=d->taskRecvUnicastMetatraffic.sock.port;
484 appParams->userdataUnicastPort=d->taskRecvUnicastUserdata.sock.port;
485 //fill unicast/multicast ip addresses
486 if (d->domainProp.IFCount) {
487 for(i=0;i<d->domainProp.IFCount;i++)
488 appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
489 appParams->unicastIPAddressCount=d->domainProp.IFCount;
491 if (d->domainProp.multicast.enabled &&
492 IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
493 appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount]=
494 d->domainProp.multicast.ipAddress;
495 appParams->metatrafficMulticastIPAddressCount++;
// with no local interface, 127.0.0.1 is used as the unicast address
497 if (!d->domainProp.IFCount) {
498 appParams->unicastIPAddressList[appParams->unicastIPAddressCount]=
499 StringToIPAddress("127.0.0.1");
500 appParams->unicastIPAddressCount++;
// manager keys: without explicit keys use 127.0.0.1, every local interface
// and (when enabled and valid) the multicast address; otherwise parse the
// ':'-separated key list
504 if (!d->domainProp.keys) {
505 appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
506 for(i=0;i<d->domainProp.IFCount;i++)
507 appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
508 if (d->domainProp.multicast.enabled &&
509 IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
510 appParams->managerKeyList[i+1]=d->domainProp.multicast.ipAddress;
513 appParams->managerKeyCount=i+1;
515 appParams->managerKeyCount=i=0;
516 while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
517 appParams->managerKeyList[appParams->managerKeyCount++]=
518 StringToIPAddress(sbuff);
521 d->appParams=appParams;
522 //insert object, doesn't need to be locked
523 d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
524 d->objectEntryOID->privateCreated=ORTE_TRUE;
527 /************************************************************************/
529 // writerApplicationSelf (WAS)
530 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
531 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
532 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
533 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
534 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
// two registration variants (Mgr / App values) follow.
// NOTE(review): the guard between them is not visible, presumably `manager`.
536 cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationMgrRetries;
537 cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationMgrPeriod;
538 cstWriterParams.fullAcknowledge=ORTE_FALSE;
540 cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationAppRetries;
541 cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationAppPeriod;
542 cstWriterParams.fullAcknowledge=ORTE_TRUE;
544 CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
545 OID_WRITE_APPSELF,&cstWriterParams,NULL);
// walk the ':'-separated manager list and add each unknown manager as an
// object plus a remote reader of WAS.
// NOTE(review): the initialisation of `i` and of guid.hid is not visible.
548 while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
550 IPAddress ipAddress=StringToIPAddress(sbuff);
552 guid.aid=AID_UNKNOWN;
554 if (!objectEntryFind(d,&guid)) {
555 CSTRemoteReader *cstRemoteReader;
556 appParams=(AppParams*)MALLOC(sizeof(AppParams));
557 AppParamsInit(appParams);
558 appParams->hostId=guid.hid;
559 appParams->appId=guid.aid;
560 appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
561 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
562 if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
563 appParams->metatrafficMulticastIPAddressList[0]=ipAddress;
564 appParams->metatrafficMulticastIPAddressCount=1;
565 objectEntryOID->multicastPort=port;
567 appParams->unicastIPAddressList[0]=ipAddress;
568 appParams->unicastIPAddressCount=1;
569 objectEntryOID->multicastPort=0;
571 appParams->userdataUnicastPort=0; //Manager support only metatraffic
572 cstRemoteReader=CSTWriterAddRemoteReader(d,
573 &d->writerApplicationSelf,
577 debug(29,2) ("ORTEDomainCreate: add fellow manager (%s)\n",
578 IPAddressToString(ipAddress,sIPAddress));
582 // add to WAS remote writer(s)
583 if (d->domainProp.appLocalManager) {
585 guid.hid=d->domainProp.appLocalManager;
586 guid.aid=AID_UNKNOWN;
588 if (!objectEntryFind(d,&guid)) {
589 appParams=(AppParams*)MALLOC(sizeof(AppParams));
590 AppParamsInit(appParams);
591 appParams->hostId=guid.hid;
592 appParams->appId=guid.aid;
593 appParams->metatrafficUnicastPort=port;
594 appParams->userdataUnicastPort=0; //Manager support only metatraffic
595 appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
596 appParams->unicastIPAddressCount=1;
597 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
598 CSTWriterAddRemoteReader(d,
599 &d->writerApplicationSelf,
603 debug(30,2) ("ORTEDomainCreate: add manager (%s)\n",
604 IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
// readerManagers
// NOTE(review): ACKMaxRetries is assigned twice and repeatActiveQueryTime has
// two variants; the guard between them is not visible, presumably `manager`.
610 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
611 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
612 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
614 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
615 cstReaderParams.repeatActiveQueryTime=iNtpTime; //RM can't repeatedly send ACKf
617 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
618 cstReaderParams.fullAcknowledge=ORTE_TRUE;
620 CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
621 OID_READ_MGR,&cstReaderParams,NULL);
623 // readerApplications
624 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
625 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
626 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
627 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
628 cstReaderParams.fullAcknowledge=ORTE_TRUE;
629 CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
630 OID_READ_APP,&cstReaderParams,NULL);
633 // writerApplications
634 cstWriterParams.registrationRetries=0;
635 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
636 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
637 cstWriterParams.refreshPeriod=iNtpTime; //only WAS,WM can refresh csChange(s)
638 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
639 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
640 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
641 cstWriterParams.fullAcknowledge=ORTE_FALSE;
642 CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
643 OID_WRITE_APP,&cstWriterParams,NULL);
// writerManagers
646 cstWriterParams.registrationRetries=0;
647 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
648 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
649 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
650 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
651 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
652 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
653 cstWriterParams.fullAcknowledge=ORTE_TRUE;
654 CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
655 OID_WRITE_MGR,&cstWriterParams,NULL);
659 // writerPublications
660 cstWriterParams.registrationRetries=0;
661 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
662 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
663 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
664 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
665 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
666 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
667 cstWriterParams.fullAcknowledge=ORTE_TRUE;
668 CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
669 OID_WRITE_PUBL,&cstWriterParams,NULL);
670 // writerSubscriptions
671 cstWriterParams.registrationRetries=0;
672 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
673 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
674 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
675 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
676 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
677 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
678 cstWriterParams.fullAcknowledge=ORTE_TRUE;
679 CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
680 OID_WRITE_SUBS,&cstWriterParams,NULL);
681 // readerPublications
682 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
683 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
684 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
685 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
686 cstReaderParams.fullAcknowledge=ORTE_TRUE;
687 CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
688 OID_READ_PUBL,&cstReaderParams,NULL);
689 // readerSubscriptions
690 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
691 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
692 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
693 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
694 cstReaderParams.fullAcknowledge=ORTE_TRUE;
695 CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
696 OID_READ_SUBS,&cstReaderParams,NULL);
699 //add csChange for WAS
700 appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
702 debug(30,10) ("ORTEDomainCreate: finished\n");
706 /*****************************************************************************/
/*
 * ORTEDomainDestroy - stop a domain's threads and release its resources.
 *
 * d:       domain to destroy; ORTE_FALSE is returned when it is NULL
 * manager: not referenced in the visible lines
 *
 * NOTE(review): the embedded original line numbers skip, so braces, guards
 * (for example around the writerManagers/writerApplications and the
 * publication/subscription deletions), the statement that frees the domain
 * instance and the final return are not visible in this listing - confirm
 * against the complete file.
 */
708 ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
709 CSTWriter *cstWriter;
710 CSTReader *cstReader;
712 debug(30,10) ("ORTEDomainDestroy: start\n");
713 if (!d) return ORTE_FALSE;
// appSelfParamChanged() is called while both root locks are write-held
715 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
716 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
717 appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);
718 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
719 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
// stop all receiving threads, then the sending thread
722 ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
723 ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
724 ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
725 ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
726 ORTEDomainSendThreadStop(&d->taskSend);
727 debug(30,3) ("ORTEDomainDestroy: threads stoped\n");
729 //CSTReaders and CSTWriters
730 CSTWriterDelete(d,&d->writerApplicationSelf);
731 CSTReaderDelete(d,&d->readerManagers);
732 CSTReaderDelete(d,&d->readerApplications);
734 CSTWriterDelete(d,&d->writerManagers);
735 CSTWriterDelete(d,&d->writerApplications);
737 CSTWriterDelete(d,&d->writerPublications);
738 CSTWriterDelete(d,&d->writerSubscriptions);
739 CSTReaderDelete(d,&d->readerPublications);
740 CSTReaderDelete(d,&d->readerSubscriptions);
// drain and delete the remaining user publications and subscriptions
742 while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
743 CSTWriterDelete(d,cstWriter);
746 while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
747 CSTReaderDelete(d,cstReader);
752 //objects in objectsEntry
753 objectEntryDeleteAll(d,&d->objectEntry);
754 debug(30,3) ("ORTEDomainDestroy: deleted all objects\n");
// close sockets
757 sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
758 sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
759 sock_cleanup(&d->taskRecvUnicastUserdata.sock);
760 sock_cleanup(&d->taskRecvMulticastUserdata.sock);
761 sock_cleanup(&d->taskSend.sock);
// destroy the send-timer condition variable and mutex
765 pthread_cond_destroy(&d->objectEntry.htimSendCond);
766 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
// destroy rwlocks
// NOTE(review): no pthread_rwlock_destroy() for d->typeEntry.lock is visible
// in this function although ORTEDomainCreate initialises that lock - confirm
// whether it is destroyed elsewhere.
769 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
770 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
771 pthread_rwlock_destroy(&d->publications.lock);
772 pthread_rwlock_destroy(&d->subscriptions.lock);
773 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
774 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
// registered types
777 ORTETypeRegisterDestroyAll(d);
// subscription patterns
780 ORTEDomainAppSubscriptionPatternDestroy(d);
781 pthread_rwlock_destroy(&d->patternEntry.lock);
// release codec buffers
784 CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
785 CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
786 CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
787 CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
788 CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
790 //Free domain instance
793 debug(30,10) ("ORTEDomainDestroy: finished\n");