]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEDomain.c
533cc738797652bb7993ff9477b38a43141af4d8
[orte.git] / orte / liborte / ORTEDomain.c
1 /*
2  *  $Id: ORTEDomain.c,v 0.0.0.1         2003/08/21
3  *
4  *  DEBUG:  section 30                  Domain functions
5  *
6  *  -------------------------------------------------------------------
7  *                                ORTE
8  *                      Open Real-Time Ethernet
9  *
10  *                      Copyright (C) 2001-2006
11  *  Department of Control Engineering FEE CTU Prague, Czech Republic
12  *                      http://dce.felk.cvut.cz
13  *                      http://www.ocera.org
14  *
15  *  Author:              Petr Smolik    petr@smoliku.cz
16  *  Advisor:             Pavel Pisa
17  *  Project Responsible: Zdenek Hanzalek
18  *  --------------------------------------------------------------------
19  *
20  *  This program is free software; you can redistribute it and/or modify
21  *  it under the terms of the GNU General Public License as published by
22  *  the Free Software Foundation; either version 2 of the License, or
23  *  (at your option) any later version.
24  *
25  *  This program is distributed in the hope that it will be useful,
26  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
27  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *  GNU General Public License for more details.
29  *
30  */
31
32 #include "orte_all.h"
33
34 /*****************************************************************************/
35 void
36 ORTEDomainRecvThreadStart(TaskProp *tp)
37 {
38   if (tp->terminate) {
39     tp->terminate = ORTE_FALSE;
40     pthread_create(&(tp->thread), NULL,
41                    (void *)&ORTEAppRecvThread, (void *)tp);
42   }
43 }
44
45 /*****************************************************************************/
46 void
47 ORTEDomainSendThreadStart(TaskProp *tp)
48 {
49   if (tp->terminate) {
50     tp->terminate = ORTE_FALSE;
51     pthread_create(&(tp->thread), NULL,
52                    (void *)&ORTEAppSendThread, (void *)tp);
53   }
54 }
55
56 /*****************************************************************************/
57 void
58 ORTEDomainRecvThreadStop(TaskProp *tp)
59 {
60   ORTEDomain *d = tp->d;
61
62   if (!tp->terminate) {
63     tp->terminate = ORTE_TRUE;
64     ORTEDomainWakeUpReceivingThread(d,
65                                     &d->taskSend.sock, tp->sock.port);
66     pthread_join(tp->thread, NULL);
67   }
68 }
69
70 /*****************************************************************************/
71 void
72 ORTEDomainSendThreadStop(TaskProp *tp)
73 {
74   ORTEDomain *d = tp->d;
75
76   if (!tp->terminate) {
77     tp->terminate = ORTE_TRUE;
78     ORTEDomainWakeUpSendingThread(&d->objectEntry);
79     pthread_join(tp->thread, NULL);
80   }
81 }
82
83 /*****************************************************************************/
84 void
85 ORTEDomainStart(ORTEDomain *d,
86                 Boolean recvUnicastMetatrafficThread,
87                 Boolean recvMulticastMetatrafficThread,
88                 Boolean recvUnicastUserdataThread,
89                 Boolean recvMulticastUserdataThread,
90                 Boolean sendThread)
91 {
92
93   if (!d)
94     return;
95
96   if (recvUnicastMetatrafficThread)
97     ORTEDomainRecvThreadStart(&d->taskRecvUnicastMetatraffic);
98
99   if (recvMulticastMetatrafficThread)
100     ORTEDomainRecvThreadStart(&d->taskRecvMulticastMetatraffic);
101
102   if (recvUnicastUserdataThread)
103     ORTEDomainRecvThreadStart(&d->taskRecvUnicastUserdata);
104
105   if (recvMulticastUserdataThread)
106     ORTEDomainRecvThreadStart(&d->taskRecvMulticastUserdata);
107
108   if (sendThread)
109     ORTEDomainSendThreadStart(&d->taskSend);
110 }
111
112 /*****************************************************************************/
113 Boolean
114 ORTEDomainPropDefaultGet(ORTEDomainProp *prop)
115 {
116   sock_t        sock;
117
118   memset(prop, 0, sizeof(*prop));
119
120   prop->multicast.enabled = ORTE_FALSE;
121   prop->multicast.ttl = 1;
122   prop->multicast.loopBackEnabled = ORTE_TRUE;
123
124   //IFProp
125   sock_init_udp(&sock);
126   if (sock_bind(&sock, 0, INADDR_ANY) == -1) {
127     return ORTE_FALSE;
128   }
129   sock_get_local_interfaces(&sock, prop->IFProp, (char *)&prop->IFCount);
130   sock_cleanup(&sock);
131
132   prop->mgrs = NULL; //only from localhost
133   prop->appLocalManager = StringToIPAddress("127.0.0.1");
134   prop->listen = INADDR_ANY;
135   prop->keys = NULL; //are assign be orte
136   sprintf(prop->version, ORTE_PACKAGE_STRING \
137           ", compiled: " \
138           __DATE__ \
139           " " \
140           __TIME__);
141
142   prop->recvBuffSize = 0x4000;
143   prop->sendBuffSize = 0x4000;
144   prop->wireProp.metaBytesPerPacket = 1500;
145   prop->wireProp.metaBytesPerFastPacket = 1000; //not used
146   prop->wireProp.metabitsPerACKBitmap = 32;     //not used
147   prop->wireProp.userBytesPerPacket = 0x3000;
148
149   //domainBaseProp
150   prop->baseProp.registrationMgrRetries = 0;
151   NTPTIME_BUILD(prop->baseProp.registrationMgrPeriod, 0); //0s
152   prop->baseProp.registrationAppRetries = 3;
153   NtpTimeAssembFromMs(prop->baseProp.registrationAppPeriod, 0, 500); //500ms
154   NTPTIME_BUILD(prop->baseProp.expirationTime, 180);  //180s
155   NTPTIME_BUILD(prop->baseProp.refreshPeriod, 72);    //72s - refresh self parameters
156   NTPTIME_BUILD(prop->baseProp.purgeTime, 60);        //60s - purge time of parameters
157   NTPTIME_BUILD(prop->baseProp.repeatAnnounceTime, 72); //72s - announcement by HB
158   NTPTIME_BUILD(prop->baseProp.repeatActiveQueryTime, 72); //72s - announcement by ACK
159   NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMin, 0, 10); //10ms - delay before send ACK
160   NtpTimeAssembFromMs(prop->baseProp.delayResponceTimeACKMax, 1, 0); //1s
161   NtpTimeAssembFromMs(prop->baseProp.maxBlockTime, 20, 0); //20s
162   prop->baseProp.ACKMaxRetries = 10;
163   prop->baseProp.HBMaxRetries = 10;
164
165   PublParamsInit(&prop->publPropDefault);
166   SubsParamsInit(&prop->subsPropDefault);
167
168   return ORTE_TRUE;
169 }
170
171 /*****************************************************************************/
172 Boolean
173 ORTEDomainInitEvents(ORTEDomainAppEvents *events)
174 {
175   memset(events, 0, sizeof(ORTEDomainAppEvents));
176   return ORTE_TRUE;
177 }
178
179
180 /*****************************************************************************/
181 ORTEDomain *
182 ORTEDomainCreate(int domain, ORTEDomainProp *prop,
183                  ORTEDomainAppEvents *events, Boolean manager)
184 {
185   ORTEDomain        *d;
186   ObjectEntryOID    *objectEntryOID;
187   AppParams         *appParams;
188   CSTWriterParams   cstWriterParams;
189   CSTReaderParams   cstReaderParams;
190   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
191   char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
192   char              sbuff[128];
193   int               i;
194   uint16_t          port = 0;
195   int               errno_save = 0;
196
197   debug(30, 2)  ("ORTEDomainCreate: %s compiled: %s,%s\n",
198                  ORTE_PACKAGE_STRING, __DATE__, __TIME__);
199
200   debug(30, 10) ("ORTEDomainCreate: start\n");
201   //Create domainApplication
202   d = MALLOC(sizeof(ORTEDomain));
203   if (!d)
204     return NULL;        //no memory
205   //initialization local values
206   d->domain = domain;
207   d->taskRecvUnicastMetatraffic.d = d;
208   d->taskRecvUnicastMetatraffic.terminate = ORTE_TRUE;
209   d->taskRecvMulticastMetatraffic.d = d;
210   d->taskRecvMulticastMetatraffic.terminate = ORTE_TRUE;
211   d->taskRecvUnicastUserdata.d = d;
212   d->taskRecvUnicastUserdata.terminate = ORTE_TRUE;
213   d->taskRecvMulticastUserdata.d = d;
214   d->taskRecvMulticastUserdata.terminate = ORTE_TRUE;
215   d->taskSend.d = d;
216   d->taskSend.terminate = ORTE_TRUE;
217   d->taskRecvUnicastMetatraffic.sock.port = 0;
218   d->taskRecvMulticastMetatraffic.sock.port = 0;
219   d->taskRecvUnicastUserdata.sock.port = 0;
220   d->taskRecvMulticastUserdata.sock.port = 0;
221   d->taskSend.sock.port = 0;
222   //init structure objectEntry
223   ObjectEntryHID_init_root_field(&d->objectEntry);
224   pthread_rwlock_init(&d->objectEntry.objRootLock, NULL);
225   htimerRoot_init_queue(&d->objectEntry);
226   pthread_rwlock_init(&d->objectEntry.htimRootLock, NULL);
227   pthread_cond_init(&d->objectEntry.htimSendCond, NULL);
228   pthread_mutex_init(&d->objectEntry.htimSendMutex, NULL);
229   d->objectEntry.htimSendCondValue = 0;
230   d->objectEntry.htimNeedWakeUp = ORTE_TRUE;
231   //publication,subscriptions
232   d->publications.counter = d->subscriptions.counter = 0;
233   CSTWriter_init_root_field(&d->publications);
234   CSTReader_init_root_field(&d->subscriptions);
235   pthread_rwlock_init(&d->publications.lock, NULL);
236   pthread_rwlock_init(&d->subscriptions.lock, NULL);
237   //publication,subscriptions lists
238   PublicationList_init_root_field(&d->psEntry);
239   pthread_rwlock_init(&d->psEntry.publicationsLock, NULL);
240   SubscriptionList_init_root_field(&d->psEntry);
241   pthread_rwlock_init(&d->psEntry.subscriptionsLock, NULL);
242
243   //pattern
244   pthread_rwlock_init(&d->patternEntry.lock, NULL);
245   ORTEPatternRegister(d, ORTEPatternCheckDefault, ORTEPatternMatchDefault, NULL);
246   Pattern_init_head(&d->patternEntry);
247
248   //create domainProp
249   if (prop != NULL) {
250     memcpy(&d->domainProp, prop, sizeof(ORTEDomainProp));
251   } else {
252     if (!ORTEDomainPropDefaultGet(&d->domainProp)) {
253       goto err_domainProp;
254     }
255   }
256
257   //print local IP addresses
258   iflocal[0] = 0;
259   if (d->domainProp.IFCount) {
260     for (i = 0; i < d->domainProp.IFCount; i++)
261       strcat(iflocal, IPAddressToString(d->domainProp.IFProp[i].ipAddress, sIPAddress));
262     debug(30, 2) ("ORTEDomainCreate: localIPAddres(es) %s\n", iflocal);
263   } else {
264     debug(30, 2) ("ORTEDomainCreate: no active interface card\n");
265     if (d->domainProp.multicast.enabled) {
266       debug(30, 0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
267       goto err_domainProp;
268     }
269   }
270
271   //DomainEvents
272   if (events != NULL) {
273     memcpy(&d->domainEvents, events, sizeof(ORTEDomainAppEvents));
274   } else {
275     memset(&d->domainEvents, 0, sizeof(ORTEDomainAppEvents));
276   }
277
278   //local buffers
279   CDR_codec_init_static(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
280   CDR_codec_init_static(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
281   CDR_codec_init_static(&d->taskRecvUnicastUserdata.mb.cdrCodec);
282   CDR_codec_init_static(&d->taskRecvMulticastUserdata.mb.cdrCodec);
283   CDR_codec_init_static(&d->taskSend.mb.cdrCodec);
284   CDR_buffer_init(&d->taskRecvUnicastMetatraffic.mb.cdrCodec,
285                   d->domainProp.recvBuffSize);
286   CDR_buffer_init(&d->taskSend.mb.cdrCodec,
287                   d->domainProp.sendBuffSize);
288   d->taskSend.mb.cdrCodec.wptr_max = d->domainProp.wireProp.metaBytesPerPacket;
289   if (!manager) {
290     CDR_buffer_init(&d->taskRecvUnicastUserdata.mb.cdrCodec,
291                     d->domainProp.recvBuffSize);
292     if (d->domainProp.multicast.enabled) {
293       CDR_buffer_init(&d->taskRecvMulticastMetatraffic.mb.cdrCodec,
294                       d->domainProp.recvBuffSize);
295       CDR_buffer_init(&d->taskRecvMulticastUserdata.mb.cdrCodec,
296                       d->domainProp.recvBuffSize);
297     }
298   }
299   d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
300
301   //TypeRegister
302   ORTEType_init_root_field(&d->typeEntry);
303   pthread_rwlock_init(&d->typeEntry.lock, NULL);
304
305   //Sockets
306   sock_init_udp(&d->taskRecvUnicastMetatraffic.sock);
307   sock_init_udp(&d->taskRecvMulticastMetatraffic.sock);
308   sock_init_udp(&d->taskRecvUnicastUserdata.sock);
309   sock_init_udp(&d->taskRecvMulticastUserdata.sock);
310   sock_init_udp(&d->taskSend.sock);
311
312   /************************************************************************/
313   /* UnicastMetatraffic */
314   Domain2Port(d->domain, port);
315   if (manager) {
316     if (d->domainProp.multicast.enabled) {
317       char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
318       struct ip_mreq mreq;
319       int reuse = 1, loop = 0;
320
321       //reuseaddr
322       sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET,
323                       SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
324       debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
325                     reuse);
326
327       //multicast loop
328       sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP,
329                       IP_MULTICAST_LOOP, (const char *)&loop,
330                       sizeof(loop));
331       debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
332                     loop);
333
334       //joint to multicast group
335       memset(&mreq, 0, sizeof(mreq));
336       mreq.imr_multiaddr.s_addr = htonl(d->domainProp.multicast.ipAddress);
337       mreq.imr_interface.s_addr = htonl(INADDR_ANY);
338       if (sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP,
339                           IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq)) >= 0) {
340         debug(30, 2) ("ORTEDomainCreate: joint to mgroup %s\n",
341                       IPAddressToString(d->domainProp.multicast.ipAddress, sIPAddress));
342       } else
343         goto err_sock;
344     }
345     if (sock_bind(&d->taskRecvUnicastMetatraffic.sock, port, d->domainProp.listen) == -1) {
346       goto err_sock;
347     }
348   } else {
349     /* give me receiving port (metatraffic) */
350     if (sock_bind(&d->taskRecvUnicastMetatraffic.sock, 0, d->domainProp.listen) == -1) {
351       goto err_sock;
352     }
353   }
354   debug(30, 2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
355                 d->taskRecvUnicastMetatraffic.sock.port);
356
357   /************************************************************************/
358   /* MulticastMetatraffic */
359   if (d->domainProp.multicast.enabled && !manager) {
360     char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
361     struct ip_mreq mreq;
362     Port mport;
363     int reuse = 1;
364
365     //reuseaddr
366     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET,
367                     SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
368     debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
369                   reuse);
370
371     //multicast loop
372     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
373                     IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
374                     sizeof(d->domainProp.multicast.loopBackEnabled));
375     debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
376                   d->domainProp.multicast.loopBackEnabled);
377
378     //joint to multicast group
379     mreq.imr_multiaddr.s_addr = htonl(d->domainProp.multicast.ipAddress);
380     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
381     if (sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
382                         IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq)) >= 0) {
383       debug(30, 2) ("ORTEDomainCreate: joint to mgroup %s\n",
384                     IPAddressToString(d->domainProp.multicast.ipAddress, sIPAddress));
385     }
386
387     /* receiving multicast port (metatraffic) */
388     Domain2PortMulticastMetatraffic(d->domain, mport);
389     if (sock_bind(&d->taskRecvMulticastMetatraffic.sock, (uint16_t)mport, d->domainProp.listen) == -1) {
390       goto err_sock;
391     }
392     debug(30, 2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
393                   d->taskRecvMulticastMetatraffic.sock.port);
394   }
395
396   /************************************************************************/
397   /* UserData */
398   if (!manager) {
399     /* give me receiving port (userdata) */
400     if (sock_bind(&d->taskRecvUnicastUserdata.sock, 0, d->domainProp.listen) == -1) {
401       goto err_sock;
402     }
403     debug(30, 2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
404                   d->taskRecvUnicastUserdata.sock.port);
405
406     if (d->domainProp.multicast.enabled) {
407       Port mport;
408       int reuse = 1;
409
410       //reuseaddr
411       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET,
412                       SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
413       debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
414                     reuse);
415
416       //multicast loop
417       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP,
418                       IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
419                       sizeof(d->domainProp.multicast.loopBackEnabled));
420       debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
421                     d->domainProp.multicast.loopBackEnabled);
422
423       /* receiving multicast port (userdata) */
424       Domain2PortMulticastUserdata(d->domain, mport);
425       if (sock_bind(&d->taskRecvMulticastUserdata.sock, (uint16_t)mport, d->domainProp.listen) == -1) {
426         goto err_sock;
427       }
428       debug(30, 2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
429                     d->taskRecvMulticastUserdata.sock.port);
430     }
431   }
432
433   /************************************************************************/
434   /* Send */
435   /* give me sending port */
436   if (sock_bind(&d->taskSend.sock, 0, d->domainProp.listen) == -1) {
437     goto err_sock;
438   }
439   debug(30, 2) ("ORTEDomainCreate: bind on port(Send): %u\n",
440                 d->taskSend.sock.port);
441   if (d->domainProp.multicast.enabled) {
442     //ttl
443     if (sock_setsockopt(&d->taskSend.sock, IPPROTO_IP, IP_MULTICAST_TTL, (const char *)&d->domainProp.multicast.ttl, sizeof(d->domainProp.multicast.ttl)) >= 0) {
444       debug(30, 2) ("ORTEDomainCreate: ttl set on: %u\n",
445                     d->domainProp.multicast.ttl);
446     }
447   }
448
449   /************************************************************************/
450   /* tests for valid resources */
451   if ((d->taskRecvUnicastMetatraffic.sock.fd < 0) ||
452       (d->taskSend.sock.fd < 0) ||
453       (d->domainProp.multicast.enabled &&
454        (d->taskRecvUnicastUserdata.sock.fd < 0)) ||
455       (d->domainProp.multicast.enabled &&
456        (d->taskRecvMulticastUserdata.sock.fd < 0)) ||
457       (d->domainProp.multicast.enabled &&
458        (d->taskRecvMulticastMetatraffic.sock.fd < 0))) {
459     debug(30, 0) ("ORTEDomainCreate: Error creating socket(s).\n");
460     goto err_sock;
461   }
462
463   if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) ||
464       (!d->taskSend.mb.cdrCodec.buffer) ||
465       (d->domainProp.multicast.enabled && !manager &&
466        !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) ||
467       (d->domainProp.multicast.enabled && !manager &&
468        !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) ||
469       (d->domainProp.multicast.enabled && !manager &&
470        !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) {    //no a memory
471     debug(30, 0) ("ORTEDomainCreate: Error creating buffer(s).\n");
472     goto err_sock;
473   }
474
475   /************************************************************************/
476   //Generates local GUID
477   if (d->domainProp.IFCount > 0)
478     d->guid.hid = d->domainProp.IFProp[0].ipAddress;
479   else
480     d->guid.hid = StringToIPAddress("127.0.0.1");
481   if (manager) {
482     d->guid.aid = (d->taskSend.sock.port<<8)+MANAGER;
483   } else {
484     d->guid.aid = (d->taskSend.sock.port<<8)+MANAGEDAPPLICATION;
485   }
486   d->guid.oid = OID_APP;
487   debug(30, 2) ("ORTEDomainCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
488                 GUID_PRINTF(d->guid));
489
490   //create HEADER of message for sending task
491   RTPSHeaderCreate(&d->taskSend.mb.cdrCodec, d->guid.hid, d->guid.aid);
492   d->taskSend.mb.needSend = ORTE_FALSE;
493   d->taskSend.mb.containsInfoReply = ORTE_FALSE;
494   d->taskSend.mb.cdrCodecDirect = NULL;
495
496   //Self object data & fellow managers object data
497   appParams = (AppParams *)MALLOC(sizeof(AppParams));
498   if (!appParams) {
499     goto err_sock;
500   }
501   AppParamsInit(appParams);
502   appParams->expirationTime = d->domainProp.baseProp.expirationTime;
503   VENDOR_ID_OCERA(appParams->vendorId);
504   appParams->hostId = d->guid.hid;
505   appParams->appId = d->guid.aid;
506   appParams->metatrafficUnicastPort = d->taskRecvUnicastMetatraffic.sock.port;
507   appParams->userdataUnicastPort = d->taskRecvUnicastUserdata.sock.port;
508   //fill unicast/multicast ip addresses
509   if (d->domainProp.IFCount) {
510     for (i = 0; i < d->domainProp.IFCount; i++)
511       appParams->unicastIPAddressList[i] = d->domainProp.IFProp[i].ipAddress;
512     appParams->unicastIPAddressCount = d->domainProp.IFCount;
513   }
514   if (d->domainProp.multicast.enabled &&
515       IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
516     appParams->metatrafficMulticastIPAddressList[appParams->metatrafficMulticastIPAddressCount] =
517       d->domainProp.multicast.ipAddress;
518     appParams->metatrafficMulticastIPAddressCount++;
519   } else {
520     if (!d->domainProp.IFCount) {
521       appParams->unicastIPAddressList[appParams->unicastIPAddressCount] =
522         StringToIPAddress("127.0.0.1");
523       appParams->unicastIPAddressCount++;
524     }
525   }
526   //KeyList
527   if (!d->domainProp.keys) {
528     appParams->managerKeyList[0] = StringToIPAddress("127.0.0.1");
529     for (i = 0; i < d->domainProp.IFCount; i++)
530       appParams->managerKeyList[i+1] = d->domainProp.IFProp[i].ipAddress;
531     if (d->domainProp.multicast.enabled &&
532         IN_MULTICAST(d->domainProp.multicast.ipAddress)) {
533       appParams->managerKeyList[i+1] = d->domainProp.multicast.ipAddress;
534       i++;
535     }
536     appParams->managerKeyCount = i+1;
537   } else {
538     appParams->managerKeyCount = i = 0;
539     while (getStringPart(d->domainProp.keys, ':', &i, sbuff))
540       appParams->managerKeyList[appParams->managerKeyCount++] =
541         StringToIPAddress(sbuff);
542   }
543   d->appParams = appParams;
544   //insert object, doesn't need to be locked
545   d->objectEntryOID = objectEntryAdd(d, &d->guid, (void *)appParams);
546   d->objectEntryOID->privateCreated = ORTE_TRUE;
547
548
549   /************************************************************************/
550   //CST objects
551   //  writerApplicationSelf (WAS)
552   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
553   cstWriterParams.refreshPeriod = d->domainProp.baseProp.refreshPeriod;
554   cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
555   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
556   cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
557   if (manager) {
558     cstWriterParams.registrationRetries = d->domainProp.baseProp.registrationMgrRetries;
559     cstWriterParams.registrationPeriod = d->domainProp.baseProp.registrationMgrPeriod;
560     cstWriterParams.fullAcknowledge = ORTE_FALSE;
561   } else {
562     cstWriterParams.registrationRetries = d->domainProp.baseProp.registrationAppRetries;
563     cstWriterParams.registrationPeriod = d->domainProp.baseProp.registrationAppPeriod;
564     cstWriterParams.fullAcknowledge = ORTE_TRUE;
565   }
566   CSTWriterInit(d, &d->writerApplicationSelf, d->objectEntryOID,
567                 OID_WRITE_APPSELF, &cstWriterParams, NULL);
568   if (manager) {
569     i = 0;
570     while (getStringPart(d->domainProp.mgrs, ':', &i, sbuff) > 0) {
571       GUID_RTPS guid;
572       IPAddress ipAddress = StringToIPAddress(sbuff);
573       guid.hid = ipAddress;
574       guid.aid = AID_UNKNOWN;
575       guid.oid = OID_APP;
576       if (!objectEntryFind(d, &guid)) {
577         CSTRemoteReader *cstRemoteReader;
578         appParams = (AppParams *)MALLOC(sizeof(AppParams));
579         AppParamsInit(appParams);
580         appParams->hostId = guid.hid;
581         appParams->appId = guid.aid;
582         appParams->metatrafficUnicastPort = d->appParams->metatrafficUnicastPort;
583         objectEntryOID = objectEntryAdd(d, &guid, (void *)appParams);
584         if (d->domainProp.multicast.enabled && IN_MULTICAST(ipAddress)) {
585           appParams->metatrafficMulticastIPAddressList[0] = ipAddress;
586           appParams->metatrafficMulticastIPAddressCount = 1;
587           objectEntryOID->multicastPort = port;
588         } else {
589           appParams->unicastIPAddressList[0] = ipAddress;
590           appParams->unicastIPAddressCount = 1;
591           objectEntryOID->multicastPort = 0;
592         }
593         appParams->userdataUnicastPort = 0;  //Manager support only metatraffic
594         cstRemoteReader = CSTWriterAddRemoteReader(d,
595                                                    &d->writerApplicationSelf,
596                                                    objectEntryOID,
597                                                    OID_READ_MGR,
598                                                    objectEntryOID);
599         debug(29, 2) ("ORTEDomainCreate: add fellow manager (%s)\n",
600                       IPAddressToString(ipAddress, sIPAddress));
601       }
602     }
603   } else {
604     //  add to WAS remote writer(s)
605     if (d->domainProp.appLocalManager) {
606       GUID_RTPS guid;
607       guid.hid = d->domainProp.appLocalManager;
608       guid.aid = AID_UNKNOWN;
609       guid.oid = OID_APP;
610       if (!objectEntryFind(d, &guid)) {
611         appParams = (AppParams *)MALLOC(sizeof(AppParams));
612         AppParamsInit(appParams);
613         appParams->hostId = guid.hid;
614         appParams->appId = guid.aid;
615         appParams->metatrafficUnicastPort = port;
616         appParams->userdataUnicastPort = 0;  //Manager support only metatraffic
617         appParams->unicastIPAddressList[0] = d->domainProp.appLocalManager;
618         appParams->unicastIPAddressCount = 1;
619         objectEntryOID = objectEntryAdd(d, &guid, (void *)appParams);
620         CSTWriterAddRemoteReader(d,
621                                  &d->writerApplicationSelf,
622                                  objectEntryOID,
623                                  OID_READ_MGR,
624                                  objectEntryOID);
625         debug(30, 2) ("ORTEDomainCreate: add manager (%s)\n",
626                       IPAddressToString(d->domainProp.appLocalManager, sIPAddress));
627       }
628     }
629   }
630
631   //  readerManagers
632   cstReaderParams.delayResponceTimeMin = d->domainProp.baseProp.delayResponceTimeACKMin;
633   cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
634   cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
635   if (manager) {
636     cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
637     cstReaderParams.repeatActiveQueryTime = iNtpTime;  //RM cann't repeatly send ACKf
638   } else {
639     cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;
640     cstReaderParams.fullAcknowledge = ORTE_TRUE;
641   }
642   CSTReaderInit(d, &d->readerManagers, d->objectEntryOID,
643                 OID_READ_MGR, &cstReaderParams, NULL);
644
645   //  readerApplications
646   cstReaderParams.delayResponceTimeMin = d->domainProp.baseProp.delayResponceTimeACKMin;
647   cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
648   cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
649   cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;
650   cstReaderParams.fullAcknowledge = ORTE_TRUE;
651   CSTReaderInit(d, &d->readerApplications, d->objectEntryOID,
652                 OID_READ_APP, &cstReaderParams, NULL);
653
654   if (manager) {
655     //  writerApplications
656     cstWriterParams.registrationRetries = 0;
657     NTPTIME_ZERO(cstWriterParams.registrationPeriod);
658     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
659     cstWriterParams.refreshPeriod = iNtpTime;  //only WAS,WM can refresh csChange(s)
660     cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
661     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime, 0, 20);
662     cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
663     cstWriterParams.fullAcknowledge = ORTE_FALSE;
664     CSTWriterInit(d, &d->writerApplications, d->objectEntryOID,
665                   OID_WRITE_APP, &cstWriterParams, NULL);
666
667     //  writerManagers
668     cstWriterParams.registrationRetries = 0;
669     NTPTIME_ZERO(cstWriterParams.registrationPeriod);
670     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
671     cstWriterParams.refreshPeriod = d->domainProp.baseProp.refreshPeriod;
672     cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
673     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime, 0, 20);
674     cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
675     cstWriterParams.fullAcknowledge = ORTE_TRUE;
676     CSTWriterInit(d, &d->writerManagers, d->objectEntryOID,
677                   OID_WRITE_MGR, &cstWriterParams, NULL);
678   }
679
680   if (!manager) {
681     //  writerPublications
682     cstWriterParams.registrationRetries = 0;
683     NTPTIME_ZERO(cstWriterParams.registrationPeriod);
684     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
685     cstWriterParams.refreshPeriod = d->domainProp.baseProp.refreshPeriod;
686     cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
687     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime, 0, 20);
688     cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
689     cstWriterParams.fullAcknowledge = ORTE_TRUE;
690     CSTWriterInit(d, &d->writerPublications, d->objectEntryOID,
691                   OID_WRITE_PUBL, &cstWriterParams, NULL);
692     //  writerSubscriptions
693     cstWriterParams.registrationRetries = 0;
694     NTPTIME_ZERO(cstWriterParams.registrationPeriod);
695     NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
696     cstWriterParams.refreshPeriod = d->domainProp.baseProp.refreshPeriod;
697     cstWriterParams.repeatAnnounceTime = d->domainProp.baseProp.repeatAnnounceTime;
698     NtpTimeAssembFromMs(cstWriterParams.delayResponceTime, 0, 20);
699     cstWriterParams.HBMaxRetries = d->domainProp.baseProp.HBMaxRetries;
700     cstWriterParams.fullAcknowledge = ORTE_TRUE;
701     CSTWriterInit(d, &d->writerSubscriptions, d->objectEntryOID,
702                   OID_WRITE_SUBS, &cstWriterParams, NULL);
703     //  readerPublications
704     cstReaderParams.delayResponceTimeMin = d->domainProp.baseProp.delayResponceTimeACKMin;
705     cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
706     cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
707     cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;
708     cstReaderParams.fullAcknowledge = ORTE_TRUE;
709     CSTReaderInit(d, &d->readerPublications, d->objectEntryOID,
710                   OID_READ_PUBL, &cstReaderParams, NULL);
711     //  readerSubscriptions
712     cstReaderParams.delayResponceTimeMin = d->domainProp.baseProp.delayResponceTimeACKMin;
713     cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
714     cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
715     cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;
716     cstReaderParams.fullAcknowledge = ORTE_TRUE;
717     CSTReaderInit(d, &d->readerSubscriptions, d->objectEntryOID,
718                   OID_READ_SUBS, &cstReaderParams, NULL);
719   }
720
721   //add csChange for WAS
722   appSelfParamChanged(d, ORTE_FALSE, ORTE_FALSE, ORTE_FALSE, ORTE_TRUE);
723
724   debug(30, 10) ("ORTEDomainCreate: finished\n");
725   return d;
726
727 //err:
728   if (!errno_save)
729     errno_save = errno;
730   /* TODO */
731   FREE(appParams);
732 err_sock:
733   if (!errno_save)
734     errno_save = errno;
735   sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
736   sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
737   sock_cleanup(&d->taskRecvUnicastUserdata.sock);
738   sock_cleanup(&d->taskRecvMulticastUserdata.sock);
739   sock_cleanup(&d->taskSend.sock);
740   pthread_rwlock_destroy(&d->typeEntry.lock);
741   if (d->domainProp.multicast.enabled) {
742     CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
743     CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
744   }
745   if (!manager) {
746     CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
747   }
748   CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
749 err_domainProp:
750   if (!errno_save)
751     errno_save = errno;
752   pthread_rwlock_destroy(&d->patternEntry.lock);
753   pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
754   pthread_rwlock_destroy(&d->psEntry.publicationsLock);
755   pthread_rwlock_destroy(&d->subscriptions.lock);
756   pthread_rwlock_destroy(&d->publications.lock);
757   pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
758   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
759   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
760   FREE(d);
761   errno = errno_save;
762   return NULL;
763 }
764
765 /*****************************************************************************/
766 Boolean
767 ORTEDomainDestroy(ORTEDomain *d, Boolean manager)
768 {
769   CSTWriter             *cstWriter;
770   CSTReader             *cstReader;
771
772   debug(30, 10) ("ORTEDomainDestroy: start\n");
773   if (!d)
774     return ORTE_FALSE;
775
776   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
777   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
778   appSelfParamChanged(d, ORTE_TRUE, ORTE_TRUE, ORTE_FALSE, ORTE_FALSE);
779   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
780   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
781
782   //Stoping threads
783   ORTEDomainRecvThreadStop(&d->taskRecvUnicastMetatraffic);
784   ORTEDomainRecvThreadStop(&d->taskRecvMulticastMetatraffic);
785   ORTEDomainRecvThreadStop(&d->taskRecvUnicastUserdata);
786   ORTEDomainRecvThreadStop(&d->taskRecvMulticastUserdata);
787   ORTEDomainSendThreadStop(&d->taskSend);
788   debug(30, 3) ("ORTEDomainDestroy: threads stoped\n");
789
790   //CSTReaders and CSTWriters
791   CSTWriterDelete(d, &d->writerApplicationSelf);
792   CSTReaderDelete(d, &d->readerManagers);
793   CSTReaderDelete(d, &d->readerApplications);
794   if (manager) {
795     CSTWriterDelete(d, &d->writerManagers);
796     CSTWriterDelete(d, &d->writerApplications);
797   } else {
798     CSTWriterDelete(d, &d->writerPublications);
799     CSTWriterDelete(d, &d->writerSubscriptions);
800     CSTReaderDelete(d, &d->readerPublications);
801     CSTReaderDelete(d, &d->readerSubscriptions);
802
803     while ((cstWriter = CSTWriter_cut_first(&d->publications))) {
804       CSTWriterDelete(d, cstWriter);
805       FREE(cstWriter);
806     }
807     while ((cstReader = CSTReader_cut_first(&d->subscriptions))) {
808       CSTReaderDelete(d, cstReader);
809       FREE(cstReader);
810     }
811   }
812
813   //objects in objectsEntry
814   objectEntryDeleteAll(d, &d->objectEntry);
815   debug(30, 3) ("ORTEDomainDestroy: deleted all objects\n");
816
817   //Sockets
818   sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
819   sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
820   sock_cleanup(&d->taskRecvUnicastUserdata.sock);
821   sock_cleanup(&d->taskRecvMulticastUserdata.sock);
822   sock_cleanup(&d->taskSend.sock);
823
824
825   //Signals
826   pthread_cond_destroy(&d->objectEntry.htimSendCond);
827   pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
828
829   //rwLocks
830   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
831   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
832   pthread_rwlock_destroy(&d->publications.lock);
833   pthread_rwlock_destroy(&d->subscriptions.lock);
834   pthread_rwlock_destroy(&d->psEntry.publicationsLock);
835   pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
836
837   //TypeRegister
838   ORTETypeRegisterDestroyAll(d);
839
840   //Pattern
841   ORTEDomainAppSubscriptionPatternDestroy(d);
842   pthread_rwlock_destroy(&d->patternEntry.lock);
843
844   //Release buffers
845   CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
846   CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
847   CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
848   CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
849   CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
850
851   //Free domain instance
852   FREE(d);
853
854   debug(30, 10) ("ORTEDomainDestroy: finished\n");
855
856   return ORTE_TRUE;
857 }