2 * $Id: ORTEDomainApp.c,v 0.0.0.1 2003/08/21
4 * DEBUG: section 21 Domain application
5 * AUTHOR: Petr Smolik petr.smolik@wo.cz
7 * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
8 * --------------------------------------------------------------------
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
24 /*****************************************************************************/
/*
 * ORTEDomainAppCreate - allocate, initialise and start an application domain.
 *
 * NOTE(review): the listing numbers embedded in this function skip values, so
 * the return type, some local declarations (d, appParams, i, port, mreq, guid
 * are used but not declared in the visible text), several else/brace lines
 * and the final return are not visible here. The comments below describe only
 * what the visible lines do.
 *
 * Visible behaviour, in order:
 *  - allocates an ORTEDomain with MALLOC and returns NULL if that fails;
 *  - initialises task flags, object/timer roots, locks and list roots;
 *  - fills d->domainProp (memcpy from prop, and/or ORTEDomainPropDefaultGet)
 *    and d->domainEvents (memcpy from events, and/or memset to zero);
 *  - allocates the two receive buffers and the send buffer;
 *  - creates and binds three UDP sockets, optionally configuring multicast;
 *  - derives the local GUID, writes the RTPS header into the send buffer;
 *  - registers the application's own AppParams object and, when
 *    d->domainProp.appLocalManager is non-zero, a manager object;
 *  - initialises the built-in CST readers/writers;
 *  - creates the two receiving threads and the sending thread.
 *
 * Parameters:
 *  domain - domain number; NOTE(review): its assignment to d->domain is not
 *           visible in this excerpt although d->domain is read below.
 *  prop   - source for d->domainProp when the memcpy branch is taken.
 *  events - source for d->domainEvents when the memcpy branch is taken.
 */
