2 * $Id: ORTEDomainMgr.c,v 0.0.0.1 2003/09/12
4 * DEBUG: section 29 Domain manager
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /*****************************************************************************/
26 ORTEDomainMgrCreate(int domain, ORTEDomainProp *prop,
27 ORTEDomainAppEvents *events,Boolean suspended) {
29 ObjectEntryOID *objectEntryOID;
31 CSTWriterParams cstWriterParams;
32 CSTReaderParams cstReaderParams;
33 char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
34 char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
39 debug(29,10) ("ORTEDomainMgrCreate: start\n");
40 //Create domainApplication
41 d=MALLOC(sizeof(ORTEDomain));
42 if (!d) return NULL; //no memory
43 //initialization local values
45 d->taskRecvMetatraffic.terminate=ORTE_TRUE;
46 d->taskSend.terminate=ORTE_TRUE;
47 d->taskRecvMetatraffic.sock.port=0;
48 d->taskSend.sock.port=0;
49 //init structure objectEntry
50 ObjectEntryHID_init_root_field(&d->objectEntry);
51 pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
52 htimerRoot_init_queue(&d->objectEntry);
53 pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
54 pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
55 pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
56 d->objectEntry.htimSendCondValue=0;
60 memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
62 ORTEDomainPropDefaultGet(&d->domainProp);
65 //print local IP addresses
67 if (d->domainProp.IFCount) {
68 for(i=0;i<d->domainProp.IFCount;i++)
69 strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress,sIPAddress));
70 debug(29,2) ("ORTEDomainMgrCreate: localIPAddres(es) %s\n",iflocal);
72 debug(29,2) ("ORTEDomainMgrCreate: no active interface card\n");
77 memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
79 memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
83 d->mbRecvMetatraffic.cdrStream.buffer=
84 (uint8_t*)MALLOC(d->domainProp.recvBuffSize);
85 d->mbRecvUserdata.cdrStream.buffer=NULL;
86 d->mbSend.cdrStream.buffer=
87 (uint8_t*)MALLOC(d->domainProp.sendBuffSize);
88 if ((!d->mbRecvMetatraffic.cdrStream.buffer) ||
89 (!d->mbSend.cdrStream.buffer)) { //no memory
90 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
91 FREE(d->mbSend.cdrStream.buffer);
95 d->mbRecvMetatraffic.cdrStream.bufferPtr=d->mbRecvMetatraffic.cdrStream.buffer;
96 d->mbRecvMetatraffic.cdrStream.length=0;
97 d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
98 d->mbRecvUserdata.cdrStream.length=0;
99 d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer;
100 d->mbSend.cdrStream.length=0;
103 sock_init_udp(&d->taskRecvMetatraffic.sock);
104 sock_init_udp(&d->taskSend.sock);
105 if (d->domainProp.multicast.enabled) {
106 Domain2PortMulticastMetatraffic(d->domain,port);
108 Domain2Port(d->domain,port);
110 sock_bind(&d->taskRecvMetatraffic.sock,port); //receiving port
111 debug(29,2) ("ORTEDomainMgrCreate: bind on port(Recv): %u\n",
112 d->taskRecvMetatraffic.sock.port);
113 sock_bind(&d->taskSend.sock,0); //give me sending port
114 debug(29,2) ("ORTEDomainAppCreate: bind on port(Send): %u\n",
115 d->taskSend.sock.port);
116 if (d->domainProp.multicast.enabled) {
119 if(sock_setsockopt(&d->taskSend.sock,IP_MULTICAST_TTL,
120 &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
121 debug(29,2) ("ORTEDomainAppCreate: ttl set on: %u\n",
122 d->domainProp.multicast.ttl);
124 // join multicast group
125 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
126 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
127 if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
128 (void *) &mreq, sizeof(mreq))>=0) {
129 debug(29,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
130 IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
133 if ((d->taskRecvMetatraffic.sock.fd<0) || (d->taskSend.sock.fd<0) ||
134 (port!=d->taskRecvMetatraffic.sock.port)) {
135 printf("Error creating socket(s).\n");
136 sock_cleanup(&d->taskRecvMetatraffic.sock);
137 sock_cleanup(&d->taskSend.sock);
138 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
139 FREE(d->mbSend.cdrStream.buffer);
144 //Generates local GUID
145 if (d->domainProp.IFCount>0)
146 d->guid.hid=d->domainProp.IFProp[0].ipAddress;
148 d->guid.hid=StringToIPAddress("127.0.0.1");
149 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGER;
151 debug(29,2) ("ORTEDomainMgrCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
152 d->guid.hid,d->guid.aid,d->guid.oid);
154 //create HEADER of message for sending task
155 RTPSHeaderCreate(d->mbSend.cdrStream.buffer,d->guid.hid,d->guid.aid);
156 d->mbSend.cdrStream.bufferPtr=
157 d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
158 d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;
159 d->mbSend.needSend=ORTE_FALSE;
160 d->mbSend.containsInfoReply=ORTE_FALSE;
162 //Self object data & fellow managers object data
163 appParams=(AppParams*)MALLOC(sizeof(AppParams));
164 AppParamsInit(appParams);
165 appParams->expirationTime=d->domainProp.baseProp.expirationTime;
166 VENDOR_ID_OCERA(appParams->vendorId);
167 appParams->hostId=d->guid.hid;
168 appParams->appId=d->guid.aid;
169 appParams->metatrafficUnicastPort=d->taskRecvMetatraffic.sock.port;
170 appParams->userdataUnicastPort=0; //Manager support only metatraffic
171 if (d->domainProp.multicast.enabled) {
173 for(i=0;i<d->domainProp.IFCount;i++)
174 appParams->metatrafficMulticastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
175 appParams->metatrafficMulticastIPAddressCount=d->domainProp.IFCount;
178 if (d->domainProp.IFCount) {
179 for(i=0;i<d->domainProp.IFCount;i++)
180 appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
181 appParams->unicastIPAddressCount=d->domainProp.IFCount;
183 appParams->unicastIPAddressList[0]=StringToIPAddress("127.0.0.1");
184 appParams->unicastIPAddressCount=1;
188 if (!d->domainProp.keys) {
189 appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
190 for(i=0;i<d->domainProp.IFCount;i++)
191 appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
192 appParams->managerKeyCount=d->domainProp.IFCount+1;
194 appParams->managerKeyCount=i=0;
195 while (getStringPart(d->domainProp.keys,':',&i,sbuff)) {
198 appParams->managerKeyList[appParams->managerKeyCount++]=
199 StringToIPAddress(sbuff);
203 d->appParams=appParams;
204 //insert object, doesn't need to be locked
205 d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
206 d->objectEntryOID->privateCreated=ORTE_TRUE;
209 // writerApplicationSelf (WAS)
210 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
211 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
212 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
213 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
214 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
215 cstWriterParams.fullAcknowledge=ORTE_FALSE;
216 CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
217 OID_WRITE_APPSELF,&cstWriterParams,NULL);
218 // add to WAS remote writer(s)
220 while (getStringPart(d->domainProp.mgrs,':',&i,sbuff)>0) {
222 IPAddress ipAddress=StringToIPAddress(sbuff);
224 guid.aid=AID_UNKNOWN;
226 if (!objectEntryFind(d,&guid)) {
227 appParams=(AppParams*)MALLOC(sizeof(AppParams));
228 AppParamsInit(appParams);
229 appParams->hostId=guid.hid;
230 appParams->appId=guid.aid;
231 appParams->metatrafficUnicastPort=d->appParams->metatrafficUnicastPort;
232 appParams->userdataUnicastPort=0; //Manager support only metatraffic
233 appParams->unicastIPAddressList[0]=ipAddress;
234 appParams->unicastIPAddressCount=1;
235 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
236 CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
238 debug(29,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
239 IPAddressToString(ipAddress,sIPAddress));
243 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
244 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
245 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
246 cstReaderParams.repeatActiveQueryTime=iNtpTime; //RM cann't repeatly send ACKf
247 cstReaderParams.fullAcknowledge=ORTE_FALSE; //never will send ACK
248 CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
249 OID_READ_MGR,&cstReaderParams,NULL);
250 // readerApplications
251 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
252 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
253 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
254 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
255 cstReaderParams.fullAcknowledge=ORTE_TRUE;
256 CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
257 OID_READ_APP,&cstReaderParams,NULL);
258 // writerApplications
259 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
260 cstWriterParams.refreshPeriod=iNtpTime; //only WAS,WM can refresh csChange(s)
261 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
262 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
263 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
264 cstWriterParams.fullAcknowledge=ORTE_FALSE;
265 CSTWriterInit(d,&d->writerApplications,d->objectEntryOID,
266 OID_WRITE_APP,&cstWriterParams,NULL);
268 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
269 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
270 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
271 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
272 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
273 cstWriterParams.fullAcknowledge=ORTE_TRUE;
274 CSTWriterInit(d,&d->writerManagers,d->objectEntryOID,
275 OID_WRITE_MGR,&cstWriterParams,NULL);
277 //add csChange for WAS
278 appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE,ORTE_TRUE);
282 ORTEDomainStart(d,ORTE_TRUE,ORTE_FALSE,ORTE_TRUE);
285 debug(29,10) ("ORTEDomainMgrCreate: finished\n");
289 /*****************************************************************************/
291 ORTEDomainMgrDestroy(ORTEDomain *d) {
293 debug(29,10) ("ORTEDomainMgrDestroy: start\n");
294 pthread_rwlock_wrlock(&d->objectEntry.objRootLock);
295 pthread_rwlock_wrlock(&d->objectEntry.htimRootLock);
296 appSelfParamChanged(d,ORTE_TRUE,ORTE_TRUE,ORTE_FALSE,ORTE_FALSE);
297 pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
298 pthread_rwlock_unlock(&d->objectEntry.objRootLock);
300 if(!d->taskRecvMetatraffic.terminate) {
301 d->taskRecvMetatraffic.terminate=ORTE_TRUE;
302 ORTEDomainWakeUpReceivingThread(d,
303 &d->taskSend.sock,d->taskRecvMetatraffic.sock.port);
304 pthread_join(d->taskRecvMetatraffic.thread,NULL);
306 if (!d->taskSend.terminate) {
307 d->taskSend.terminate=ORTE_TRUE;
308 ORTEDomainWakeUpSendingThread(&d->objectEntry);
309 pthread_join(d->taskSend.thread,NULL);
311 debug(29,8) ("ORTEDomainMgrDestroy: threads stoped and destroyed\n");
313 objectEntryDump(&d->objectEntry);
316 sock_cleanup(&d->taskRecvMetatraffic.sock);
317 sock_cleanup(&d->taskSend.sock);
320 pthread_cond_destroy(&d->objectEntry.htimSendCond);
321 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
324 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
325 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
327 //CSTReaders and CSTWriters
328 CSTReaderDelete(d,&d->readerManagers);
329 CSTReaderDelete(d,&d->readerApplications);
330 CSTWriterDelete(d,&d->writerManagers);
331 CSTWriterDelete(d,&d->writerApplications);
332 CSTWriterDelete(d,&d->writerApplicationSelf);
334 //objects in objectsEntry
335 objectEntryDeleteAll(d,&d->objectEntry);
337 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
338 FREE(d->mbSend.cdrStream.buffer);
340 debug(29,10) ("ORTEDomainMgrDestroy: finished\n");