]> rtime.felk.cvut.cz Git - orte.git/blob - orte/liborte/ORTEDomainMgr.c
4254dc83e39c7fc5967ae7a037280ac7e383d13f
[orte.git] / orte / liborte / ORTEDomainMgr.c
1 /*
2  *  $Id: ORTEDomainMgr.c,v 0.0.0.1      2003/09/12
3  *
4  *  DEBUG:  section 29                  Domain manager
5  *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
6  *
7  *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
8  *  --------------------------------------------------------------------
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *  
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *  
20  */ 
21
22 #include "orte.h"
23
24 /*****************************************************************************/
25 ORTEDomain *
26 ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
27                     ORTEDomainAppEvents *events,Boolean suspended) {
28   ORTEDomain        *d;
29   ObjectEntryOID    *objectEntryOID;
30   AppParams         *appParams;
31   CSTWriterParams   cstWriterParams;
32   CSTReaderParams   cstReaderParams;
33   char              iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
34   char              sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
35   char              sbuff[128];
36   int               i;
37   u_int16_t         port=0;
38
39   debug(29,10) ("ORTEDomainMgrCreate: start\n");
40   //Create domainApplication
41   d=MALLOC(sizeof(ORTEDomain));
42   if (!d) return NULL;  //no memory
43   //initialization local values
44   d->domain=domain;
45   d->taskRecvMetatraffic.terminate=ORTE_TRUE;
46   d->taskSend.terminate=ORTE_TRUE;
47   d->taskRecvMetatraffic.sock.port=0;
48   d->taskSend.sock.port=0;
49   //init structure objectEntry
50   ObjectEntryHID_init_root_field(&d->objectEntry);
51   pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
52   htimerRoot_init_queue(&d->objectEntry);
53   pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
54   sem_init(&d->objectEntry.htimSendSem, 0, 0);
55   
56   //create domainProp 
57   if (prop!=NULL) {
58     memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
59   } else {
60     ORTEDomainPropDefaultGet(&d->domainProp);
61   }
62   
63   //print local IP addresses
64   iflocal[0]=0;
65   if (d->domainProp.IFCount) {
66     for(i=0;i<d->domainProp.IFCount;i++) 
67       strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
68     debug(29,2) ("ORTEDomainMgrCreate: localIPAddres(es) %s\n",iflocal);
69   } else{
70     debug(29,2) ("ORTEDomainMgrCreate: no activ interface card\n");
71   }
72
73   //DomainEvents
74   if (events!=NULL) {
75     memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
76   } else {
77     memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
78   }
79
80   //local buffers
81   d->mbRecvMetatraffic.cdrStream.buffer=
82       (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
83   d->mbRecvUserdata.cdrStream.buffer=NULL;
84   d->mbSend.cdrStream.buffer=
85       (u_int8_t*)MALLOC(d->domainProp.sendBuffSize);
86   if ((!d->mbRecvMetatraffic.cdrStream.buffer) || 
87       (!d->mbSend.cdrStream.buffer)) {    //no memory
88     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
89     FREE(d->mbSend.cdrStream.buffer);
90     FREE(d);
91     return NULL;
92   }
93   d->mbRecvMetatraffic.cdrStream.bufferPtr=d->mbRecvMetatraffic.cdrStream.buffer;
94   d->mbRecvMetatraffic.cdrStream.length=0;
95   d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
96   d->mbRecvUserdata.cdrStream.length=0;
97   d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer;
98   d->mbSend.cdrStream.length=0;
99
100   //Sockets
101   sock_init_udp(&d->taskRecvMetatraffic.sock);
102   sock_init_udp(&d->taskSend.sock);
103   if (d->domainProp.multicast.enabled) {
104     Domain2PortMulticastMetatraffic(d->domain,port);   
105   } else {
106     Domain2Port(d->domain,port);
107   }
108   sock_bind(&d->taskRecvMetatraffic.sock,port);    //receiving port
109   debug(29,2) ("ORTEDomainMgrCreate: bind on port(Recv): %u\n",
110                d->taskRecvMetatraffic.sock.port);
111   sock_bind(&d->taskSend.sock,0);       //give me sending port
112   debug(29,2) ("ORTEDomainAppCreate: bind on port(Send): %u\n",
113                d->taskSend.sock.port);
114   if (d->domainProp.multicast.enabled) {
115     struct ip_mreq mreq;
116     //ttl
117     if(sock_setsockopt(&d->taskSend.sock,IP_MULTICAST_TTL, 
118         &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
119       debug(29,2) ("ORTEDomainAppCreate: ttl set on: %u\n",
120            d->domainProp.multicast.ttl);
121     } 
122     // join multicast group
123     mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
124     mreq.imr_interface.s_addr=htonl(INADDR_ANY);
125     if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
126         (void *) &mreq, sizeof(mreq))>=0) {
127       debug(29,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
128            IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
129     }
130   }
131   if ((d->taskRecvMetatraffic.sock.fd<0) || (d->taskSend.sock.fd<0) ||
132       (port!=d->taskRecvMetatraffic.sock.port))  {
133     printf("Error creating socket(s).\n");
134     sock_cleanup(&d->taskRecvMetatraffic.sock);
135     sock_cleanup(&d->taskSend.sock);
136     FREE(d->mbRecvMetatraffic.cdrStream.buffer);
137     FREE(d->mbSend.cdrStream.buffer);
138     FREE(d);
139     return NULL;
140   }
141
142   //Generates local GUID
143   if (d->domainProp.IFCount>0) 
144     d->guid.hid=d->domainProp.IFProp[0].ipAddress;
145   else
146     d->guid.hid=StringToIPAddress("127.0.0.1");
147   d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER; 
148   d->guid.oid=OID_APP;
149   debug(29,2) ("ORTEDomainMgrCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
150                d->guid.hid,d->guid.aid,d->guid.oid); 
151
152   //create HEADER of message for sending task
153   RTPSHeaderCreate(d->mbSend.cdrStream.buffer,d->guid.hid,d->guid.aid);
154   d->mbSend.cdrStream.bufferPtr=
155       d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
156   d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;    
157   d->mbSend.needSend=ORTE_FALSE;
158   d->mbSend.containsInfoReply=ORTE_FALSE;  
159
160   //Self object data & fellow managers object data
161   appParams=(AppParams*)MALLOC(sizeof(AppParams));
162   AppParamsInit(appParams);
163   appParams->expirationTime=d->domainProp.baseProp.expirationTime;
164   VENDOR_ID_OCERA(appParams->vendorId);
165   appParams->hostId=d->guid.hid;
166   appParams->appId=d->guid.aid;
167   appParams->metatrafficUnicastPort=d->taskRecvMetatraffic.sock.port;
168   appParams->userdataUnicastPort=0;  //Manager support only metatraffic
169   if (d->domainProp.multicast.enabled) {
170     //multicast
171     for(i=0;i<d->domainProp.IFCount;i++)
172       appParams->metatrafficMulticastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
173     appParams->metatrafficMulticastIPAddressCount=d->domainProp.IFCount;
174   } else {
175     //unicast
176     if (d->domainProp.IFCount) {
177       for(i=0;i<d->domainProp.IFCount;i++)
178         appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
179       appParams->unicastIPAddressCount=d->domainProp.IFCount;
180     } else {
181       appParams->unicastIPAddressList[0]=StringToIPAddress("127.0.0.1");
182       appParams->unicastIPAddressCount=1;
183     }
184   }
185   //managerKeyList
186   if (!d->domainProp.keys) {
187     appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
188     for(i=0;i<d->domainProp.IFCount;i++)
189       appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
190     appParams->managerKeyCount=d->domainProp.IFCount+1;
191   } else {
192     appParams->managerKeyCount=i=0;
193     while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
194       printf("a");
195       ORTESleepMs(100);
196       appParams->managerKeyList[appParams->managerKeyCount++]=
197           StringToIPAddress(sbuff);
198     }
199     
200   }
201   d->appParams=appParams;
202   //insert object, doesn't need to be locked
203   d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
204   d->objectEntryOID->privateCreated=ORTE_TRUE;
205
206   //CST objects
207   //  writerApplicationSelf (WAS)
208   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
209   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
210   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
211   NTPTIME_ZERO(cstWriterParams.delayResponceTime);
212   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
213   cstWriterParams.fullAcknowledge=ORTE_FALSE;
214   CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
215       OID_WRITE_APPSELF,&cstWriterParams,NULL);
216   //  add to WAS remote writer(s)
217   i=0;
218   while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
219     GUID_RTPS guid;
220     IPAddress ipAddress=StringToIPAddress(sbuff);
221     guid.hid=ipAddress;
222     guid.aid=AID_UNKNOWN;
223     guid.oid=OID_APP;
224     if (!objectEntryFind(d,&guid)) {
225       appParams=(AppParams*)MALLOC(sizeof(AppParams));
226       AppParamsInit(appParams);
227       appParams->hostId=guid.hid;
228       appParams->appId=guid.aid;
229       appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
230       appParams->userdataUnicastPort=0;  //Manager support only metatraffic
231       appParams->unicastIPAddressList[0]=ipAddress;
232       appParams->unicastIPAddressCount=1;
233       objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
234       CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
235           OID_READ_MGR);
236       debug(29,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
237                   IPAddressToString(ipAddress,sIPAddress));
238     }
239   }
240   //  readerManagers
241   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
242   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
243   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
244   cstReaderParams.repeatActiveQueryTime=iNtpTime;  //RM cann't repeatly send ACKf
245   cstReaderParams.fullAcknowledge=ORTE_FALSE;      //never will send ACK
246   CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
247       OID_READ_MGR,&cstReaderParams,NULL);
248   //  readerApplications
249   cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
250   cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
251   cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
252   cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
253   cstReaderParams.fullAcknowledge=ORTE_TRUE;
254   CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
255       OID_READ_APP,&cstReaderParams,NULL);
256   //  writerApplications
257   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
258   cstWriterParams.refreshPeriod=iNtpTime;  //only WAS,WM can refresh csChange(s)
259   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
260   NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
261   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
262   cstWriterParams.fullAcknowledge=ORTE_FALSE;
263   CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
264       OID_WRITE_APP,&cstWriterParams,NULL);
265   //  writerManagers
266   NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
267   cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod; 
268   cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
269   NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
270   cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
271   cstWriterParams.fullAcknowledge=ORTE_TRUE;
272   CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
273       OID_WRITE_MGR,&cstWriterParams,NULL);
274   
275   //add csChange for WAS
276   appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
277   
278   //Start threads
279   if (!suspended) {
280     ORTEDomainStart(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE);
281   }
282
283   debug(29,10) ("ORTEDomainMgrCreate: finished\n");
284   return d;
285 }
286
287 /*****************************************************************************/
288 Boolean
289 ORTEDomainMgrDestroy(ORTEDomain *d) {
290
291   debug(29,10) ("ORTEDomainMgrDestroy: start\n");
292   pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
293   pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
294   appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);    
295   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
296   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
297   //Stoping threads
298   if(!d->taskRecvMetatraffic.terminate) {
299     d->taskRecvMetatraffic.terminate=ORTE_TRUE;
300     ORTEDomainWakeUpReceivingThread(d,
301         &d->taskSend.sock,d->taskRecvMetatraffic.sock.port); 
302     pthread_join(d->taskRecvMetatraffic.thread,NULL); 
303   }
304   if (!d->taskSend.terminate) {
305     d->taskSend.terminate=ORTE_TRUE;
306     ORTEDomainWakeUpSendingThread(&d->objectEntry); 
307     pthread_join(d->taskSend.thread,NULL); 
308   }
309   debug(29,8) ("ORTEDomainMgrDestroy: threads stoped and destroyed\n");
310
311   objectEntryDump(&d->objectEntry);  
312     
313   //Sockets
314   sock_cleanup(&d->taskRecvMetatraffic.sock);
315   sock_cleanup(&d->taskSend.sock);
316
317   //Semas
318   sem_destroy(&d->objectEntry.htimSendSem);
319   
320   //rwLocks
321   pthread_rwlock_destroy(&d->objectEntry.objRootLock);
322   pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
323
324   //CSTReaders and CSTWriters
325   CSTReaderDelete(d,&d->readerManagers);
326   CSTReaderDelete(d,&d->readerApplications);
327   CSTWriterDelete(d,&d->writerManagers);
328   CSTWriterDelete(d,&d->writerApplications);
329   CSTWriterDelete(d,&d->writerApplicationSelf);
330   
331   //objects in objectsEntry
332   objectEntryDeleteAll(d,&d->objectEntry);
333   
334   FREE(d->mbRecvMetatraffic.cdrStream.buffer);
335   FREE(d->mbSend.cdrStream.buffer);
336   FREE(d);
337   debug(29,10) ("ORTEDomainMgrDestroy: finished\n");
338   return ORTE_TRUE;
339 }