2 * $Id: ORTEDomain.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 30 Domain functions
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr.smolik@wo.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /*****************************************************************************/
/*
 * ORTEDomainRecvThreadStart - start the receiving thread of one task.
 *
 * tp: task descriptor; its terminate flag is cleared and the new thread
 *     handle is stored in tp->thread. The thread runs ORTEAppRecvThread
 *     and receives tp as its argument.
 *
 * NOTE(review): this listing has gaps (the embedded numbering jumps from
 * 36 to 39), so the return type, the braces and any guard placed around
 * these statements are not visible here - confirm against the full file.
 * The result of pthread_create() is not checked in the visible lines.
 */
36 ORTEDomainRecvThreadStart(TaskProp *tp)
39 tp->terminate=ORTE_FALSE;
40 pthread_create(&(tp->thread), NULL,
41 (void*)&ORTEAppRecvThread, (void *)tp);
45 /*****************************************************************************/
/*
 * ORTEDomainSendThreadStart - start the sending thread of one task.
 *
 * tp: task descriptor; its terminate flag is cleared and the new thread
 *     handle is stored in tp->thread. The thread runs ORTEAppSendThread
 *     and receives tp as its argument.
 *
 * NOTE(review): lines are missing from this listing between the signature
 * and the first statement, so any guard condition is not visible here.
 * The result of pthread_create() is not checked in the visible lines.
 */
47 ORTEDomainSendThreadStart(TaskProp *tp)
50 tp->terminate=ORTE_FALSE;
51 pthread_create(&(tp->thread), NULL,
52 (void*)&ORTEAppSendThread, (void *)tp);
56 /*****************************************************************************/
/*
 * ORTEDomainRecvThreadStop - ask a receiving thread to finish and wait for it.
 *
 * tp: task descriptor of the receiving thread.
 *
 * Sets tp->terminate, calls ORTEDomainWakeUpReceivingThread() with the
 * domain send socket and the port of the task socket, then joins the thread.
 *
 * NOTE(review): the declaration of d is not visible in this listing;
 * presumably it is tp->d - confirm. Any guard that skips the join for a
 * thread that was never started is likewise not visible.
 */
58 ORTEDomainRecvThreadStop(TaskProp *tp)
63 tp->terminate=ORTE_TRUE;
/* wake-up call is issued before the join; presumably it unblocks the thread - confirm */
64 ORTEDomainWakeUpReceivingThread(d,
65 &d->taskSend.sock,tp->sock.port);
66 pthread_join(tp->thread,NULL);
70 /*****************************************************************************/
/*
 * ORTEDomainSendThreadStop - ask the sending thread to finish and wait for it.
 *
 * tp: task descriptor of the sending thread.
 *
 * Sets tp->terminate, calls ORTEDomainWakeUpSendingThread() on the domain
 * object entry, then joins the thread.
 *
 * NOTE(review): the declaration of d is not visible in this listing;
 * presumably it is tp->d - confirm. Any guard around these statements is
 * not visible either.
 */
72 ORTEDomainSendThreadStop(TaskProp *tp)
77 tp->terminate=ORTE_TRUE;
78 ORTEDomainWakeUpSendingThread(&d->objectEntry);
79 pthread_join(tp->thread,NULL);
83 /*****************************************************************************/
/*
 * ORTEDomainStart - start the worker threads of a domain.
 *
 * d:                               domain whose tasks are started
 * recvUnicastMetatrafficThread:    start the unicast metatraffic receiver
 * recvMulticastMetatrafficThread:  start the multicast metatraffic receiver
 * recvUnicastUserdataThread:       start the unicast userdata receiver
 * recvMulticastUserdataThread:     start the multicast userdata receiver
 *
 * Each receiver is started through ORTEDomainRecvThreadStart() only when
 * its flag is set.
 *
 * NOTE(review): the parameter list is cut off in this listing (numbering
 * jumps from 89 to 94); a further parameter may follow - confirm.
 */