26 ORTEDomainAppCreate(int domain, ORTEDomainProp *prop,
27 ORTEDomainAppEvents *events) {
29 ObjectEntryOID *objectEntryOID;
31 CSTWriterParams cstWriterParams;
32 CSTReaderParams cstReaderParams;
// scratch string used only for the debug print of local IP addresses
33 char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
37 debug(21,10) ("ORTEDomainAppCreate: start\n");
38 //Create domainApplication
39 d=MALLOC(sizeof(ORTEDomain));
40 if (!d) return NULL; //no memory
41 //initialization local values
// all three tasks start with terminate cleared and socket port 0
43 d->taskRecvMetatraffic.terminate=ORTE_FALSE;
44 d->taskRecvUserdata.terminate=ORTE_FALSE;
45 d->taskSend.terminate=ORTE_FALSE;
46 d->taskRecvMetatraffic.sock.port=0;
47 d->taskRecvUserdata.sock.port=0;
48 d->taskSend.sock.port=0;
49 PublParamsInit(&d->publPropDefault);
50 SubsParamsInit(&d->subsPropDefault);
51 //init structure objectEntry
52 ObjectEntryHID_init_root_field(&d->objectEntry);
53 pthread_rwlock_init(&d->objectEntry.objRootLock,NULL);
54 htimerRoot_init_queue(&d->objectEntry);
55 pthread_rwlock_init(&d->objectEntry.htimRootLock,NULL);
56 pthread_mutex_init(&d->objectEntry.htimSendMutex, NULL);
57 //publication,subscriptions
58 d->publications.counter=d->subscriptions.counter=0;
59 CSTWriter_init_root_field(&d->publications);
60 CSTReader_init_root_field(&d->subscriptions);
61 pthread_rwlock_init(&d->publications.lock,NULL);
62 pthread_rwlock_init(&d->subscriptions.lock,NULL);
63 //publication,subscriptions lists
64 PublicationList_init_root_field(&d->psEntry);
65 pthread_rwlock_init(&d->psEntry.publicationsLock,NULL);
66 SubscriptionList_init_root_field(&d->psEntry);
67 pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
// register the default pattern check/match callbacks, then init the pattern list head
70 ORTEPatternRegister(d,ORTEPatternCheckDefault,ORTEPatternMatchDefault,NULL);
71 SubscriptionPattern_init_head(&d->patternEntry);
// NOTE(review): the condition choosing between copying prop and loading
// defaults is not visible here — presumably a NULL test on prop; confirm.
75 memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
77 ORTEDomainPropDefaultGet(&d->domainProp);
80 //print local IP addresses
// NOTE(review): strcat appends to iflocal, so iflocal must already hold an
// empty string; that initialisation is not visible in this excerpt — confirm.
82 if (d->domainProp.IFCount) {
83 for(i=0;i<d->domainProp.IFCount;i++)
84 strcat(iflocal,IPAddressToString(d->domainProp.IFProp[i].ipAddress));
85 debug(21,2) ("ORTEDomainAppCreate: localIPAddres(es) %s\n",iflocal);
87 debug(21,2) ("ORTEDomainAppCreate: no activ interface card\n");
// NOTE(review): as with prop, the condition selecting memcpy vs. memset for
// the event callbacks is not visible here.
92 memcpy(&d->domainEvents,events,sizeof(ORTEDomainAppEvents));
94 memset(&d->domainEvents,0,sizeof(ORTEDomainAppEvents));
// receive buffers are sized by recvBuffSize, the send buffer by sendBuffSize
98 d->mbRecvMetatraffic.cdrStream.buffer=
99 (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
100 d->mbRecvUserdata.cdrStream.buffer=
101 (u_int8_t*)MALLOC(d->domainProp.recvBuffSize);
102 d->mbSend.cdrStream.buffer=
103 (u_int8_t*)MALLOC(d->domainProp.sendBuffSize);
// if any of the three allocations failed, release all three buffers
// NOTE(review): what follows the FREE calls (releasing d, returning) is not
// visible in this excerpt.
104 if ((!d->mbRecvMetatraffic.cdrStream.buffer) ||
105 (!d->mbRecvUserdata.cdrStream.buffer) ||
106 (!d->mbSend.cdrStream.buffer)) { //no memory
107 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
108 FREE(d->mbRecvUserdata.cdrStream.buffer);
109 FREE(d->mbSend.cdrStream.buffer);
// each stream starts empty with its cursor at the beginning of its buffer
113 d->mbRecvMetatraffic.cdrStream.bufferPtr=d->mbRecvMetatraffic.cdrStream.buffer;
114 d->mbRecvMetatraffic.cdrStream.length=0;
115 d->mbRecvUserdata.cdrStream.bufferPtr=d->mbRecvUserdata.cdrStream.buffer;
116 d->mbRecvUserdata.cdrStream.length=0;
117 d->mbSend.cdrStream.bufferPtr=d->mbSend.cdrStream.buffer;
118 d->mbSend.cdrStream.length=0;
121 ORTEType_init_root_field(&d->typeEntry);
122 pthread_rwlock_init(&d->typeEntry.lock,NULL);
// one UDP socket per task
125 sock_init_udp(&d->taskRecvMetatraffic.sock);
126 sock_init_udp(&d->taskRecvUserdata.sock);
127 sock_init_udp(&d->taskSend.sock);
// NOTE(review): presumably these derive `port` from d->domain; port is later
// used as the manager's metatraffic port — confirm against their definitions.
128 if (d->domainProp.multicast.enabled) {
129 Domain2PortMulticastMetatraffic(d->domain,port);
131 Domain2Port(d->domain,port);
// all three sockets are bound with port argument 0; the return values of
// sock_bind are not examined, only the socket descriptors are checked below
133 sock_bind(&d->taskRecvMetatraffic.sock,0); //give me receiving port (metatraffic)
134 debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvMetatraffic):%u\n",
135 d->taskRecvMetatraffic.sock.port);
136 sock_bind(&d->taskRecvUserdata.sock,0); //give me receiving port (userdata)
137 debug(21,2) ("ORTEDomainAppCreate: Bind on port(RecvUserdata):%u\n",
138 d->taskRecvUserdata.sock.port);
139 sock_bind(&d->taskSend.sock,0); //give me sending port
140 debug(21,2) ("ORTEDomainAppCreate: Bind on port(Send):%u\n",
141 d->taskSend.sock.port);
// a negative descriptor on any socket is treated as failure: clean up the
// sockets and free the message buffers
// NOTE(review): the rest of this error path is not visible in this excerpt.
142 if ((d->taskRecvMetatraffic.sock.fd<0) ||
143 (d->taskRecvUserdata.sock.fd<0) ||
144 (d->taskSend.sock.fd<0)) {
145 debug(21,0) ("Error creating socket(s).\n");
146 sock_cleanup(&d->taskRecvMetatraffic.sock);
147 sock_cleanup(&d->taskRecvUserdata.sock);
148 sock_cleanup(&d->taskSend.sock);
149 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
150 FREE(d->mbRecvUserdata.cdrStream.buffer);
151 FREE(d->mbSend.cdrStream.buffer);
// multicast setup: TTL on the sending socket, group membership on the
// userdata receiving socket; a failing setsockopt only skips the debug print
155 if (d->domainProp.multicast.enabled) {
158 if(sock_setsockopt(&d->taskSend.sock,IP_MULTICAST_TTL,
159 &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
160 debug(21,2) ("ORTEDomainAppCreate: ttl set on: %u\n",
161 d->domainProp.multicast.ttl);
163 // join multicast group
164 mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
165 mreq.imr_interface.s_addr=htonl(INADDR_ANY);
166 if(sock_setsockopt(&d->taskRecvUserdata.sock,IP_ADD_MEMBERSHIP,
167 (void *) &mreq, sizeof(mreq))>=0) {
168 debug(21,2) ("ORTEDomainAppCreate: listening to mgroup %s\n",
169 IPAddressToString(d->domainProp.multicast.ipAddress));
173 //Generates local GUID
// hid: address of the first interface, or 127.0.0.1 (the else line between
// the two assignments is not visible here)
// aid: sending port shifted left by 8 plus MANAGEDAPPLICATION
174 if (d->domainProp.IFCount>0)
175 d->guid.hid=d->domainProp.IFProp[0].ipAddress;
177 d->guid.hid=StringToIPAddress("127.0.0.1");
178 d->guid.aid=(d->taskSend.sock.port<<8)+MANAGEDAPPLICATION;
// NOTE(review): d->guid.oid is printed here but its assignment is not visible
// in this excerpt.
180 debug(29,2) ("ORTEDomainAppCreate: GUID: %#10.8x,%#10.8x,%#10.8x\n",
181 d->guid.hid,d->guid.aid,d->guid.oid);
183 //create HEADER of message for sending task
// the send stream cursor and length are positioned just past the header
184 RTPSHeaderCreate(d->mbSend.cdrStream.buffer,d->guid.hid,d->guid.aid);
185 d->mbSend.cdrStream.bufferPtr=
186 d->mbSend.cdrStream.buffer+RTPS_HEADER_LENGTH;
187 d->mbSend.cdrStream.length=RTPS_HEADER_LENGTH;
188 d->mbSend.needSend=ORTE_FALSE;
189 d->mbSend.containsInfoReply=ORTE_FALSE;
191 //Self object data & fellow managers object data
// NOTE(review): the MALLOC result is passed to AppParamsInit and dereferenced
// without a NULL check.
192 appParams=(AppParams*)MALLOC(sizeof(AppParams));
193 AppParamsInit(appParams);
194 appParams->expirationTime=d->domainProp.baseProp.expirationTime;
195 VENDOR_ID_OCERA(appParams->vendorId);
196 appParams->hostId=d->guid.hid;
197 appParams->appId=d->guid.aid;
198 appParams->metatrafficUnicastPort=d->taskRecvMetatraffic.sock.port;
199 appParams->userdataUnicastPort=d->taskRecvUserdata.sock.port;
// with multicast enabled, every local interface address is listed as a
// metatraffic multicast address
200 if (d->domainProp.multicast.enabled) {
202 for(i=0;i<d->domainProp.IFCount;i++)
203 appParams->metatrafficMulticastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
204 appParams->metatrafficMulticastIPAddressCount=d->domainProp.IFCount;
// unicast list: all interface addresses, or a single 127.0.0.1 entry
// (the else line between the two variants is not visible here)
207 if (d->domainProp.IFCount) {
208 for(i=0;i<d->domainProp.IFCount;i++)
209 appParams->unicastIPAddressList[i]=d->domainProp.IFProp[i].ipAddress;
210 appParams->unicastIPAddressCount=d->domainProp.IFCount;
212 appParams->unicastIPAddressList[0]=StringToIPAddress("127.0.0.1");
213 appParams->unicastIPAddressCount=1;
// manager keys: 127.0.0.1 in slot 0 followed by every interface address
// NOTE(review): this writes IFCount+1 entries; the capacity of managerKeyList
// is not visible here — confirm it can hold one more than the interface count.
217 appParams->managerKeyList[0]=StringToIPAddress("127.0.0.1");
218 for(i=0;i<d->domainProp.IFCount;i++)
219 appParams->managerKeyList[i+1]=d->domainProp.IFProp[i].ipAddress;
220 appParams->managerKeyCount=d->domainProp.IFCount+1;
221 d->appParams=appParams;
222 //insert object, doesn't need to be locked
223 d->objectEntryOID=objectEntryAdd(d,&d->guid,(void*)appParams);
224 d->objectEntryOID->private=ORTE_TRUE;
227 // writerApplicationSelf (WAS)
228 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
229 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
230 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
231 NTPTIME_ZERO(cstWriterParams.delayResponceTime);
232 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
233 cstWriterParams.fullAcknowledge=ORTE_TRUE;
234 CSTWriterInit(d,&d->writerApplicationSelf,d->objectEntryOID,
235 OID_WRITE_APPSELF,&cstWriterParams,NULL);
236 // add to WAS remote writer(s)
// only done when a local manager address is configured and no object with
// that GUID is found by objectEntryFind
237 if (d->domainProp.appLocalManager) {
239 guid.hid=d->domainProp.appLocalManager;
240 guid.aid=AID_UNKNOWN;
242 if (!objectEntryFind(d,&guid)) {
// NOTE(review): as above, this MALLOC result is used without a NULL check.
243 appParams=(AppParams*)MALLOC(sizeof(AppParams));
244 AppParamsInit(appParams);
245 appParams->hostId=guid.hid;
246 appParams->appId=guid.aid;
247 appParams->metatrafficUnicastPort=port;
248 appParams->userdataUnicastPort=0; //Manager support only metatraffic
249 appParams->unicastIPAddressList[0]=d->domainProp.appLocalManager;
250 appParams->unicastIPAddressCount=1;
251 objectEntryOID=objectEntryAdd(d,&guid,(void*)appParams);
252 CSTWriterAddRemoteReader(d,&d->writerApplicationSelf,objectEntryOID,
254 debug(21,2) ("ORTEDomainAppCreate: add fellow manager (%s)\n",
255 IPAddressToString(d->domainProp.appLocalManager));
// reader for OID_READ_MGR (d->readerManagers); the same five reader
// parameters are re-assigned before every CSTReaderInit call below
259 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
260 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
261 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
262 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
263 cstReaderParams.fullAcknowledge=ORTE_TRUE;
264 CSTReaderInit(d,&d->readerManagers,d->objectEntryOID,
265 OID_READ_MGR,&cstReaderParams,NULL);
266 // readerApplications
267 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
268 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
269 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
270 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
271 cstReaderParams.fullAcknowledge=ORTE_TRUE;
272 CSTReaderInit(d,&d->readerApplications,d->objectEntryOID,
273 OID_READ_APP,&cstReaderParams,NULL);
274 // writerPublications
// unlike the WAS writer, delayResponceTime is set through
// NtpTimeAssembFromMs(...,0,20) instead of being zeroed
275 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
276 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
277 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
278 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
279 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
280 cstWriterParams.fullAcknowledge=ORTE_TRUE;
281 CSTWriterInit(d,&d->writerPublications,d->objectEntryOID,
282 OID_WRITE_PUBL,&cstWriterParams,NULL);
283 // writerSubscriptions
284 NTPTIME_ZERO(cstWriterParams.waitWhileDataUnderwayTime);
285 cstWriterParams.refreshPeriod=d->domainProp.baseProp.refreshPeriod;
286 cstWriterParams.repeatAnnounceTime=d->domainProp.baseProp.repeatAnnounceTime;
287 NtpTimeAssembFromMs(cstWriterParams.delayResponceTime,0,20);
288 cstWriterParams.HBMaxRetries=d->domainProp.baseProp.HBMaxRetries;
289 cstWriterParams.fullAcknowledge=ORTE_TRUE;
290 CSTWriterInit(d,&d->writerSubscriptions,d->objectEntryOID,
291 OID_WRITE_SUBS,&cstWriterParams,NULL);
292 // readerPublications
293 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
294 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
295 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
296 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
297 cstReaderParams.fullAcknowledge=ORTE_TRUE;
298 CSTReaderInit(d,&d->readerPublications,d->objectEntryOID,
299 OID_READ_PUBL,&cstReaderParams,NULL);
300 // readerSubscriptions
301 cstReaderParams.delayResponceTimeMin=d->domainProp.baseProp.delayResponceTimeACKMin;
302 cstReaderParams.delayResponceTimeMax=d->domainProp.baseProp.delayResponceTimeACKMax;
303 cstReaderParams.ACKMaxRetries=d->domainProp.baseProp.ACKMaxRetries;
304 cstReaderParams.repeatActiveQueryTime=d->domainProp.baseProp.repeatActiveQueryTime;
305 cstReaderParams.fullAcknowledge=ORTE_TRUE;
306 CSTReaderInit(d,&d->readerSubscriptions,d->objectEntryOID,
307 OID_READ_SUBS,&cstReaderParams,NULL);
309 //add csChange for WAS
310 appSelfParamChanged(d,ORTE_FALSE,ORTE_FALSE,ORTE_FALSE);
// htimSendMutex is locked before the threads are created
// NOTE(review): no matching unlock is visible in this function — presumably
// the sending thread waits on / releases it; confirm.
// NOTE(review): pthread_create return values are not checked, and the thread
// entry points are passed through a (void*) cast.
313 pthread_mutex_lock(&d->objectEntry.htimSendMutex);
314 pthread_create(&d->taskRecvMetatraffic.thread, NULL,
315 (void*)&ORTEAppRecvMetatrafficThread, (void *)d);
316 pthread_create(&d->taskRecvUserdata.thread, NULL,
317 (void*)&ORTEAppRecvUserdataThread, (void *)d);
318 pthread_create(&d->taskSend.thread, NULL,
319 (void*)&ORTEAppSendThread, (void *)d);
321 debug(21,10) ("ORTEDomainAppCreate: finished\n");
325 /*****************************************************************************/
/*
 * ORTEDomainAppDestroy - stop the domain's threads and release its resources.
 *
 * Returns ORTE_FALSE when d is NULL.
 *
 * Visible order of operations: request termination of the three tasks, wake
 * and join each thread, clean up the sockets, destroy the mutex and rwlocks,
 * destroy registered types and subscription patterns, delete the built-in and
 * user CST readers/writers, delete all objects, free the message buffers.
 *
 * NOTE(review): the listing numbers skip values here, so the return type, the
 * closing lines of the two for-each loops and the end of the function
 * (whether d itself is freed, and the success return value) are not visible.
 */
327 ORTEDomainAppDestroy(ORTEDomain *d) {
328 CSTWriter *cstWriter;
329 CSTReader *cstReader;
331 debug(21,10) ("ORTEDomainAppDestroy: start\n");
332 if (!d) return ORTE_FALSE;
// raise the terminate flag for every task before waking any thread
334 d->taskRecvMetatraffic.terminate=ORTE_TRUE;
335 d->taskRecvUserdata.terminate=ORTE_TRUE;
336 d->taskSend.terminate=ORTE_TRUE;
// each receiving thread is woken through the send socket using that
// thread's own port, then joined; the sending thread is woken separately
337 ORTEDomainWakeUpReceivingThread(d,
338 &d->taskSend.sock,d->taskRecvMetatraffic.sock.port);
339 pthread_join(d->taskRecvMetatraffic.thread,NULL);
340 ORTEDomainWakeUpReceivingThread(d,
341 &d->taskSend.sock,d->taskRecvUserdata.sock.port);
342 pthread_join(d->taskRecvUserdata.thread,NULL);
343 ORTEDomainWakeUpSendingThread(&d->objectEntry);
344 pthread_join(d->taskSend.thread,NULL);
345 debug(21,3) ("ORTEDomainAppDestroy: threads stoped\n");
348 sock_cleanup(&d->taskRecvMetatraffic.sock);
349 sock_cleanup(&d->taskRecvUserdata.sock);
350 sock_cleanup(&d->taskSend.sock);
// NOTE(review): the mutex and rwlocks are destroyed before the CST
// reader/writer and object deletion calls below — confirm none of those
// routines acquires these locks.
353 pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
356 pthread_rwlock_destroy(&d->objectEntry.objRootLock);
357 pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
358 pthread_rwlock_destroy(&d->publications.lock);
359 pthread_rwlock_destroy(&d->subscriptions.lock);
360 pthread_rwlock_destroy(&d->psEntry.publicationsLock);
361 pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
364 ORTETypeRegisterDestroyAll(d);
365 pthread_rwlock_destroy(&d->typeEntry.lock);
368 ORTEDomainAppSubscriptionPatternDestroyAll(d);
370 //CSTReaders and CSTWriters
371 CSTWriterDelete(d,&d->writerApplicationSelf);
372 CSTReaderDelete(d,&d->readerManagers);
373 CSTReaderDelete(d,&d->readerApplications);
374 CSTWriterDelete(d,&d->writerPublications);
375 CSTWriterDelete(d,&d->writerSubscriptions);
376 CSTReaderDelete(d,&d->readerPublications);
377 CSTReaderDelete(d,&d->readerSubscriptions);
// user publications and subscriptions: delete every writer / reader found
// NOTE(review): any further statements inside these loop bodies are not
// visible in this excerpt.
378 gavl_cust_for_each(CSTWriter,&d->publications,cstWriter) {
379 CSTWriterDelete(d,cstWriter);
382 gavl_cust_for_each(CSTReader,&d->subscriptions,cstReader) {
383 CSTReaderDelete(d,cstReader);
387 //objects in objectsEntry
388 objectEntryDeleteAll(d,&d->objectEntry);
390 FREE(d->mbRecvMetatraffic.cdrStream.buffer);
391 FREE(d->mbRecvUserdata.cdrStream.buffer);
392 FREE(d->mbSend.cdrStream.buffer);
394 debug(21,10) ("ORTEDomainAppDestroy: finished\n");
398 /*****************************************************************************/
/*
 * ORTEDomainAppSubscriptionPatternAdd - add a topic/type pattern to a domain.
 *
 * Returns ORTE_FALSE when d is NULL. Otherwise allocates a
 * SubscriptionPatternNode, copies topic and type into it, stores
 * subscriptionCallBack and inserts the node into d->patternEntry.
 *
 * NOTE(review): the return type line, the end of the parameter list and the
 * statements after the insert (including the success return) are not visible
 * in this excerpt.
 */
400 ORTEDomainAppSubscriptionPatternAdd(ORTEDomain *d,const char *topic,
401 const char *type,ORTESubscriptionPatternCallBack subscriptionCallBack,
403 SubscriptionPatternNode *psnode;
405 if (!d) return ORTE_FALSE;
// NOTE(review): the MALLOC result is written to by strcpy on the next line
// without a NULL check.
406 psnode=(SubscriptionPatternNode*)MALLOC(sizeof(SubscriptionPatternNode));
// NOTE(review): strcpy is unbounded; the sizes of psnode->topic and
// psnode->type are not visible here — confirm callers cannot pass longer
// strings, and that topic/type are never NULL.
407 strcpy(psnode->topic,topic);
408 strcpy(psnode->type,type);
409 psnode->subscriptionCallBack=subscriptionCallBack;
411 SubscriptionPattern_insert(&d->patternEntry,psnode);
415 /*****************************************************************************/
/*
 * ORTEDomainAppSubscriptionPatternRemove - remove a matching pattern node.
 *
 * Returns ORTE_FALSE when d is NULL. Otherwise walks d->patternEntry and, for
 * a node whose topic and type both compare equal (strcmp) to the arguments,
 * calls SubscriptionPattern_delete on it.
 *
 * NOTE(review): the return type line, the end of the parameter list and
 * everything after the delete call (freeing the node, leaving the loop, the
 * return values) are not visible in this excerpt.
 */
417 ORTEDomainAppSubscriptionPatternRemove(ORTEDomain *d,const char *topic,
419 SubscriptionPatternNode *psnode;
421 if (!d) return ORTE_FALSE;
// NOTE(review): the node is unlinked while the list is being iterated —
// confirm the (not visible) code that follows leaves the loop or that
// ul_list_for_each tolerates deletion of the current node.
// NOTE(review): topic and type are passed to strcmp without a NULL check.
422 ul_list_for_each(SubscriptionPattern,&d->patternEntry,psnode) {
423 if ((strcmp(psnode->topic,topic)==0) &&
424 (strcmp(psnode->type,type)==0)) {
425 SubscriptionPattern_delete(&d->patternEntry,psnode);
433 /*****************************************************************************/
435 ORTEDomainAppSubscriptionPatternDestroyAll(ORTEDomain *d) {
436 SubscriptionPatternNode *psnode;
438 if (!d) return ORTE_FALSE;
439 while((psnode=SubscriptionPattern_cut_first(&d->patternEntry))) {