]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEDomainMgr.c
26f6a0e3e7ddf39fa8114d61ea442bf1e1459f55
[orte.git] / orte / liborte / ORTEDomainMgr.c
1 /*
2  *  $Id: ORTEDomainMgr.c,v 0.0.0.1      2003/09/12
3  *
4  *  DEBUG:  section 29                  Domain manager
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 ORTEDomain *
26 ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
27                     ORTEDomainAppEvents *events,Boolean suspended) {
28   ORTEDomain        *d;
29   ObjectEntryOID    *objectEntryOID;
30   AppParams         *appParams;
31   CSTWriterParams   cstWriterParams;
32   CSTReaderParams   cstReaderParams;
33   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
34   char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
35   char              sbuff[128];
36   int               i;
37   uint16_t          port=0;
38
39   debug(29,10) ("ORTEDomainMgrCreate: start\n");
40   //Create domainApplication
41   d=MALLOC(sizeof(ORTEDomain));
42   if (!d) return NULL;  //no memory
43   //initialization local values
44   d->domain=domain;
45   d->taskRecvMetatraffic.terminate=ORTE_TRUE;
46   d->taskSend.terminate=ORTE_TRUE;
47   d->taskRecvMetatraffic.sock.port=0;
48   d->taskSend.sock.port=0;
49   //init structure objectEntry
50   ObjectEntryHID_init_root_field(&d->objectEntry);
51   pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
52   htimerRoot_init_queue(&d->objectEntry);
53   pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
54   pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
55   pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
56   d->objectEntry.htimSendCondValue=0;
57   
58   //create domainProp 
59   if (prop!=NULL) {
60     memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
61   } else {
62     ORTEDomainPropDefaultGet(&d->domainProp);
63   }
64   
65   //print local IP addresses
66   iflocal[0]=0;
67   if (d->domainProp.IFCount) {
68     for(i=0;i<d->domainProp.IFCount;i++) 
69       strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
70     debug(29,2) ("ORTEDomainMgrCreate: localIPAddres(es) %s\n",iflocal);
71   } else{
72     debug(29,2) ("ORTEDomainMgrCreate: no active interface card\n");
73   }
74
75   //DomainEvents
76   if (events!=NULL) {
77     memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
78   } else {
79     memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
80   }
81
82   //local buffers
83   d->mbRecvMetatraffic.cdrStream.buffer=
84       (uint8_t*)MALLOC(d->domainProp.recvBuffSize);
85   d->mbRecvUserdata.cdrStream.buffer=NULL;
86   d->mbSend.cdrStream.buffer=
87       (uint8_t*)MALLOC(d->domainProp.sendBuffSize);
88   if ((!d->mbRecvMetatraffic.cdrStream.buffer) || 
89       (!d->mbSend.cdrStream.buffer)) {    //no memory
90     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
91     FREE(d->mbSend.cdrStream.buffer);
92     FREE(d);
93     return NULL;
94   }
95   d->mbRecvMetatraffic.cdrStream.bufferPtr=d->mbRecvMetatraffic.cdrStream.buffer;
96   d->mbRecvMetatraffic.cdrStream.length=0;
97   d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
98   d->mbRecvUserdata.cdrStream.length=0;
99   d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer;
100   d->mbSend.cdrStream.length=0;
101
102   //Sockets
103   sock_init_udp(&d->taskRecvMetatraffic.sock);
104   sock_init_udp(&d->taskSend.sock);
105   if (d->domainProp.multicast.enabled) {
106     Domain2PortMulticastMetatraffic(d->domain,port);   
107   } else {
108     Domain2Port(d->domain,port);
109   }
110   sock_bind(&d->taskRecvMetatraffic.sock,port);    //receiving port
111   debug(29,2) ("ORTEDomainMgrCreate: bind on port(Recv): %u\n",
112                d->taskRecvMetatraffic.sock.port);
113   sock_bind(&d->taskSend.sock,0);       //give me sending port
114   debug(29,2) ("ORTEDomainAppCreate: bind on port(Send): %u\n",
115                d->taskSend.sock.port);
116   if (d->domainProp.multicast.enabled) {
117     struct ip_mreq mreq;
118     //ttl
119     if(sock_setsockopt(&d->taskSend.sock,IP_MULTICAST_TTL, 
120         &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
121       debug(29,2) ("ORTEDomainAppCreate: ttl set on: %u\n",
122            d->domainProp.multicast.ttl);
123     } 
124     // join multicast group
125     mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
126     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
127     if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
128         (void *) &mreq, sizeof(mreq))>=0) {
129       debug(29,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
130            IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
131     }
132   }
133   if ((d->taskRecvMetatraffic.sock.fd<0) || (d->taskSend.sock.fd<0) ||
134       (port!=d->taskRecvMetatraffic.sock.port))  {
135     printf("Error creating socket(s).\n");
136     sock_cleanup(&d->taskRecvMetatraffic.sock);
137     sock_cleanup(&d->taskSend.sock);
138     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
139     FREE(d->mbSend.cdrStream.buffer);
140     FREE(d);
141     return NULL;
142   }
143
144   //Generates local GUID
145   if (d->domainProp.IFCount>0) 
146     d->guid.hid=d->domainProp.IFProp[0].ipAddress;
147   else
148     d->guid.hid=StringToIPAddress("127.0.0.1");
149   d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER; 
150   d->guid.oid=OID_APP;
151   debug(29,2) ("ORTEDomainMgrCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
152                d->guid.hid,d->guid.aid,d->guid.oid); 
153
154   //create HEADER of message for sending task
155   RTPSHeaderCreate(d->mbSend.cdrStream.buffer,d->guid.hid,d->guid.aid);
156   d->mbSend.cdrStream.bufferPtr=
157       d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
158   d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;    
159   d->mbSend.needSend=ORTE_FALSE;
160   d->mbSend.containsInfoReply=ORTE_FALSE;  
161
162   //Self object data & fellow managers object data
163   appParams=(AppParams*)MALLOC(sizeof(AppParams));
164   AppParamsInit(appParams);
165   appParams->expirationTime=d->domainProp.baseProp.expirationTime;
166   VENDOR_ID_OCERA(appParams->vendorId);
167   appParams->hostId=d->guid.hid;
168   appParams->appId=d->guid.aid;
169   appParams->metatrafficUnicastPort=d->taskRecvMetatraffic.sock.port;
170   appParams->userdataUnicastPort=0;  //Manager support only metatraffic
171   if (d->domainProp.multicast.enabled) {
172     //multicast
173     for(i=0;i<d->domainProp.IFCount;i++)
174       appParams->metatrafficMulticastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
175     appParams->metatrafficMulticastIPAddressCount=d->domainProp.IFCount;
176   } else {
177     //unicast
178     if (d->domainProp.IFCount) {
179       for(i=0;i<d->domainProp.IFCount;i++)
180         appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
181       appParams->unicastIPAddressCount=d->domainProp.IFCount;
182     } else {
183       appParams->unicastIPAddressList[0]=StringToIPAddress("127.0.0.1");
184       appParams->unicastIPAddressCount=1;
185     }
186   }
187   //managerKeyList
188   if (!d->domainProp.keys) {
189     appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
190     for(i=0;i<d->domainProp.IFCount;i++)
191       appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
192     appParams->managerKeyCount=d->domainProp.IFCount+1;
193   } else {
194     appParams->managerKeyCount=i=0;
195     while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
196       printf("a");
197       ORTESleepMs(100);
198       appParams->managerKeyList[appParams->managerKeyCount++]=
199           StringToIPAddress(sbuff);
200     }
201     
202   }
203   d->appParams=appParams;
204   //insert object, doesn't need to be locked
205   d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
206   d->objectEntryOID->privateCreated=ORTE_TRUE;
207
208   //CST objects
209   //  writerApplicationSelf (WAS)
210   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
211   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
212   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
213   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
214   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
215   cstWriterParams.fullAcknowledge=ORTE_FALSE;
216   CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
217       OID_WRITE_APPSELF,&cstWriterParams,NULL);
218   //  add to WAS remote writer(s)
219   i=0;
220   while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
221     GUID_RTPS guid;
222     IPAddress ipAddress=StringToIPAddress(sbuff);
223     guid.hid=ipAddress;
224     guid.aid=AID_UNKNOWN;
225     guid.oid=OID_APP;
226     if (!objectEntryFind(d,&guid)) {
227       appParams=(AppParams*)MALLOC(sizeof(AppParams));
228       AppParamsInit(appParams);
229       appParams->hostId=guid.hid;
230       appParams->appId=guid.aid;
231       appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
232       appParams->userdataUnicastPort=0;  //Manager support only metatraffic
233       appParams->unicastIPAddressList[0]=ipAddress;
234       appParams->unicastIPAddressCount=1;
235       objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
236       CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
237           OID_READ_MGR);
238       debug(29,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
239                   IPAddressToString(ipAddress,sIPAddress));
240     }
241   }
242   //  readerManagers
243   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
244   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
245   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
246   cstReaderParams.repeatActiveQueryTime=iNtpTime;  //RM cann't repeatly send ACKf
247   cstReaderParams.fullAcknowledge=ORTE_FALSE;      //never will send ACK
248   CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
249       OID_READ_MGR,&cstReaderParams,NULL);
250   //  readerApplications
251   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
252   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
253   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
254   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
255   cstReaderParams.fullAcknowledge=ORTE_TRUE;
256   CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
257       OID_READ_APP,&cstReaderParams,NULL);
258   //  writerApplications
259   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
260   cstWriterParams.refreshPeriod=iNtpTime;  //only WAS,WM can refresh csChange(s)
261   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
262   NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
263   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
264   cstWriterParams.fullAcknowledge=ORTE_FALSE;
265   CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
266       OID_WRITE_APP,&cstWriterParams,NULL);
267   //  writerManagers
268   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
269   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
270   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
271   NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
272   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
273   cstWriterParams.fullAcknowledge=ORTE_TRUE;
274   CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
275       OID_WRITE_MGR,&cstWriterParams,NULL);
276   
277   //add csChange for WAS
278   appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
279   
280   //Start threads
281   if (!suspended) {
282     ORTEDomainStart(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE);
283   }
284
285   debug(29,10) ("ORTEDomainMgrCreate: finished\n");
286   return d;
287 }
288
289 /*****************************************************************************/
290 Boolean
291 ORTEDomainMgrDestroy(ORTEDomain *d) {
292
293   debug(29,10) ("ORTEDomainMgrDestroy: start\n");
294   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
295   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
296   appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);    
297   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
298   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
299   //Stoping threads
300   if(!d->taskRecvMetatraffic.terminate) {
301     d->taskRecvMetatraffic.terminate=ORTE_TRUE;
302     ORTEDomainWakeUpReceivingThread(d,
303         &d->taskSend.sock,d->taskRecvMetatraffic.sock.port); 
304     pthread_join(d->taskRecvMetatraffic.thread,NULL); 
305   }
306   if (!d->taskSend.terminate) {
307     d->taskSend.terminate=ORTE_TRUE;
308     ORTEDomainWakeUpSendingThread(&d->objectEntry); 
309     pthread_join(d->taskSend.thread,NULL); 
310   }
311   debug(29,8) ("ORTEDomainMgrDestroy: threads stoped and destroyed\n");
312
313   objectEntryDump(&d->objectEntry);  
314     
315   //Sockets
316   sock_cleanup(&d->taskRecvMetatraffic.sock);
317   sock_cleanup(&d->taskSend.sock);
318
319   //Signals
320   pthread_cond_destroy(&d->objectEntry.htimSendCond);
321   pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
322   
323   //rwLocks
324   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
325   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
326
327   //CSTReaders and CSTWriters
328   CSTReaderDelete(d,&d->readerManagers);
329   CSTReaderDelete(d,&d->readerApplications);
330   CSTWriterDelete(d,&d->writerManagers);
331   CSTWriterDelete(d,&d->writerApplications);
332   CSTWriterDelete(d,&d->writerApplicationSelf);
333   
334   //objects in objectsEntry
335   objectEntryDeleteAll(d,&d->objectEntry);
336   
337   FREE(d->mbRecvMetatraffic.cdrStream.buffer);
338   FREE(d->mbSend.cdrStream.buffer);
339   FREE(d);
340   debug(29,10) ("ORTEDomainMgrDestroy: finished\n");
341   return ORTE_TRUE;
342 }