85 ORTEDomainStart(ORTEDomain *d,
86 Boolean recvUnicastMetatrafficThread,
87 Boolean recvMulticastMetatrafficThread,
88 Boolean recvUnicastUserdataThread,
89 Boolean recvMulticastUserdataThread,
94 if (recvUnicastMetatrafficThread)
95 ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
97 if (recvMulticastMetatrafficThread)
98 ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
100 if (recvUnicastUserdataThread)
101 ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
103 if (recvMulticastUserdataThread)
104 ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
/* NOTE(review): lines before this call are missing; it may be conditional - confirm */
107 ORTEDomainSendThreadStart(&d->taskSend);
110 /*****************************************************************************/
/*
 * ORTEDomainPropDefaultGet - fill a property structure with default values.
 *
 * prop: structure to initialise; it is zeroed first and then every group
 *       of settings below is assigned explicitly.
 *
 * NOTE(review): this listing has gaps. The declaration and the cleanup of
 * the temporary socket sock, the remaining arguments of the sprintf() call
 * and the return statement are not visible here.
 */
112 ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
115 memset(prop, 0, sizeof(*prop));
/* multicast: disabled, TTL 1, loop-back enabled */
117 prop->multicast.enabled=ORTE_FALSE;
118 prop->multicast.ttl=1;
119 prop->multicast.loopBackEnabled=ORTE_TRUE;
/* a UDP socket bound to port 0 / INADDR_ANY is used to list the local interfaces */
122 sock_init_udp(&sock);
123 sock_bind(&sock,0,INADDR_ANY);
124 sock_get_local_interfaces(&sock,prop->IFProp,&prop->IFCount);
127 prop->mgrs=NULL; //only from localhost
128 prop->appLocalManager=StringToIPAddress("127.0.0.1");
129 prop->listen=INADDR_ANY;
130 prop->keys=NULL; //are assign be orte
131 sprintf(prop->version,ORTE_PACKAGE_STRING\
137 prop->recvBuffSize=0x4000;
138 prop->sendBuffSize=0x4000;
139 prop->wireProp.metaBytesPerPacket=1500;
140 prop->wireProp.metaBytesPerFastPacket=1000; //not used
141 prop->wireProp.metabitsPerACKBitmap=32; //not used
142 prop->wireProp.userBytesPerPacket=0x3000;
/* registration, expiration and acknowledge timing defaults */
145 prop->baseProp.registrationMgrRetries=0;
146 NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod,0);//0s
147 prop->baseProp.registrationAppRetries=3;
148 NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod,0,500);//500ms
149 NTPTIME_BUILD(prop->baseProp.expirationTime,180); //180s
150 NTPTIME_BUILD(prop->baseProp.refreshPeriod,72); //72s - refresh self parameters
151 NTPTIME_BUILD(prop->baseProp.purgeTime,60); //60s - purge time of parameters
152 NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime,72);//72s - announcement by HB
153 NTPTIME_BUILD(prop->baseProp.repeatActiveQueryTime,72);//72s - announcement by ACK
154 NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMin,0,10);//10ms - delay before send ACK
155 NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMax,1,0);//1s
156 NtpTimeAssembFromMs(prop->baseProp.maxBlockTime,20,0);//20s
157 prop->baseProp.ACKMaxRetries=10;
158 prop->baseProp.HBMaxRetries=10;
/* default publication and subscription parameters come from their own initialisers */
160 PublParamsInit(&prop->publPropDefault);
161 SubsParamsInit(&prop->subsPropDefault);
166 /*****************************************************************************/
/*
 * ORTEDomainInitEvents - reset an application event structure.
 *
 * events: structure to clear; every byte is set to zero.
 *
 * NOTE(review): the return type and the return statement are not visible
 * in this listing.
 */
