2 * $Id: ORTEDomain.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 30 Domain functions
6 * -------------------------------------------------------------------
8 * Open Real-Time Ethernet
10 * Copyright (C) 2001-2006
11 * Department of Control Engineering FEE CTU Prague, Czech Republic
12 * http://dce.felk.cvut.cz
13 * http://www.ocera.org
15 * Author: Petr Smolik petr@smoliku.cz
17 * Project Responsible: Zdenek Hanzalek
18 * --------------------------------------------------------------------
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
34 /*****************************************************************************/
// Starts a receiving thread for the task described by tp.
// Visible behaviour: clears tp->terminate, then creates a pthread that runs
// ORTEAppRecvThread with tp as its argument; the handle is stored in
// tp->thread.
// NOTE(review): the return type, the braces and any guard around these
// statements are not part of this listing - confirm against the full file.
// The result of pthread_create() is not checked in the visible lines.
36 ORTEDomainRecvThreadStart(TaskProp *tp)
39 tp->terminate = ORTE_FALSE;
40 pthread_create(&(tp->thread), NULL,
41 (void *)&ORTEAppRecvThread, (void *)tp);
45 /*****************************************************************************/
// Starts the sending thread for the task described by tp.
// Visible behaviour: clears tp->terminate, then creates a pthread that runs
// ORTEAppSendThread with tp as its argument; the handle is stored in
// tp->thread.
// NOTE(review): the return type, the braces and any guard around these
// statements are not part of this listing - confirm against the full file.
// The result of pthread_create() is not checked in the visible lines.
47 ORTEDomainSendThreadStart(TaskProp *tp)
50 tp->terminate = ORTE_FALSE;
51 pthread_create(&(tp->thread), NULL,
52 (void *)&ORTEAppSendThread, (void *)tp);
56 /*****************************************************************************/
// Stops the receiving thread of the task described by tp and waits for it.
// Visible behaviour: sets tp->terminate, calls
// ORTEDomainWakeUpReceivingThread() with the domain's sending socket and the
// task's own port, then joins tp->thread.
// NOTE(review): the wake-up call presumably unblocks the receiver so it can
// observe the terminate flag - its implementation is not shown here.
// Any guard around these statements is not visible in this listing.
58 ORTEDomainRecvThreadStop(TaskProp *tp)
// owning domain of this task
60 ORTEDomain *d = tp->d;
63 tp->terminate = ORTE_TRUE;
64 ORTEDomainWakeUpReceivingThread(d,
65 &d->taskSend.sock, tp->sock.port);
// block until the thread has exited; its return value is discarded
66 pthread_join(tp->thread, NULL);
70 /*****************************************************************************/
// Stops the sending thread of the task described by tp and waits for it.
// Visible behaviour: sets tp->terminate, calls
// ORTEDomainWakeUpSendingThread() on the domain's objectEntry, then joins
// tp->thread.
// NOTE(review): the wake-up call presumably signals the sender so it can
// observe the terminate flag - its implementation is not shown here.
// Any guard around these statements is not visible in this listing.
72 ORTEDomainSendThreadStop(TaskProp *tp)
// owning domain of this task
74 ORTEDomain *d = tp->d;
77 tp->terminate = ORTE_TRUE;
78 ORTEDomainWakeUpSendingThread(&d->objectEntry);
// block until the thread has exited; its return value is discarded
79 pthread_join(tp->thread, NULL);
83 /*****************************************************************************/
// Starts the worker threads of domain d.
// Each of the four receiving threads (unicast/multicast metatraffic,
// unicast/multicast userdata) is started only when its Boolean argument is
// true; the sending thread is started via ORTEDomainSendThreadStart().
// NOTE(review): the parameter list continues past the last parameter shown
// here, and whether the send-thread start is guarded by that parameter is not
// visible in this listing - confirm against the full file.
85 ORTEDomainStart(ORTEDomain *d,
86 Boolean recvUnicastMetatrafficThread,
87 Boolean recvMulticastMetatrafficThread,
88 Boolean recvUnicastUserdataThread,
89 Boolean recvMulticastUserdataThread,
96 if (recvUnicastMetatrafficThread)
97 ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
99 if (recvMulticastMetatrafficThread)
100 ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
102 if (recvUnicastUserdataThread)
103 ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
105 if (recvMulticastUserdataThread)
106 ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
109 ORTEDomainSendThreadStart(&d->taskSend);
112 /*****************************************************************************/
// Fills *prop with the default domain properties.
// The structure is zeroed first, then multicast, interface, manager, buffer,
// wire-protocol and timing defaults are assigned, and finally the default
// publication/subscription parameters are initialised.
// NOTE(review): the return statements and the body of the bind-failure branch
// are not part of this listing; ORTEDomainCreate tests the result with `!`,
// so a false value presumably signals failure - confirm against the full file.
114 ORTEDomainPropDefaultGet(ORTEDomainProp *prop)
118 memset(prop, 0, sizeof(*prop));
// multicast is off by default; TTL 1 and loopback enabled apply once it is on
120 prop->multicast.enabled = ORTE_FALSE;
121 prop->multicast.ttl = 1;
122 prop->multicast.loopBackEnabled = ORTE_TRUE;
// temporary UDP socket bound to an ephemeral port, used to enumerate the
// local interfaces into prop->IFProp / prop->IFCount
125 sock_init_udp(&sock);
126 if (sock_bind(&sock, 0, INADDR_ANY) == -1) {
// NOTE(review): IFCount is passed through a (char *) cast - verify that this
// matches the parameter type expected by sock_get_local_interfaces()
129 sock_get_local_interfaces(&sock, prop->IFProp, (char *)&prop->IFCount);
132 prop->mgrs = NULL; //only from localhost
133 prop->appLocalManager = StringToIPAddress("127.0.0.1");
134 prop->listen = INADDR_ANY;
135 prop->keys = NULL; //keys are assigned by ORTE
136 sprintf(prop->version, ORTE_PACKAGE_STRING \
// buffer sizes and per-packet limits
142 prop->recvBuffSize = 0x4000;
143 prop->sendBuffSize = 0x4000;
144 prop->wireProp.metaBytesPerPacket = 1500;
145 prop->wireProp.metaBytesPerFastPacket = 1000; //not used
146 prop->wireProp.metabitsPerACKBitmap = 32; //not used
147 prop->wireProp.userBytesPerPacket = 0x3000;
// registration, expiration and retry timing defaults
150 prop->baseProp.registrationMgrRetries = 0;
151 NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod, 0); //0s
152 prop->baseProp.registrationAppRetries = 3;
153 NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod, 0, 500); //500ms
154 NTPTIME_BUILD(prop->baseProp.expirationTime, 180); //180s
155 NTPTIME_BUILD(prop->baseProp.refreshPeriod, 72); //72s - refresh self parameters
156 NTPTIME_BUILD(prop->baseProp.purgeTime, 60); //60s - purge time of parameters
157 NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime, 72); //72s - announcement by HB
158 NTPTIME_BUILD(prop->baseProp.repeatActiveQueryTime, 72); //72s - announcement by ACK
159 NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMin, 0, 10); //10ms - delay before send ACK
160 NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMax, 1, 0); //1s
161 NtpTimeAssembFromMs(prop->baseProp.maxBlockTime, 20, 0); //20s
162 prop->baseProp.ACKMaxRetries = 10;
163 prop->baseProp.HBMaxRetries = 10;
// default parameters for newly created publications and subscriptions
165 PublParamsInit(&prop->publPropDefault);
166 SubsParamsInit(&prop->subsPropDefault);
171 /*****************************************************************************/
// Resets an application event structure by zero-filling *events.
// NOTE(review): the return type and return statement are not part of this
// listing.
173 ORTEDomainInitEvents(ORTEDomainAppEvents *events)
175 memset(events, 0, sizeof(ORTEDomainAppEvents));
180 /*****************************************************************************/
// Creates and initialises an ORTEDomain object.
//
// Visible steps, in order:
//   1. allocate the ORTEDomain and initialise task descriptors, locks and lists
//   2. take over the domain properties (copy of prop, or defaults) and events
//   3. initialise CDR codecs/buffers and the UDP sockets, including the
//      multicast socket options when multicast is enabled
//   4. verify that sockets and buffers were obtained
//   5. build the local GUID and the AppParams object describing this
//      application and insert it into the object database
//   6. create the built-in CST writers/readers and register the configured
//      managers as remote readers of writerApplicationSelf
//   7. on the failure path (last part of the listing) release sockets,
//      buffers and locks again
//
// Parameters:
//   domain  - domain number (its use is on lines not shown in this listing)
//   prop    - domain properties; copied into d->domainProp
//   events  - application callbacks; copied when non-NULL, d->domainEvents is
//             zero-filled otherwise
//   manager - true when the domain is created for a manager
// Returns NULL when the ORTEDomain allocation fails; the other return paths
// are not visible in this listing.
// NOTE(review): several branch lines (else, labels, braces, preprocessor
// conditions) and some declarations are missing from this listing - confirm
// every hedged remark below against the full file.
182 ORTEDomainCreate(int domain, ORTEDomainProp *prop,
183 ORTEDomainAppEvents *events, Boolean manager)
// locals; further names used below are declared on lines not shown here
186 ObjectEntryOID *objectEntryOID;
187 AppParams *appParams;
188 CSTWriterParams cstWriterParams;
189 CSTReaderParams cstReaderParams;
190 char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
191 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
197 debug(30, 2) ("ORTEDomainCreate: %s compiled: %s,%s\n",
198 ORTE_PACKAGE_STRING, __DATE__, __TIME__);
200 debug(30, 10) ("ORTEDomainCreate: start\n");
201 //Create domainApplication
202 d = MALLOC(sizeof(ORTEDomain));
204 return NULL; //no memory
205 //initialize local values
// every task points back to its domain and starts in the "terminated" state;
// ports are zeroed until the sockets are bound
207 d->taskRecvUnicastMetatraffic.d = d;
208 d->taskRecvUnicastMetatraffic.terminate = ORTE_TRUE;
209 d->taskRecvMulticastMetatraffic.d = d;
210 d->taskRecvMulticastMetatraffic.terminate = ORTE_TRUE;
211 d->taskRecvUnicastUserdata.d = d;
212 d->taskRecvUnicastUserdata.terminate = ORTE_TRUE;
213 d->taskRecvMulticastUserdata.d = d;
214 d->taskRecvMulticastUserdata.terminate = ORTE_TRUE;
216 d->taskSend.terminate = ORTE_TRUE;
217 d->taskRecvUnicastMetatraffic.sock.port = 0;
218 d->taskRecvMulticastMetatraffic.sock.port = 0;
219 d->taskRecvUnicastUserdata.sock.port = 0;
220 d->taskRecvMulticastUserdata.sock.port = 0;
221 d->taskSend.sock.port = 0;
222 //init structure objectEntry
223 ObjectEntryHID_init_root_field(&d->objectEntry);
224 pthread_rwlock_init(&d->objectEntry.objRootLock, NULL);
225 htimerRoot_init_queue(&d->objectEntry);
226 pthread_rwlock_init(&d->objectEntry.htimRootLock, NULL);
227 pthread_cond_init(&d->objectEntry.htimSendCond, NULL);
228 pthread_mutex_init(&d->objectEntry.htimSendMutex, NULL);
229 d->objectEntry.htimSendCondValue = 0;
230 d->objectEntry.htimNeedWakeUp = ORTE_TRUE;
231 //publication,subscriptions
232 d->publications.counter = d->subscriptions.counter = 0;
233 CSTWriter_init_root_field(&d->publications);
234 CSTReader_init_root_field(&d->subscriptions);
235 pthread_rwlock_init(&d->publications.lock, NULL);
236 pthread_rwlock_init(&d->subscriptions.lock, NULL);
237 //publication,subscriptions lists
238 PublicationList_init_root_field(&d->psEntry);
239 pthread_rwlock_init(&d->psEntry.publicationsLock, NULL);
240 SubscriptionList_init_root_field(&d->psEntry);
241 pthread_rwlock_init(&d->psEntry.subscriptionsLock, NULL);
// pattern registry: lock, default check/match callbacks, list head
244 pthread_rwlock_init(&d->patternEntry.lock, NULL);
245 ORTEPatternRegister(d, ORTEPatternCheckDefault, ORTEPatternMatchDefault, NULL);
246 Pattern_init_head(&d->patternEntry);
// domain properties: copy the caller's prop, or fetch the defaults
// NOTE(review): the condition choosing between the two is not visible here
250 memcpy(&d->domainProp, prop, sizeof(ORTEDomainProp));
252 if (!ORTEDomainPropDefaultGet(&d->domainProp)) {
257 //print local IP addresses
// NOTE(review): iflocal is appended to with strcat(); its initialisation and
// any separator between addresses are not visible in this listing
259 if (d->domainProp.IFCount) {
260 for (i = 0; i < d->domainProp.IFCount; i++)
261 strcat(iflocal, IPAddressToString(d->domainProp.IFProp[i].ipAddress, sIPAddress));
262 debug(30, 2) ("ORTEDomainCreate: localIPAddres(es) %s\n", iflocal);
264 debug(30, 2) ("ORTEDomainCreate: no active interface card\n");
265 if (d->domainProp.multicast.enabled) {
266 debug(30, 0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
// application events: copy when supplied, zero-fill otherwise
272 if (events != NULL) {
273 memcpy(&d->domainEvents, events, sizeof(ORTEDomainAppEvents));
275 memset(&d->domainEvents, 0, sizeof(ORTEDomainAppEvents));
// CDR codecs of all five tasks, then their data buffers
279 CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
280 CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
281 CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
282 CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
283 CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
284 CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
285 d->domainProp.recvBuffSize);
286 CDR_buffer_init(&d->taskSend.mb.cdrCodec,
287 d->domainProp.sendBuffSize);
// outgoing messages are limited to metaBytesPerPacket bytes
288 d->taskSend.mb.cdrCodec.wptr_max = d->domainProp.wireProp.metaBytesPerPacket;
290 CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
291 d->domainProp.recvBuffSize);
// multicast receive buffers exist only when multicast is enabled
292 if (d->domainProp.multicast.enabled) {
293 CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
294 d->domainProp.recvBuffSize);
295 CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
296 d->domainProp.recvBuffSize);
299 d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
// registry of data types
302 ORTEType_init_root_field(&d->typeEntry);
303 pthread_rwlock_init(&d->typeEntry.lock, NULL);
// one UDP socket per task
306 sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
307 sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
308 sock_init_udp(&d->taskRecvUnicastUserdata.sock);
309 sock_init_udp(&d->taskRecvMulticastUserdata.sock);
310 sock_init_udp(&d->taskSend.sock);
312 /************************************************************************/
313 /* UnicastMetatraffic */
314 Domain2Port(d->domain, port);
// with multicast enabled the socket gets SO_REUSEADDR, multicast loopback
// switched off (loop = 0) and joins the configured group before binding to
// the domain port
// NOTE(review): which of the two sock_bind() calls below runs under which
// condition (fixed domain port vs. ephemeral port 0) is decided on lines not
// shown in this listing
316 if (d->domainProp.multicast.enabled) {
317 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
319 int reuse = 1, loop = 0;
322 sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET,
323 SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
324 debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
328 sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP,
329 IP_MULTICAST_LOOP, (const char *)&loop,
331 debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
334 //join the multicast group
335 memset(&mreq, 0, sizeof(mreq));
336 mreq.imr_multiaddr.s_addr = htonl(d->domainProp.multicast.ipAddress);
337 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
338 if (sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP,
339 IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq)) >= 0) {
340 debug(30, 2) ("ORTEDomainCreate: joint to mgroup %s\n",
341 IPAddressToString(d->domainProp.multicast.ipAddress, sIPAddress));
345 if (sock_bind(&d->taskRecvUnicastMetatraffic.sock, port, d->domainProp.listen) == -1) {
349 /* give me receiving port (metatraffic) */
350 if (sock_bind(&d->taskRecvUnicastMetatraffic.sock, 0, d->domainProp.listen) == -1) {
354 debug(30, 2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
355 d->taskRecvUnicastMetatraffic.sock.port);
357 /************************************************************************/
358 /* MulticastMetatraffic */
// applications only (not managers): reuse address, apply the configured
// loopback setting, join the group and bind to the multicast metatraffic port
359 if (d->domainProp.multicast.enabled && !manager) {
360 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
366 sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET,
367 SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
368 debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
372 sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
373 IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
374 sizeof(d->domainProp.multicast.loopBackEnabled));
375 debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
376 d->domainProp.multicast.loopBackEnabled);
378 //join the multicast group
379 mreq.imr_multiaddr.s_addr = htonl(d->domainProp.multicast.ipAddress);
380 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
381 if (sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
382 IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq)) >= 0) {
383 debug(30, 2) ("ORTEDomainCreate: joint to mgroup %s\n",
384 IPAddressToString(d->domainProp.multicast.ipAddress, sIPAddress));
387 /* receiving multicast port (metatraffic) */
388 Domain2PortMulticastMetatraffic(d->domain, mport);
389 if (sock_bind(&d->taskRecvMulticastMetatraffic.sock, (uint16_t)mport, d->domainProp.listen) == -1) {
392 debug(30, 2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
393 d->taskRecvMulticastMetatraffic.sock.port);
396 /************************************************************************/
// unicast userdata socket: bound to an ephemeral port
399 /* give me receiving port (userdata) */
400 if (sock_bind(&d->taskRecvUnicastUserdata.sock, 0, d->domainProp.listen) == -1) {
403 debug(30, 2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
404 d->taskRecvUnicastUserdata.sock.port);
// multicast userdata socket: reuse address, configured loopback, fixed port
406 if (d->domainProp.multicast.enabled) {
411 sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET,
412 SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
413 debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
417 sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP,
418 IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
419 sizeof(d->domainProp.multicast.loopBackEnabled));
420 debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
421 d->domainProp.multicast.loopBackEnabled);
423 /* receiving multicast port (userdata) */
424 Domain2PortMulticastUserdata(d->domain, mport);
425 if (sock_bind(&d->taskRecvMulticastUserdata.sock, (uint16_t)mport, d->domainProp.listen) == -1) {
428 debug(30, 2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
429 d->taskRecvMulticastUserdata.sock.port);
433 /************************************************************************/
// sending socket: ephemeral port, multicast TTL applied when multicast is on
435 /* give me sending port */
436 if (sock_bind(&d->taskSend.sock, 0, d->domainProp.listen) == -1) {
439 debug(30, 2) ("ORTEDomainCreate: bind on port(Send): %u\n",
440 d->taskSend.sock.port);
441 if (d->domainProp.multicast.enabled) {
443 if (sock_setsockopt(&d->taskSend.sock, IPPROTO_IP, IP_MULTICAST_TTL, (const char *)&d->domainProp.multicast.ttl, sizeof(d->domainProp.multicast.ttl)) >= 0) {
444 debug(30, 2) ("ORTEDomainCreate: ttl set on: %u\n",
445 d->domainProp.multicast.ttl);
449 /************************************************************************/
450 /* tests for valid resources */
// a negative fd marks a socket that could not be created; the three userdata
// and multicast sockets are only checked when multicast is enabled
451 if ((d->taskRecvUnicastMetatraffic.sock.fd < 0) ||
452 (d->taskSend.sock.fd < 0) ||
453 (d->domainProp.multicast.enabled &&
454 (d->taskRecvUnicastUserdata.sock.fd < 0)) ||
455 (d->domainProp.multicast.enabled &&
456 (d->taskRecvMulticastUserdata.sock.fd < 0)) ||
457 (d->domainProp.multicast.enabled &&
458 (d->taskRecvMulticastMetatraffic.sock.fd < 0))) {
459 debug(30, 0) ("ORTEDomainCreate: Error creating socket(s).\n");
// same idea for the CDR buffers; the optional ones are only required for a
// multicast-enabled application (not for a manager)
463 if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) ||
464 (!d->taskSend.mb.cdrCodec.buffer) ||
465 (d->domainProp.multicast.enabled && !manager &&
466 !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) ||
467 (d->domainProp.multicast.enabled && !manager &&
468 !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) ||
469 (d->domainProp.multicast.enabled && !manager &&
470 !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) { //out of memory
471 debug(30, 0) ("ORTEDomainCreate: Error creating buffer(s).\n");
475 /************************************************************************/
476 //Generates local GUID
// host id: first local interface address, or 127.0.0.1 when none is active
477 if (d->domainProp.IFCount > 0)
478 d->guid.hid = d->domainProp.IFProp[0].ipAddress;
480 d->guid.hid = StringToIPAddress("127.0.0.1");
// application id: sending port in the upper bits, application kind in the
// low byte
// NOTE(review): the condition selecting MANAGER vs MANAGEDAPPLICATION is not
// visible in this listing
482 d->guid.aid = (d->taskSend.sock.port<<8)+MANAGER;
484 d->guid.aid = (d->taskSend.sock.port<<8)+MANAGEDAPPLICATION;
486 d->guid.oid = OID_APP;
487 debug(30, 2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
488 GUID_PRINTF(d->guid));
490 //create HEADER of message for sending task
491 RTPSHeaderCreate(&d->taskSend.mb.cdrCodec, d->guid.hid, d->guid.aid);
492 d->taskSend.mb.needSend = ORTE_FALSE;
493 d->taskSend.mb.containsInfoReply = ORTE_FALSE;
494 d->taskSend.mb.cdrCodecDirect = NULL;
496 //Self object data & fellow managers object data
497 appParams = (AppParams *)MALLOC(sizeof(AppParams));
501 AppParamsInit(appParams);
502 appParams->expirationTime = d->domainProp.baseProp.expirationTime;
503 VENDOR_ID_OCERA(appParams->vendorId);
504 appParams->hostId = d->guid.hid;
505 appParams->appId = d->guid.aid;
506 appParams->metatrafficUnicastPort = d->taskRecvUnicastMetatraffic.sock.port;
507 appParams->userdataUnicastPort = d->taskRecvUnicastUserdata.sock.port;
508 //fill unicast/multicast ip addresses
509 if (d->domainProp.IFCount) {
510 for (i = 0; i < d->domainProp.IFCount; i++)
511 appParams->unicastIPAddressList[i] = d->domainProp.IFProp[i].ipAddress;
512 appParams->unicastIPAddressCount = d->domainProp.IFCount;
514 if (d->domainProp.multicast.enabled &&
515 IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
516 appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount] =
517 d->domainProp.multicast.ipAddress;
518 appParams->metatrafficMulticastIPAddressCount++;
// without any interface, advertise the loopback address
520 if (!d->domainProp.IFCount) {
521 appParams->unicastIPAddressList[appParams->unicastIPAddressCount] =
522 StringToIPAddress("127.0.0.1");
523 appParams->unicastIPAddressCount++;
// manager keys: when none are configured, use loopback, every local
// interface address and (if enabled) the multicast address; otherwise parse
// the ':'-separated list in domainProp.keys
527 if (!d->domainProp.keys) {
528 appParams->managerKeyList[0] = StringToIPAddress("127.0.0.1");
529 for (i = 0; i < d->domainProp.IFCount; i++)
530 appParams->managerKeyList[i+1] = d->domainProp.IFProp[i].ipAddress;
531 if (d->domainProp.multicast.enabled &&
532 IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
533 appParams->managerKeyList[i+1] = d->domainProp.multicast.ipAddress;
536 appParams->managerKeyCount = i+1;
538 appParams->managerKeyCount = i = 0;
539 while (getStringPart(d->domainProp.keys, ':', &i, sbuff))
540 appParams->managerKeyList[appParams->managerKeyCount++] =
541 StringToIPAddress(sbuff);
543 d->appParams = appParams;
544 //insert object, doesn't need to be locked
// NOTE(review): the result of objectEntryAdd() is dereferenced on the next
// line without a NULL check
545 d->objectEntryOID = objectEntryAdd(d, &d->guid, (void *)appParams);
546 d->objectEntryOID->privateCreated = ORTE_TRUE;
549 /************************************************************************/
551 // writerApplicationSelf (WAS)
552 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
553 cstWriterParams.refreshPeriod = d->domainProp.baseProp.refreshPeriod;
554 cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
555 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
556 cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
// two alternative registration settings (manager values without full
// acknowledge, application values with full acknowledge)
// NOTE(review): the condition choosing between them is not visible here
558 cstWriterParams.registrationRetries = d->domainProp.baseProp.registrationMgrRetries;
559 cstWriterParams.registrationPeriod = d->domainProp.baseProp.registrationMgrPeriod;
560 cstWriterParams.fullAcknowledge = ORTE_FALSE;
562 cstWriterParams.registrationRetries = d->domainProp.baseProp.registrationAppRetries;
563 cstWriterParams.registrationPeriod = d->domainProp.baseProp.registrationAppPeriod;
564 cstWriterParams.fullAcknowledge = ORTE_TRUE;
566 CSTWriterInit(d, &d->writerApplicationSelf, d->objectEntryOID,
567 OID_WRITE_APPSELF, &cstWriterParams, NULL);
// every address in the ':'-separated domainProp.mgrs list that is not yet
// known becomes an object entry and a remote reader of WAS
570 while (getStringPart(d->domainProp.mgrs, ':', &i, sbuff) > 0) {
572 IPAddress ipAddress = StringToIPAddress(sbuff);
573 guid.hid = ipAddress;
574 guid.aid = AID_UNKNOWN;
576 if (!objectEntryFind(d, &guid)) {
577 CSTRemoteReader *cstRemoteReader;
// NOTE(review): the MALLOC result is used on the next line without a NULL
// check
578 appParams = (AppParams *)MALLOC(sizeof(AppParams));
579 AppParamsInit(appParams);
580 appParams->hostId = guid.hid;
581 appParams->appId = guid.aid;
582 appParams->metatrafficUnicastPort = d->appParams->metatrafficUnicastPort;
583 objectEntryOID = objectEntryAdd(d, &guid, (void *)appParams);
// a multicast manager address is stored in the multicast list together with
// the domain port; any other address goes into the unicast list
584 if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
585 appParams->metatrafficMulticastIPAddressList[0] = ipAddress;
586 appParams->metatrafficMulticastIPAddressCount = 1;
587 objectEntryOID->multicastPort = port;
589 appParams->unicastIPAddressList[0] = ipAddress;
590 appParams->unicastIPAddressCount = 1;
591 objectEntryOID->multicastPort = 0;
593 appParams->userdataUnicastPort = 0; //Manager support only metatraffic
594 cstRemoteReader = CSTWriterAddRemoteReader(d,
595 &d->writerApplicationSelf,
599 debug(29, 2) ("ORTEDomainCreate: add fellow manager (%s)\n",
600 IPAddressToString(ipAddress, sIPAddress));
604 // add to WAS remote writer(s)
// the local manager (domainProp.appLocalManager) is registered the same way,
// reachable on the domain port
605 if (d->domainProp.appLocalManager) {
607 guid.hid = d->domainProp.appLocalManager;
608 guid.aid = AID_UNKNOWN;
610 if (!objectEntryFind(d, &guid)) {
// NOTE(review): the MALLOC result is used on the next line without a NULL
// check
611 appParams = (AppParams *)MALLOC(sizeof(AppParams));
612 AppParamsInit(appParams);
613 appParams->hostId = guid.hid;
614 appParams->appId = guid.aid;
615 appParams->metatrafficUnicastPort = port;
616 appParams->userdataUnicastPort = 0; //Manager support only metatraffic
617 appParams->unicastIPAddressList[0] = d->domainProp.appLocalManager;
618 appParams->unicastIPAddressCount = 1;
619 objectEntryOID = objectEntryAdd(d, &guid, (void *)appParams);
620 CSTWriterAddRemoteReader(d,
621 &d->writerApplicationSelf,
625 debug(30, 2) ("ORTEDomainCreate: add manager (%s)\n",
626 IPAddressToString(d->domainProp.appLocalManager, sIPAddress));
// readerManagers
// NOTE(review): the condition choosing between iNtpTime and the configured
// repeatActiveQueryTime below is not visible in this listing
632 cstReaderParams.delayResponceTimeMin = d->domainProp.baseProp.delayResponceTimeACKMin;
633 cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
634 cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
636 cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
637 cstReaderParams.repeatActiveQueryTime = iNtpTime; //RM can't repeatedly send ACKf
639 cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;
640 cstReaderParams.fullAcknowledge = ORTE_TRUE;
642 CSTReaderInit(d, &d->readerManagers, d->objectEntryOID,
643 OID_READ_MGR, &cstReaderParams, NULL);
645 // readerApplications
646 cstReaderParams.delayResponceTimeMin = d->domainProp.baseProp.delayResponceTimeACKMin;
647 cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
648 cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
649 cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;
650 cstReaderParams.fullAcknowledge = ORTE_TRUE;
651 CSTReaderInit(d, &d->readerApplications, d->objectEntryOID,
652 OID_READ_APP, &cstReaderParams, NULL);
// NOTE(review): the conditions (manager vs application) guarding the
// following writer/reader groups are not visible in this listing
655 // writerApplications
656 cstWriterParams.registrationRetries = 0;
657 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
658 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
659 cstWriterParams.refreshPeriod = iNtpTime; //only WAS,WM can refresh csChange(s)
660 cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
661 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime, 0, 20);
662 cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
663 cstWriterParams.fullAcknowledge = ORTE_FALSE;
664 CSTWriterInit(d, &d->writerApplications, d->objectEntryOID,
665 OID_WRITE_APP, &cstWriterParams, NULL);
// writerManagers
668 cstWriterParams.registrationRetries = 0;
669 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
670 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
671 cstWriterParams.refreshPeriod = d->domainProp.baseProp.refreshPeriod;
672 cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
673 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime, 0, 20);
674 cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
675 cstWriterParams.fullAcknowledge = ORTE_TRUE;
676 CSTWriterInit(d, &d->writerManagers, d->objectEntryOID,
677 OID_WRITE_MGR, &cstWriterParams, NULL);
681 // writerPublications
682 cstWriterParams.registrationRetries = 0;
683 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
684 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
685 cstWriterParams.refreshPeriod = d->domainProp.baseProp.refreshPeriod;
686 cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
687 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime, 0, 20);
688 cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
689 cstWriterParams.fullAcknowledge = ORTE_TRUE;
690 CSTWriterInit(d, &d->writerPublications, d->objectEntryOID,
691 OID_WRITE_PUBL, &cstWriterParams, NULL);
692 // writerSubscriptions
693 cstWriterParams.registrationRetries = 0;
694 NTPTIME_ZERO(cstWriterParams.registrationPeriod);
695 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
696 cstWriterParams.refreshPeriod = d->domainProp.baseProp.refreshPeriod;
697 cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
698 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime, 0, 20);
699 cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
700 cstWriterParams.fullAcknowledge = ORTE_TRUE;
701 CSTWriterInit(d, &d->writerSubscriptions, d->objectEntryOID,
702 OID_WRITE_SUBS, &cstWriterParams, NULL);
703 // readerPublications
704 cstReaderParams.delayResponceTimeMin = d->domainProp.baseProp.delayResponceTimeACKMin;
705 cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
706 cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
707 cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;
708 cstReaderParams.fullAcknowledge = ORTE_TRUE;
709 CSTReaderInit(d, &d->readerPublications, d->objectEntryOID,
710 OID_READ_PUBL, &cstReaderParams, NULL);
711 // readerSubscriptions
712 cstReaderParams.delayResponceTimeMin = d->domainProp.baseProp.delayResponceTimeACKMin;
713 cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
714 cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
715 cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;
716 cstReaderParams.fullAcknowledge = ORTE_TRUE;
717 CSTReaderInit(d, &d->readerSubscriptions, d->objectEntryOID,
718 OID_READ_SUBS, &cstReaderParams, NULL);
721 //add csChange for WAS
722 appSelfParamChanged(d, ORTE_FALSE, ORTE_FALSE, ORTE_FALSE, ORTE_TRUE);
724 debug(30, 10) ("ORTEDomainCreate: finished\n");
// failure path: undo the initialisation above in roughly reverse order
// (sockets, type lock, CDR buffers, then the remaining locks)
// NOTE(review): the labels that the error branches jump to, the success
// return and the release of d itself are not visible in this listing
735 sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
736 sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
737 sock_cleanup(&d->taskRecvUnicastUserdata.sock);
738 sock_cleanup(&d->taskRecvMulticastUserdata.sock);
739 sock_cleanup(&d->taskSend.sock);
740 pthread_rwlock_destroy(&d->typeEntry.lock);
741 if (d->domainProp.multicast.enabled) {
742 CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
743 CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
746 CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
748 CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
752 pthread_rwlock_destroy(&d->patternEntry.lock);
753 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
754 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
755 pthread_rwlock_destroy(&d->subscriptions.lock);
756 pthread_rwlock_destroy(&d->publications.lock);
757 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
758 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
759 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
765 /*****************************************************************************/
// Shuts down and releases domain d.
// Visible steps, in order: call appSelfParamChanged() on the domain's own
// parameters under both root locks, stop all worker threads, delete the
// built-in and user CST writers/readers, delete all objects, close the
// sockets, destroy the synchronisation objects, destroy the type and pattern
// registries, and release the CDR buffers.
// NOTE(review): conditions depending on `manager`, NULL checks on d, the
// actual release of d and the return statement are not visible in this
// listing - confirm against the full file.
767 ORTEDomainDestroy(ORTEDomain *d, Boolean manager)
769 CSTWriter *cstWriter;
770 CSTReader *cstReader;
772 debug(30, 10) ("ORTEDomainDestroy: start\n");
// both root locks are taken for writing around the self-parameter change and
// released in the opposite order
776 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
777 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
778 appSelfParamChanged(d, ORTE_TRUE, ORTE_TRUE, ORTE_FALSE, ORTE_FALSE);
779 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
780 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
// stop and join every worker thread before tearing down shared state
783 ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
784 ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
785 ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
786 ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
787 ORTEDomainSendThreadStop(&d->taskSend);
788 debug(30, 3) ("ORTEDomainDestroy: threads stoped\n");
790 //CSTReaders and CSTWriters
791 CSTWriterDelete(d, &d->writerApplicationSelf);
792 CSTReaderDelete(d, &d->readerManagers);
793 CSTReaderDelete(d, &d->readerApplications);
795 CSTWriterDelete(d, &d->writerManagers);
796 CSTWriterDelete(d, &d->writerApplications);
798 CSTWriterDelete(d, &d->writerPublications);
799 CSTWriterDelete(d, &d->writerSubscriptions);
800 CSTReaderDelete(d, &d->readerPublications);
801 CSTReaderDelete(d, &d->readerSubscriptions);
// drain the user publications and subscriptions one element at a time
803 while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
804 CSTWriterDelete(d, cstWriter);
807 while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
808 CSTReaderDelete(d, cstReader);
813 //objects in objectsEntry
814 objectEntryDeleteAll(d, &d->objectEntry);
815 debug(30, 3) ("ORTEDomainDestroy: deleted all objects\n");
// close all sockets
818 sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
819 sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
820 sock_cleanup(&d->taskRecvUnicastUserdata.sock);
821 sock_cleanup(&d->taskRecvMulticastUserdata.sock);
822 sock_cleanup(&d->taskSend.sock);
// condition variable and mutex used to wake the sending thread
826 pthread_cond_destroy(&d->objectEntry.htimSendCond);
827 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
// read/write locks of the object database and publication/subscription lists
830 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
831 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
832 pthread_rwlock_destroy(&d->publications.lock);
833 pthread_rwlock_destroy(&d->subscriptions.lock);
834 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
835 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
// registered types, then subscription patterns and their lock
838 ORTETypeRegisterDestroyAll(d);
841 ORTEDomainAppSubscriptionPatternDestroy(d);
842 pthread_rwlock_destroy(&d->patternEntry.lock);
// CDR buffers of all five tasks
845 CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
846 CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
847 CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
848 CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
849 CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
851 //Free domain instance
854 debug(30, 10) ("ORTEDomainDestroy: finished\n");