2 * $Id: ORTEDomain.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 30 Domain functions
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /*****************************************************************************/
36 ORTEDomainRecvThreadStart(TaskProp *tp)
39 tp->terminate=ORTE_FALSE;
40 pthread_create(&(tp->thread), NULL,
41 (void*)&ORTEAppRecvThread, (void *)tp);
45 /*****************************************************************************/
47 ORTEDomainSendThreadStart(TaskProp *tp)
50 tp->terminate=ORTE_FALSE;
51 pthread_create(&(tp->thread), NULL,
52 (void*)&ORTEAppSendThread, (void *)tp);
56 /*****************************************************************************/
58 ORTEDomainRecvThreadStop(TaskProp *tp)
63 tp->terminate=ORTE_TRUE;
64 ORTEDomainWakeUpReceivingThread(d,
65 &d->taskSend.sock,tp->sock.port);
66 pthread_join(tp->thread,NULL);
70 /*****************************************************************************/
72 ORTEDomainSendThreadStop(TaskProp *tp)
77 tp->terminate=ORTE_TRUE;
78 ORTEDomainWakeUpSendingThread(&d->objectEntry);
79 pthread_join(tp->thread,NULL);
83 /*****************************************************************************/
85 ORTEDomainStart(ORTEDomain *d,
86 Boolean recvUnicastMetatrafficThread,
87 Boolean recvMulticastMetatrafficThread,
88 Boolean recvUnicastUserdataThread,
89 Boolean recvMulticastUserdataThread,
94 if (recvUnicastMetatrafficThread)
95 ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
97 if (recvMulticastMetatrafficThread)
98 ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
100 if (recvUnicastUserdataThread)
101 ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
103 if (recvMulticastUserdataThread)
104 ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
107 ORTEDomainSendThreadStart(&d->taskSend);
110 /*****************************************************************************/
112 ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
115 memset(prop, 0, sizeof(*prop));
117 prop->multicast.enabled=ORTE_FALSE;
118 prop->multicast.ttl=1;
119 prop->multicast.loopBackEnabled=ORTE_TRUE;
122 sock_init_udp(&sock);
123 if (sock_bind(&sock,0,INADDR_ANY) == -1) {
126 sock_get_local_interfaces(&sock,prop->IFProp, (char *)&prop->IFCount);
129 prop->mgrs=NULL; //only from localhost
130 prop->appLocalManager=StringToIPAddress("127.0.0.1");
131 prop->listen=INADDR_ANY;
132 prop->keys=NULL; //are assign be orte
133 sprintf(prop->version,ORTE_PACKAGE_STRING\
139 prop->recvBuffSize=0x4000;
140 prop->sendBuffSize=0x4000;
141 prop->wireProp.metaBytesPerPacket=1500;
142 prop->wireProp.metaBytesPerFastPacket=1000; //not used
143 prop->wireProp.metabitsPerACKBitmap=32; //not used
144 prop->wireProp.userBytesPerPacket=0x3000;
147 prop->baseProp.registrationMgrRetries=0;
148 NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod,0);//0s
149 prop->baseProp.registrationAppRetries=3;
150 NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod,0,500);//500ms
151 NTPTIME_BUILD(prop->baseProp.expirationTime,180); //180s
152 NTPTIME_BUILD(prop->baseProp.refreshPeriod,72); //72s - refresh self parameters
153 NTPTIME_BUILD(prop->baseProp.purgeTime,60); //60s - purge time of parameters
154 NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime,72);//72s - announcement by HB
155 NTPTIME_BUILD(prop->baseProp.repeatActiveQueryTime,72);//72s - announcement by ACK
156 NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMin,0,10);//10ms - delay before send ACK
157 NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMax,1,0);//1s
158 NtpTimeAssembFromMs(prop->baseProp.maxBlockTime,20,0);//20s
159 prop->baseProp.ACKMaxRetries=10;
160 prop->baseProp.HBMaxRetries=10;
162 PublParamsInit(&prop->publPropDefault);
163 SubsParamsInit(&prop->subsPropDefault);
168 /*****************************************************************************/
170 ORTEDomainInitEvents(ORTEDomainAppEvents *events) {
171 memset(events,0,sizeof(ORTEDomainAppEvents));
176 /*****************************************************************************/
178 ORTEDomainCreate(int domain, ORTEDomainProp *prop,
179 ORTEDomainAppEvents *events,Boolean manager) {
181 ObjectEntryOID *objectEntryOID;
182 AppParams *appParams;
183 CSTWriterParams cstWriterParams;
184 CSTReaderParams cstReaderParams;
185 char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
186 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
192 debug(30,2) ("ORTEDomainCreate: %s compiled: %s,%s\n",
193 ORTE_PACKAGE_STRING,__DATE__,__TIME__);
195 debug(30,10) ("ORTEDomainCreate: start\n");
196 //Create domainApplication
197 d=MALLOC(sizeof(ORTEDomain));
198 if (!d) return NULL; //no memory
199 //initialization local values
201 d->taskRecvUnicastMetatraffic.d=d;
202 d->taskRecvUnicastMetatraffic.terminate=ORTE_TRUE;
203 d->taskRecvMulticastMetatraffic.d=d;
204 d->taskRecvMulticastMetatraffic.terminate=ORTE_TRUE;
205 d->taskRecvUnicastUserdata.d=d;
206 d->taskRecvUnicastUserdata.terminate=ORTE_TRUE;
207 d->taskRecvMulticastUserdata.d=d;
208 d->taskRecvMulticastUserdata.terminate=ORTE_TRUE;
210 d->taskSend.terminate=ORTE_TRUE;
211 d->taskRecvUnicastMetatraffic.sock.port=0;
212 d->taskRecvMulticastMetatraffic.sock.port=0;
213 d->taskRecvUnicastUserdata.sock.port=0;
214 d->taskRecvMulticastUserdata.sock.port=0;
215 d->taskSend.sock.port=0;
216 //init structure objectEntry
217 ObjectEntryHID_init_root_field(&d->objectEntry);
218 pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
219 htimerRoot_init_queue(&d->objectEntry);
220 pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
221 pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
222 pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
223 d->objectEntry.htimSendCondValue=0;
224 d->objectEntry.htimNeedWakeUp=ORTE_TRUE;
225 //publication,subscriptions
226 d->publications.counter=d->subscriptions.counter=0;
227 CSTWriter_init_root_field(&d->publications);
228 CSTReader_init_root_field(&d->subscriptions);
229 pthread_rwlock_init(&d->publications.lock,NULL);
230 pthread_rwlock_init(&d->subscriptions.lock,NULL);
231 //publication,subscriptions lists
232 PublicationList_init_root_field(&d->psEntry);
233 pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
234 SubscriptionList_init_root_field(&d->psEntry);
235 pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
238 pthread_rwlock_init(&d->patternEntry.lock,NULL);
239 ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
240 Pattern_init_head(&d->patternEntry);
244 memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
246 if (!ORTEDomainPropDefaultGet(&d->domainProp)) {
251 //print local IP addresses
253 if (d->domainProp.IFCount) {
254 for(i=0;i<d->domainProp.IFCount;i++)
255 strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
256 debug(30,2) ("ORTEDomainCreate: localIPAddres(es) %s\n",iflocal);
258 debug(30,2) ("ORTEDomainCreate: no active interface card\n");
259 if (d->domainProp.multicast.enabled) {
260 debug(30,0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
267 memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
269 memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
273 CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
274 CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
275 CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
276 CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
277 CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
278 CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
279 d->domainProp.recvBuffSize);
280 CDR_buffer_init(&d->taskSend.mb.cdrCodec,
281 d->domainProp.sendBuffSize);
282 d->taskSend.mb.cdrCodec.wptr_max=d->domainProp.wireProp.metaBytesPerPacket;
284 CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
285 d->domainProp.recvBuffSize);
286 if (d->domainProp.multicast.enabled) {
287 CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
288 d->domainProp.recvBuffSize);
289 CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
290 d->domainProp.recvBuffSize);
293 d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
296 ORTEType_init_root_field(&d->typeEntry);
297 pthread_rwlock_init(&d->typeEntry.lock,NULL);
300 sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
301 sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
302 sock_init_udp(&d->taskRecvUnicastUserdata.sock);
303 sock_init_udp(&d->taskRecvMulticastUserdata.sock);
304 sock_init_udp(&d->taskSend.sock);
306 /************************************************************************/
307 /* UnicastMetatraffic */
308 Domain2Port(d->domain,port);
310 if (d->domainProp.multicast.enabled) {
311 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
316 sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET,
317 SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
318 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
322 sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP,
323 IP_MULTICAST_LOOP, (const char *)&loop,
325 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
328 //joint to multicast group
329 memset(&mreq, 0, sizeof(mreq));
330 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
331 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
332 if(sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock,IPPROTO_IP,
333 IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
334 debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
335 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
339 if (sock_bind(&d->taskRecvUnicastMetatraffic.sock,port,d->domainProp.listen) == -1) {
343 /* give me receiving port (metatraffic) */
344 if (sock_bind(&d->taskRecvUnicastMetatraffic.sock,0,d->domainProp.listen) == -1) {
348 debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
349 d->taskRecvUnicastMetatraffic.sock.port);
351 /************************************************************************/
352 /* MulticastMetatraffic */
353 if (d->domainProp.multicast.enabled && !manager) {
354 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
360 sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET,
361 SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
362 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
366 sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
367 IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
368 sizeof(d->domainProp.multicast.loopBackEnabled));
369 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
370 d->domainProp.multicast.loopBackEnabled);
372 //joint to multicast group
373 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
374 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
375 if(sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock,IPPROTO_IP,
376 IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
377 debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
378 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
381 /* receiving multicast port (metatraffic) */
382 Domain2PortMulticastMetatraffic(d->domain,mport);
383 if (sock_bind(&d->taskRecvMulticastMetatraffic.sock,(uint16_t)mport,d->domainProp.listen) == -1) {
386 debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
387 d->taskRecvMulticastMetatraffic.sock.port);
390 /************************************************************************/
393 /* give me receiving port (userdata) */
394 if (sock_bind(&d->taskRecvUnicastUserdata.sock,0,d->domainProp.listen) == -1) {
397 debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
398 d->taskRecvUnicastUserdata.sock.port);
400 if (d->domainProp.multicast.enabled) {
405 sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET,
406 SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
407 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
411 sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP,
412 IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
413 sizeof(d->domainProp.multicast.loopBackEnabled));
414 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
415 d->domainProp.multicast.loopBackEnabled);
417 /* receiving multicast port (userdata) */
418 Domain2PortMulticastUserdata(d->domain,mport);
419 if (sock_bind(&d->taskRecvMulticastUserdata.sock,(uint16_t)mport,d->domainProp.listen) == -1) {
422 debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
423 d->taskRecvMulticastUserdata.sock.port);
427 /************************************************************************/
429 /* give me sending port */
430 if (sock_bind(&d->taskSend.sock,0,d->domainProp.listen) == -1) {
433 debug(30,2) ("ORTEDomainCreate: bind on port(Send): %u\n",
434 d->taskSend.sock.port);
435 if (d->domainProp.multicast.enabled) {
437 if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL, (const char *)&d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
438 debug(30,2) ("ORTEDomainCreate: ttl set on: %u\n",
439 d->domainProp.multicast.ttl);
443 /************************************************************************/
444 /* tests for valid resources */
445 if ((d->taskRecvUnicastMetatraffic.sock.fd<0) ||
446 (d->taskSend.sock.fd<0) ||
447 (d->domainProp.multicast.enabled &&
448 (d->taskRecvUnicastUserdata.sock.fd<0)) ||
449 (d->domainProp.multicast.enabled &&
450 (d->taskRecvMulticastUserdata.sock.fd<0)) ||
451 (d->domainProp.multicast.enabled &&
452 (d->taskRecvMulticastMetatraffic.sock.fd<0))) {
453 debug(30,0) ("ORTEDomainCreate: Error creating socket(s).\n");
457 if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) ||
458 (!d->taskSend.mb.cdrCodec.buffer) ||
459 (d->domainProp.multicast.enabled && !manager &&
460 !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) ||
461 (d->domainProp.multicast.enabled && !manager &&
462 !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) ||
463 (d->domainProp.multicast.enabled && !manager &&
464 !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) { //no a memory
465 debug(30,0) ("ORTEDomainCreate: Error creating buffer(s).\n");
469 /************************************************************************/
470 //Generates local GUID
471 if (d->domainProp.IFCount>0)
472 d->guid.hid=d->domainProp.IFProp[0].ipAddress;
474 d->guid.hid=StringToIPAddress("127.0.0.1");
476 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER;
478 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION;
481 debug(30,2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
482 GUID_PRINTF(d->guid));
484 //create HEADER of message for sending task
485 RTPSHeaderCreate(&d->taskSend.mb.cdrCodec,d->guid.hid,d->guid.aid);
486 d->taskSend.mb.needSend=ORTE_FALSE;
487 d->taskSend.mb.containsInfoReply=ORTE_FALSE;
488 d->taskSend.mb.cdrCodecDirect=NULL;
490 //Self object data & fellow managers object data
491 appParams=(AppParams*)MALLOC(sizeof(AppParams));
495 AppParamsInit(appParams);
496 appParams->expirationTime=d->domainProp.baseProp.expirationTime;
497 VENDOR_ID_OCERA(appParams->vendorId);
498 appParams->hostId=d->guid.hid;
499 appParams->appId=d->guid.aid;
500 appParams->metatrafficUnicastPort=d->taskRecvUnicastMetatraffic.sock.port;
501 appParams->userdataUnicastPort=d->taskRecvUnicastUserdata.sock.port;
502 //fill unicast/multicast ip addresses
503 if (d->domainProp.IFCount) {
504 for(i=0;i<d->domainProp.IFCount;i++)
505 appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
506 appParams->unicastIPAddressCount=d->domainProp.IFCount;
508 if (d->domainProp.multicast.enabled &&
509 IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
510 appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount]=
511 d->domainProp.multicast.ipAddress;
512 appParams->metatrafficMulticastIPAddressCount++;
514 if (!d->domainProp.IFCount) {
515 appParams->unicastIPAddressList[appParams->unicastIPAddressCount]=
516 StringToIPAddress("127.0.0.1");
517 appParams->unicastIPAddressCount++;
521 if (!d->domainProp.keys) {
522 appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
523 for(i=0;i<d->domainProp.IFCount;i++)
524 appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
525 if (d->domainProp.multicast.enabled &&
526 IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
527 appParams->managerKeyList[i+1]=d->domainProp.multicast.ipAddress;
530 appParams->managerKeyCount=i+1;
532 appParams->managerKeyCount=i=0;
533 while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
534 appParams->managerKeyList[appParams->managerKeyCount++]=
535 StringToIPAddress(sbuff);
538 d->appParams=appParams;
539 //insert object, doesn't need to be locked
540 d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
541 d->objectEntryOID->privateCreated=ORTE_TRUE;
544 /************************************************************************/
546 // writerApplicationSelf (WAS)
547 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
548 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
549 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
550 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
551 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
553 cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationMgrRetries;
554 cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationMgrPeriod;
555 cstWriterParams.fullAcknowledge=ORTE_FALSE;
557 cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationAppRetries;
558 cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationAppPeriod;
559 cstWriterParams.fullAcknowledge=ORTE_TRUE;
561 CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
562 OID_WRITE_APPSELF,&cstWriterParams,NULL);
565 while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
567 IPAddress ipAddress=StringToIPAddress(sbuff);
569 guid.aid=AID_UNKNOWN;
571 if (!objectEntryFind(d,&guid)) {
572 CSTRemoteReader *cstRemoteReader;
573 appParams=(AppParams*)MALLOC(sizeof(AppParams));
574 AppParamsInit(appParams);
575 appParams->hostId=guid.hid;
576 appParams->appId=guid.aid;
577 appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
578 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
579 if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
580 appParams->metatrafficMulticastIPAddressList[0]=ipAddress;
581 appParams->metatrafficMulticastIPAddressCount=1;
582 objectEntryOID->multicastPort=port;
584 appParams->unicastIPAddressList[0]=ipAddress;
585 appParams->unicastIPAddressCount=1;
586 objectEntryOID->multicastPort=0;
588 appParams->userdataUnicastPort=0; //Manager support only metatraffic
589 cstRemoteReader=CSTWriterAddRemoteReader(d,
590 &d->writerApplicationSelf,
594 debug(29,2) ("ORTEDomainCreate: add fellow manager (%s)\n",
595 IPAddressToString(ipAddress,sIPAddress));
599 // add to WAS remote writer(s)
600 if (d->domainProp.appLocalManager) {
602 guid.hid=d->domainProp.appLocalManager;
603 guid.aid=AID_UNKNOWN;
605 if (!objectEntryFind(d,&guid)) {
606 appParams=(AppParams*)MALLOC(sizeof(AppParams));
607 AppParamsInit(appParams);
608 appParams->hostId=guid.hid;
609 appParams->appId=guid.aid;
610 appParams->metatrafficUnicastPort=port;
611 appParams->userdataUnicastPort=0; //Manager support only metatraffic
612 appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
613 appParams->unicastIPAddressCount=1;
614 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
615 CSTWriterAddRemoteReader(d,
616 &d->writerApplicationSelf,
620 debug(30,2) ("ORTEDomainCreate: add manager (%s)\n",
621 IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
627 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
628 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
629 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
631 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
632 cstReaderParams.repeatActiveQueryTime=iNtpTime; //RM cann't repeatly send ACKf
634 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
635 cstReaderParams.fullAcknowledge=ORTE_TRUE;
637 CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
638 OID_READ_MGR,&cstReaderParams,NULL);
640 // readerApplications
641 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
642 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
643 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
644 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
645 cstReaderParams.fullAcknowledge=ORTE_TRUE;
646 CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
647 OID_READ_APP,&cstReaderParams,NULL);
650 // writerApplications
651 cstWriterParams.registrationRetries=0;
652 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
653 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
654 cstWriterParams.refreshPeriod=iNtpTime; //only WAS,WM can refresh csChange(s)
655 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
656 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
657 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
658 cstWriterParams.fullAcknowledge=ORTE_FALSE;
659 CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
660 OID_WRITE_APP,&cstWriterParams,NULL);
663 cstWriterParams.registrationRetries=0;
664 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
665 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
666 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
667 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
668 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
669 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
670 cstWriterParams.fullAcknowledge=ORTE_TRUE;
671 CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
672 OID_WRITE_MGR,&cstWriterParams,NULL);
676 // writerPublications
677 cstWriterParams.registrationRetries=0;
678 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
679 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
680 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
681 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
682 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
683 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
684 cstWriterParams.fullAcknowledge=ORTE_TRUE;
685 CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
686 OID_WRITE_PUBL,&cstWriterParams,NULL);
687 // writerSubscriptions
688 cstWriterParams.registrationRetries=0;
689 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
690 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
691 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
692 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
693 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
694 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
695 cstWriterParams.fullAcknowledge=ORTE_TRUE;
696 CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
697 OID_WRITE_SUBS,&cstWriterParams,NULL);
698 // readerPublications
699 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
700 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
701 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
702 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
703 cstReaderParams.fullAcknowledge=ORTE_TRUE;
704 CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
705 OID_READ_PUBL,&cstReaderParams,NULL);
706 // readerSubscriptions
707 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
708 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
709 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
710 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
711 cstReaderParams.fullAcknowledge=ORTE_TRUE;
712 CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
713 OID_READ_SUBS,&cstReaderParams,NULL);
716 //add csChange for WAS
717 appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
719 debug(30,10) ("ORTEDomainCreate: finished\n");
723 if (!errno_save) errno_save = errno;
727 if (!errno_save) errno_save = errno;
728 sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
729 sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
730 sock_cleanup(&d->taskRecvUnicastUserdata.sock);
731 sock_cleanup(&d->taskRecvMulticastUserdata.sock);
732 sock_cleanup(&d->taskSend.sock);
733 pthread_rwlock_destroy(&d->typeEntry.lock);
734 if (d->domainProp.multicast.enabled) {
735 CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
736 CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
739 CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
741 CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
743 if (!errno_save) errno_save = errno;
744 pthread_rwlock_destroy(&d->patternEntry.lock);
745 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
746 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
747 pthread_rwlock_destroy(&d->subscriptions.lock);
748 pthread_rwlock_destroy(&d->publications.lock);
749 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
750 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
751 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
757 /*****************************************************************************/
759 ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
760 CSTWriter *cstWriter;
761 CSTReader *cstReader;
763 debug(30,10) ("ORTEDomainDestroy: start\n");
764 if (!d) return ORTE_FALSE;
766 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
767 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
768 appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);
769 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
770 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
773 ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
774 ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
775 ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
776 ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
777 ORTEDomainSendThreadStop(&d->taskSend);
778 debug(30,3) ("ORTEDomainDestroy: threads stoped\n");
780 //CSTReaders and CSTWriters
781 CSTWriterDelete(d,&d->writerApplicationSelf);
782 CSTReaderDelete(d,&d->readerManagers);
783 CSTReaderDelete(d,&d->readerApplications);
785 CSTWriterDelete(d,&d->writerManagers);
786 CSTWriterDelete(d,&d->writerApplications);
788 CSTWriterDelete(d,&d->writerPublications);
789 CSTWriterDelete(d,&d->writerSubscriptions);
790 CSTReaderDelete(d,&d->readerPublications);
791 CSTReaderDelete(d,&d->readerSubscriptions);
793 while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
794 CSTWriterDelete(d,cstWriter);
797 while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
798 CSTReaderDelete(d,cstReader);
803 //objects in objectsEntry
804 objectEntryDeleteAll(d,&d->objectEntry);
805 debug(30,3) ("ORTEDomainDestroy: deleted all objects\n");
808 sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
809 sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
810 sock_cleanup(&d->taskRecvUnicastUserdata.sock);
811 sock_cleanup(&d->taskRecvMulticastUserdata.sock);
812 sock_cleanup(&d->taskSend.sock);
816 pthread_cond_destroy(&d->objectEntry.htimSendCond);
817 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
820 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
821 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
822 pthread_rwlock_destroy(&d->publications.lock);
823 pthread_rwlock_destroy(&d->subscriptions.lock);
824 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
825 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
828 ORTETypeRegisterDestroyAll(d);
831 ORTEDomainAppSubscriptionPatternDestroy(d);
832 pthread_rwlock_destroy(&d->patternEntry.lock);
835 CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
836 CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
837 CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
838 CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
839 CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
841 //Free domain instance
844 debug(30,10) ("ORTEDomainDestroy: finished\n");