168 ORTEDomainInitEvents(ORTEDomainAppEvents *events) {
169 memset(events,0,sizeof(ORTEDomainAppEvents));
174 /*****************************************************************************/
/*
 * ORTEDomainCreate - allocate and initialise a domain instance.
 *
 * domain:  domain number (its use is not visible in this listing)
 * prop:    domain properties to copy into the new instance
 * events:  application event callbacks to copy into the new instance
 * manager: selects manager behaviour in the branches that test it
 *
 * Visible steps, in order: allocate the ORTEDomain, initialise task flags,
 * object/timer roots and their locks, publication/subscription containers,
 * pattern and type registries, CDR codecs and buffers, UDP sockets
 * (including multicast options), resource checks with cleanup, the local
 * GUID, the RTPS header of the send buffer, the self AppParams object,
 * remote manager objects, and the built-in CST readers and writers.
 *
 * Returns NULL when the domain structure cannot be allocated.
 *
 * NOTE(review): many lines are missing from this listing (local
 * declarations, else branches, closing braces, the final return). Comments
 * below describe only what the visible lines show; where two alternatives
 * appear back to back the selecting condition is not visible.
 */
176 ORTEDomainCreate(int domain, ORTEDomainProp *prop,
177 ORTEDomainAppEvents *events,Boolean manager) {
179 ObjectEntryOID *objectEntryOID;
180 AppParams *appParams;
181 CSTWriterParams cstWriterParams;
182 CSTReaderParams cstReaderParams;
183 char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
184 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
188 Boolean error=ORTE_FALSE;
190 debug(30,2) ("ORTEDomainCreate: %s compiled: %s,%s\n",
191 ORTE_PACKAGE_STRING,__DATE__,__TIME__);
193 debug(30,10) ("ORTEDomainCreate: start\n");
194 //Create domainApplication
195 d=MALLOC(sizeof(ORTEDomain));
196 if (!d) return NULL; //no memory
197 //initialization local values
/* every task points back to the domain and starts in the terminated state */
199 d->taskRecvUnicastMetatraffic.d=d;
200 d->taskRecvUnicastMetatraffic.terminate=ORTE_TRUE;
201 d->taskRecvMulticastMetatraffic.d=d;
202 d->taskRecvMulticastMetatraffic.terminate=ORTE_TRUE;
203 d->taskRecvUnicastUserdata.d=d;
204 d->taskRecvUnicastUserdata.terminate=ORTE_TRUE;
205 d->taskRecvMulticastUserdata.d=d;
206 d->taskRecvMulticastUserdata.terminate=ORTE_TRUE;
208 d->taskSend.terminate=ORTE_TRUE;
209 d->taskRecvUnicastMetatraffic.sock.port=0;
210 d->taskRecvMulticastMetatraffic.sock.port=0;
211 d->taskRecvUnicastUserdata.sock.port=0;
212 d->taskRecvMulticastUserdata.sock.port=0;
213 d->taskSend.sock.port=0;
214 //init structure objectEntry
215 ObjectEntryHID_init_root_field(&d->objectEntry);
216 pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
217 htimerRoot_init_queue(&d->objectEntry);
218 pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
219 pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
220 pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
221 d->objectEntry.htimSendCondValue=0;
222 //publication,subscriptions
223 d->publications.counter=d->subscriptions.counter=0;
224 CSTWriter_init_root_field(&d->publications);
225 CSTReader_init_root_field(&d->subscriptions);
226 pthread_rwlock_init(&d->publications.lock,NULL);
227 pthread_rwlock_init(&d->subscriptions.lock,NULL);
228 //publication,subscriptions lists
229 PublicationList_init_root_field(&d->psEntry);
230 pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
231 SubscriptionList_init_root_field(&d->psEntry);
232 pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
/* pattern registry: lock, default check/match callbacks, list head */
235 pthread_rwlock_init(&d->patternEntry.lock,NULL);
236 ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
237 Pattern_init_head(&d->patternEntry);
/*
 * Properties: either copied from prop or filled with defaults.
 * NOTE(review): the condition choosing between the two statements is not
 * visible; presumably defaults are used when prop is NULL - confirm.
 */
241 memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
243 ORTEDomainPropDefaultGet(&d->domainProp);
246 //print local IP addresses
/*
 * NOTE(review): strcat() appends to iflocal, so iflocal has to hold an
 * empty string before the loop; that initialisation is not visible here.
 * The visible lines add no separator between addresses and do not bound
 * IFCount against MAX_INTERFACES - verify.
 */
248 if (d->domainProp.IFCount) {
249 for(i=0;i<d->domainProp.IFCount;i++)
250 strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
251 debug(30,2) ("ORTEDomainCreate: localIPAddres(es) %s\n",iflocal);
253 debug(30,2) ("ORTEDomainCreate: no active interface card\n");
254 if (d->domainProp.multicast.enabled) {
255 debug(30,0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
/* events: copied from the caller or zeroed; the selecting condition is not visible */
263 memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
265 memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
/* CDR codecs for all five tasks; multicast receive buffers only when multicast is enabled */
269 CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
270 CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
271 CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
272 CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
273 CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
274 CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
275 d->domainProp.recvBuffSize);
276 CDR_buffer_init(&d->taskSend.mb.cdrCodec,
277 d->domainProp.sendBuffSize);
278 d->taskSend.mb.cdrCodec.wptr_max=d->domainProp.wireProp.metaBytesPerPacket;
280 CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
281 d->domainProp.recvBuffSize);
282 if (d->domainProp.multicast.enabled) {
283 CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
284 d->domainProp.recvBuffSize);
285 CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
286 d->domainProp.recvBuffSize);
289 d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
/* type registry */
292 ORTEType_init_root_field(&d->typeEntry);
293 pthread_rwlock_init(&d->typeEntry.lock,NULL);
/* one UDP socket per task */
296 sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
297 sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
298 sock_init_udp(&d->taskRecvUnicastUserdata.sock);
299 sock_init_udp(&d->taskRecvMulticastUserdata.sock);
300 sock_init_udp(&d->taskSend.sock);
302 /************************************************************************/
303 /* UnicastMetatraffic */
304 Domain2Port(d->domain,port);
/*
 * NOTE(review): the outer condition of this section is not visible; the
 * two sock_bind() calls below (domain port versus port 0) look like the
 * two arms of a branch - presumably manager versus application; confirm.
 * The binds read prop->listen rather than d->domainProp.listen; if prop
 * may be NULL (see the properties step above) this dereference needs a
 * check - verify.
 */
306 if (d->domainProp.multicast.enabled) {
307 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
312 sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET,
313 SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
314 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
318 sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP,
319 IP_MULTICAST_LOOP, (char*)&loop,
321 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
324 //joint to multicast group
325 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
326 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
327 if(sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock,IPPROTO_IP,
328 IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
329 debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
330 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
333 sock_bind(&d->taskRecvUnicastMetatraffic.sock,port,prop->listen);
335 /* give me receiving port (metatraffic) */
336 sock_bind(&d->taskRecvUnicastMetatraffic.sock,0,prop->listen);
338 debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
339 d->taskRecvUnicastMetatraffic.sock.port);
341 /************************************************************************/
342 /* MulticastMetatraffic */
/* only for non-manager instances with multicast enabled */
343 if (d->domainProp.multicast.enabled && !manager) {
344 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
350 sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET,
351 SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
352 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
356 sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
357 IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled,
358 sizeof(d->domainProp.multicast.loopBackEnabled));
359 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
360 d->domainProp.multicast.loopBackEnabled);
362 //joint to multicast group
363 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
364 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
365 if(sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock,IPPROTO_IP,
366 IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
367 debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
368 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
371 /* receiving multicast port (metatraffic) */
372 Domain2PortMulticastMetatraffic(d->domain,mport);
373 sock_bind(&d->taskRecvMulticastMetatraffic.sock,(uint16_t)mport,prop->listen);
374 debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
375 d->taskRecvMulticastMetatraffic.sock.port);
378 /************************************************************************/
381 /* give me receiving port (userdata) */
382 sock_bind(&d->taskRecvUnicastUserdata.sock,0,prop->listen);
383 debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
384 d->taskRecvUnicastUserdata.sock.port);
/* multicast userdata socket: reuse address, loop-back option, bind to the multicast userdata port */
386 if (d->domainProp.multicast.enabled) {
391 sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET,
392 SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
393 debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
397 sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP,
398 IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled,
399 sizeof(d->domainProp.multicast.loopBackEnabled));
400 debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
401 d->domainProp.multicast.loopBackEnabled);
403 /* receiving multicast port (userdata) */
404 Domain2PortMulticastUserdata(d->domain,mport);
405 sock_bind(&d->taskRecvMulticastUserdata.sock,(uint16_t)mport,prop->listen);
406 debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
407 d->taskRecvMulticastUserdata.sock.port);
411 /************************************************************************/
413 /* give me sending port */
414 sock_bind(&d->taskSend.sock,0,prop->listen);
415 debug(30,2) ("ORTEDomainCreate: bind on port(Send): %u\n",
416 d->taskSend.sock.port);
417 if (d->domainProp.multicast.enabled) {
419 if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL,
420 &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
421 debug(30,2) ("ORTEDomainCreate: ttl set on: %u\n",
422 d->domainProp.multicast.ttl);
426 /************************************************************************/
427 /* tests for valid resources */
/*
 * NOTE(review): the socket test below checks the unicast userdata socket
 * only when multicast is enabled and does not test manager, while the
 * buffer test that follows adds !manager to the same three terms -
 * confirm that the difference is intended.
 */
428 if ((d->taskRecvUnicastMetatraffic.sock.fd<0) ||
429 (d->taskSend.sock.fd<0) ||
430 (d->domainProp.multicast.enabled &&
431 (d->taskRecvUnicastUserdata.sock.fd<0)) ||
432 (d->domainProp.multicast.enabled &&
433 (d->taskRecvMulticastUserdata.sock.fd<0)) ||
434 (d->domainProp.multicast.enabled &&
435 (d->taskRecvMulticastMetatraffic.sock.fd<0))) {
436 debug(30,0) ("ORTEDomainCreate: Error creating socket(s).\n");
440 if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) ||
441 (!d->taskSend.mb.cdrCodec.buffer) ||
442 (d->domainProp.multicast.enabled && !manager &&
443 !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) ||
444 (d->domainProp.multicast.enabled && !manager &&
445 !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) ||
446 (d->domainProp.multicast.enabled && !manager &&
447 !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) { //no a memory
448 debug(30,0) ("ORTEDomainCreate: Error creating buffer(s).\n");
451 /* a problem occure with resources */
/* failure path: close all sockets and release all codec buffers; the enclosing test, the release of d and the return are not visible */
453 sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
454 sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
455 sock_cleanup(&d->taskRecvUnicastUserdata.sock);
456 sock_cleanup(&d->taskRecvMulticastUserdata.sock);
457 sock_cleanup(&d->taskSend.sock);
458 CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
459 CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
460 CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
461 CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
462 CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
466 /************************************************************************/
467 //Generates local GUID
/* host id: first local interface address, otherwise 127.0.0.1 */
468 if (d->domainProp.IFCount>0)
469 d->guid.hid=d->domainProp.IFProp[0].ipAddress;
471 d->guid.hid=StringToIPAddress("127.0.0.1");
/* application id: send port shifted left by 8 plus the kind constant; the selecting condition is not visible */
473 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER;
475 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION;
478 debug(30,2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
479 GUID_PRINTF(d->guid));
481 //create HEADER of message for sending task
482 RTPSHeaderCreate(&d->taskSend.mb.cdrCodec,d->guid.hid,d->guid.aid);
483 d->taskSend.mb.needSend=ORTE_FALSE;
484 d->taskSend.mb.containsInfoReply=ORTE_FALSE;
485 d->taskSend.mb.cdrCodecDirect=NULL;
487 //Self object data & fellow managers object data
/* NOTE(review): the MALLOC result is passed to AppParamsInit() on the very next line without a NULL check */
488 appParams=(AppParams*)MALLOC(sizeof(AppParams));
489 AppParamsInit(appParams);
490 appParams->expirationTime=d->domainProp.baseProp.expirationTime;
491 VENDOR_ID_OCERA(appParams->vendorId);
492 appParams->hostId=d->guid.hid;
493 appParams->appId=d->guid.aid;
494 appParams->metatrafficUnicastPort=d->taskRecvUnicastMetatraffic.sock.port;
495 appParams->userdataUnicastPort=d->taskRecvUnicastUserdata.sock.port;
496 //fill unicast/multicast ip addresses
497 if (d->domainProp.IFCount) {
498 for(i=0;i<d->domainProp.IFCount;i++)
499 appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
500 appParams->unicastIPAddressCount=d->domainProp.IFCount;
502 if (d->domainProp.multicast.enabled &&
503 IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
504 appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount]=
505 d->domainProp.multicast.ipAddress;
506 appParams->metatrafficMulticastIPAddressCount++;
508 if (!d->domainProp.IFCount) {
509 appParams->unicastIPAddressList[appParams->unicastIPAddressCount]=
510 StringToIPAddress("127.0.0.1");
511 appParams->unicastIPAddressCount++;
/*
 * Manager keys: without configured keys the list is 127.0.0.1, every
 * local interface address and, when enabled, the multicast address;
 * otherwise the ':' separated key string is parsed.
 * NOTE(review): no bound check on managerKeyList is visible - verify
 * its capacity against IFCount + 2 and against the parsed key count.
 */
515 if (!d->domainProp.keys) {
516 appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
517 for(i=0;i<d->domainProp.IFCount;i++)
518 appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
519 if (d->domainProp.multicast.enabled &&
520 IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
521 appParams->managerKeyList[i+1]=d->domainProp.multicast.ipAddress;
524 appParams->managerKeyCount=i+1;
526 appParams->managerKeyCount=i=0;
527 while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
528 appParams->managerKeyList[appParams->managerKeyCount++]=
529 StringToIPAddress(sbuff);
532 d->appParams=appParams;
533 //insert object, doesn't need to be locked
534 d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
535 d->objectEntryOID->privateCreated=ORTE_TRUE;
538 /************************************************************************/
540 // writerApplicationSelf (WAS)
541 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
542 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
543 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
544 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
545 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
/* two alternative registration settings (Mgr values versus App values); the selecting condition is not visible */
547 cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationMgrRetries;
548 cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationMgrPeriod;
549 cstWriterParams.fullAcknowledge=ORTE_FALSE;
551 cstWriterParams.registrationRetries=d->domainProp.baseProp.registrationAppRetries;
552 cstWriterParams.registrationPeriod=d->domainProp.baseProp.registrationAppPeriod;
553 cstWriterParams.fullAcknowledge=ORTE_TRUE;
555 CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
556 OID_WRITE_APPSELF,&cstWriterParams,NULL);
/*
 * Walk the ':' separated mgrs list; each address not yet known as an
 * object gets an AppParams object and is added to WAS as a remote reader.
 * NOTE(review): the initialisation of i, the declaration of guid and the
 * assignment of guid.hid are not visible here.
 */
559 while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
561 IPAddress ipAddress=StringToIPAddress(sbuff);
563 guid.aid=AID_UNKNOWN;
565 if (!objectEntryFind(d,&guid)) {
566 CSTRemoteReader *cstRemoteReader;
/* NOTE(review): MALLOC result used on the next line without a NULL check */
567 appParams=(AppParams*)MALLOC(sizeof(AppParams));
568 AppParamsInit(appParams);
569 appParams->hostId=guid.hid;
570 appParams->appId=guid.aid;
571 appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
572 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
573 if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
574 appParams->metatrafficMulticastIPAddressList[0]=ipAddress;
575 appParams->metatrafficMulticastIPAddressCount=1;
576 objectEntryOID->multicastPort=port;
578 appParams->unicastIPAddressList[0]=ipAddress;
579 appParams->unicastIPAddressCount=1;
580 objectEntryOID->multicastPort=0;
582 appParams->userdataUnicastPort=0; //Manager support only metatraffic
583 cstRemoteReader=CSTWriterAddRemoteReader(d,
584 &d->writerApplicationSelf,
588 debug(29,2) ("ORTEDomainCreate: add fellow manager (%s)\n",
589 IPAddressToString(ipAddress,sIPAddress));
593 // add to WAS remote writer(s)
/* local manager: registered on the domain port unless an object with that GUID already exists */
594 if (d->domainProp.appLocalManager) {
596 guid.hid=d->domainProp.appLocalManager;
597 guid.aid=AID_UNKNOWN;
599 if (!objectEntryFind(d,&guid)) {
/* NOTE(review): MALLOC result used on the next line without a NULL check */
600 appParams=(AppParams*)MALLOC(sizeof(AppParams));
601 AppParamsInit(appParams);
602 appParams->hostId=guid.hid;
603 appParams->appId=guid.aid;
604 appParams->metatrafficUnicastPort=port;
605 appParams->userdataUnicastPort=0; //Manager support only metatraffic
606 appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
607 appParams->unicastIPAddressCount=1;
608 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
609 CSTWriterAddRemoteReader(d,
610 &d->writerApplicationSelf,
614 debug(30,2) ("ORTEDomainCreate: add manager (%s)\n",
615 IPAddressToString(d->domainProp.appLocalManager,sIPAddress));
/* readerManagers parameters; two alternatives for repeatActiveQueryTime follow, the selecting condition is not visible */
621 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
622 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
623 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
625 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
626 cstReaderParams.repeatActiveQueryTime=iNtpTime; //RM cann't repeatly send ACKf
628 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
629 cstReaderParams.fullAcknowledge=ORTE_TRUE;
631 CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
632 OID_READ_MGR,&cstReaderParams,NULL);
634 // readerApplications
635 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
636 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
637 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
638 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
639 cstReaderParams.fullAcknowledge=ORTE_TRUE;
640 CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
641 OID_READ_APP,&cstReaderParams,NULL);
644 // writerApplications
645 cstWriterParams.registrationRetries=0;
646 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
647 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
648 cstWriterParams.refreshPeriod=iNtpTime; //only WAS,WM can refresh csChange(s)
649 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
650 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
651 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
652 cstWriterParams.fullAcknowledge=ORTE_FALSE;
653 CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
654 OID_WRITE_APP,&cstWriterParams,NULL);
/* writerManagers */
657 cstWriterParams.registrationRetries=0;
658 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
659 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
660 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
661 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
662 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
663 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
664 cstWriterParams.fullAcknowledge=ORTE_TRUE;
665 CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
666 OID_WRITE_MGR,&cstWriterParams,NULL);
670 // writerPublications
671 cstWriterParams.registrationRetries=0;
672 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
673 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
674 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
675 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
676 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
677 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
678 cstWriterParams.fullAcknowledge=ORTE_TRUE;
679 CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
680 OID_WRITE_PUBL,&cstWriterParams,NULL);
681 // writerSubscriptions
682 cstWriterParams.registrationRetries=0;
683 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
684 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
685 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
686 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
687 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
688 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
689 cstWriterParams.fullAcknowledge=ORTE_TRUE;
690 CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
691 OID_WRITE_SUBS,&cstWriterParams,NULL);
692 // readerPublications
693 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
694 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
695 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
696 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
697 cstReaderParams.fullAcknowledge=ORTE_TRUE;
698 CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
699 OID_READ_PUBL,&cstReaderParams,NULL);
700 // readerSubscriptions
701 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
702 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
703 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
704 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
705 cstReaderParams.fullAcknowledge=ORTE_TRUE;
706 CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
707 OID_READ_SUBS,&cstReaderParams,NULL);
710 //add csChange for WAS
711 appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
713 debug(30,10) ("ORTEDomainCreate: finished\n");
717 /*****************************************************************************/
/*
 * ORTEDomainDestroy - stop a domain and release its resources.
 *
 * d:       domain to destroy; ORTE_FALSE is returned when it is NULL
 * manager: its use is not visible in this listing
 *
 * Visible order: call appSelfParamChanged() under both write locks, stop
 * the four receiving threads and the sending thread, delete the built-in
 * and user CST readers/writers, delete all objects, close the sockets,
 * destroy condition/mutex/rwlocks, destroy registered types and patterns,
 * release the codec buffers.
 *
 * NOTE(review): lines are missing from this listing, including the end of
 * the function (freeing d and the final return) and any conditions placed
 * around individual delete calls - confirm against the full file.
 */
719 ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
720 CSTWriter *cstWriter;
721 CSTReader *cstReader;
723 debug(30,10) ("ORTEDomainDestroy: start\n");
724 if (!d) return ORTE_FALSE;
/* both root locks are write-held around the self-parameter change; objRootLock is taken first and released last */
726 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
727 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
728 appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);
729 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
730 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
/* threads are stopped before any structure they may use is deleted */
733 ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
734 ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
735 ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
736 ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
737 ORTEDomainSendThreadStop(&d->taskSend);
738 debug(30,3) ("ORTEDomainDestroy: threads stoped\n");
740 //CSTReaders and CSTWriters
741 CSTWriterDelete(d,&d->writerApplicationSelf);
742 CSTReaderDelete(d,&d->readerManagers);
743 CSTReaderDelete(d,&d->readerApplications);
745 CSTWriterDelete(d,&d->writerManagers);
746 CSTWriterDelete(d,&d->writerApplications);
748 CSTWriterDelete(d,&d->writerPublications);
749 CSTWriterDelete(d,&d->writerSubscriptions);
750 CSTReaderDelete(d,&d->readerPublications);
751 CSTReaderDelete(d,&d->readerSubscriptions);
/* drain the user publication and subscription containers one element at a time */
753 while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
754 CSTWriterDelete(d,cstWriter);
757 while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
758 CSTReaderDelete(d,cstReader);
763 //objects in objectsEntry
764 objectEntryDeleteAll(d,&d->objectEntry);
765 debug(30,3) ("ORTEDomainDestroy: deleted all objects\n");
/* close sockets */
768 sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
769 sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
770 sock_cleanup(&d->taskRecvUnicastUserdata.sock);
771 sock_cleanup(&d->taskRecvMulticastUserdata.sock);
772 sock_cleanup(&d->taskSend.sock);
/* destroy synchronisation objects */
776 pthread_cond_destroy(&d->objectEntry.htimSendCond);
777 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
780 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
781 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
782 pthread_rwlock_destroy(&d->publications.lock);
783 pthread_rwlock_destroy(&d->subscriptions.lock);
784 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
785 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
/* registered types, then subscription patterns and the pattern lock */
788 ORTETypeRegisterDestroyAll(d);
791 ORTEDomainAppSubscriptionPatternDestroy(d);
792 pthread_rwlock_destroy(&d->patternEntry.lock);
/* release codec buffers */
795 CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
796 CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
797 CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
798 CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
799 CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
801 //Free domain instance
804 debug(30,10) ("ORTEDomainDestroy: finished\n");