rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTEDomain.c
updated email address - petr@smoliku.cz
[orte.git] / orte / liborte / ORTEDomain.c
index 26206db9e080a8e71977c43415a09a305da576c2..be4e0b1473deee5837b57f2fdee0c094976e51e3 100644 (file)
@@ -2,9 +2,19 @@
  *  $Id: ORTEDomain.c,v 0.0.0.1         2003/08/21
  *
  *  DEBUG:  section 30                  Domain functions
- *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
  *
- *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
+ *  -------------------------------------------------------------------  
+ *                                ORTE                                 
+ *                      Open Real-Time Ethernet                       
+ *                                                                    
+ *                      Copyright (C) 2001-2006                       
+ *  Department of Control Engineering FEE CTU Prague, Czech Republic  
+ *                      http://dce.felk.cvut.cz                       
+ *                      http://www.ocera.org                          
+ *                                                                    
+ *  Author:             Petr Smolik    petr@smoliku.cz             
+ *  Advisor:            Pavel Pisa                                   
+ *  Project Responsible: Zdenek Hanzalek                              
  *  --------------------------------------------------------------------
  *
  *  This program is free software; you can redistribute it and/or modify
@@ -110,12 +120,15 @@ ORTEDomainPropDefaultGet(ORTEDomainProp *prop) {
 
   //IFProp
   sock_init_udp(&sock);
-  sock_bind(&sock,0);
-  sock_get_local_interfaces(&sock,prop->IFProp,&prop->IFCount);
+  if (sock_bind(&sock,0,INADDR_ANY) == -1) {
+    return ORTE_FALSE;
+  }
+  sock_get_local_interfaces(&sock,prop->IFProp, (char *)&prop->IFCount);
   sock_cleanup(&sock); 
 
   prop->mgrs=NULL; //only from localhost
   prop->appLocalManager=StringToIPAddress("127.0.0.1");
+  prop->listen=INADDR_ANY;
   prop->keys=NULL; //are assign be orte
   sprintf(prop->version,ORTE_PACKAGE_STRING\
                         ", compiled: "\
@@ -174,7 +187,7 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
   char              sbuff[128];
   int               i;
   uint16_t          port=0;
-  Boolean           error=ORTE_FALSE;
+  int              errno_save = 0;
 
   debug(30,2)  ("ORTEDomainCreate: %s compiled: %s,%s\n",
                 ORTE_PACKAGE_STRING,__DATE__,__TIME__);
@@ -208,6 +221,7 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
   pthread_cond_init(&d->objectEntry.htimSendCond,NULL);
   pthread_mutex_init(&d->objectEntry.htimSendMutex,NULL);
   d->objectEntry.htimSendCondValue=0;
+  d->objectEntry.htimNeedWakeUp=ORTE_TRUE;
   //publication,subscriptions
   d->publications.counter=d->subscriptions.counter=0;
   CSTWriter_init_root_field(&d->publications);
@@ -229,7 +243,9 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
   if (prop!=NULL) {
     memcpy(&d->domainProp,prop,sizeof(ORTEDomainProp));
   } else {
-    ORTEDomainPropDefaultGet(&d->domainProp);
+    if (!ORTEDomainPropDefaultGet(&d->domainProp)) {
+      goto err_domainProp;
+    }
   }
   
   //print local IP addresses
@@ -242,8 +258,7 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
     debug(30,2) ("ORTEDomainCreate: no active interface card\n");
     if (d->domainProp.multicast.enabled) {
        debug(30,0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
-       FREE(d);
-       return NULL;
+       goto err_domainProp;
     }
   }
 
@@ -292,10 +307,43 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
   /* UnicastMetatraffic */
   Domain2Port(d->domain,port);
   if (manager) {
-    sock_bind(&d->taskRecvUnicastMetatraffic.sock,port); 
+    if (d->domainProp.multicast.enabled) {
+      char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+      struct ip_mreq mreq;
+      int reuse=1,loop=0;
+    
+      //reuseaddr
+      sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, SOL_SOCKET, 
+                   SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
+      debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
+                   reuse);
+
+      //multicast loop
+      sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock, IPPROTO_IP, 
+                   IP_MULTICAST_LOOP, (const char *)&loop, 
+                   sizeof(loop));
+      debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
+                 loop);
+      
+      //join the multicast group
+      memset(&mreq, 0, sizeof(mreq));
+      mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
+      mreq.imr_interface.s_addr=htonl(INADDR_ANY);
+      if(sock_setsockopt(&d->taskRecvUnicastMetatraffic.sock,IPPROTO_IP,
+         IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
+        debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
+                      IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
+      } else
+             goto err_sock;
+    }
+    if (sock_bind(&d->taskRecvUnicastMetatraffic.sock,port,d->domainProp.listen) == -1) {
+      goto err_sock;
+    }
   } else {
     /* give me receiving port (metatraffic) */
-    sock_bind(&d->taskRecvUnicastMetatraffic.sock,0); 
+    if (sock_bind(&d->taskRecvUnicastMetatraffic.sock,0,d->domainProp.listen) == -1) {
+      goto err_sock;
+    }
   }
   debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
                d->taskRecvUnicastMetatraffic.sock.port);
@@ -303,25 +351,38 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
   /************************************************************************/
   /* MulticastMetatraffic */
   if (d->domainProp.multicast.enabled && !manager) {
+    char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+    struct ip_mreq mreq;
     Port mport;
     int reuse=1;
     
     //reuseaddr
     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET, 
-                   SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
+                   SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
     debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
                  reuse);
 
     //multicast loop
     sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP, 
-                   IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled, 
+                   IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled, 
                    sizeof(d->domainProp.multicast.loopBackEnabled));
     debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
                  d->domainProp.multicast.loopBackEnabled);
-
+    
+    //join the multicast group
+    mreq.imr_multiaddr.s_addr=htonl(d->domainProp.multicast.ipAddress);
+    mreq.imr_interface.s_addr=htonl(INADDR_ANY);
+    if(sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock,IPPROTO_IP,
+        IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
+      debug(30,2) ("ORTEDomainCreate: joint to mgroup %s\n",
+                    IPAddressToString(d->domainProp.multicast.ipAddress,sIPAddress));
+    }
+    
     /* receiving multicast port (metatraffic) */
     Domain2PortMulticastMetatraffic(d->domain,mport);
-    sock_bind(&d->taskRecvMulticastMetatraffic.sock,mport); 
+    if (sock_bind(&d->taskRecvMulticastMetatraffic.sock,(uint16_t)mport,d->domainProp.listen) == -1) {
+      goto err_sock;
+    }
     debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
                   d->taskRecvMulticastMetatraffic.sock.port);
   }
@@ -330,7 +391,9 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
   /* UserData */
   if (!manager) {
     /* give me receiving port (userdata) */
-    sock_bind(&d->taskRecvUnicastUserdata.sock,0); 
+    if (sock_bind(&d->taskRecvUnicastUserdata.sock,0,d->domainProp.listen) == -1) {
+      goto err_sock;
+    }
     debug(30,2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
                   d->taskRecvUnicastUserdata.sock.port);
 
@@ -340,20 +403,22 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
     
       //reuseaddr
       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET, 
-                     SO_REUSEADDR, (char*)&reuse, sizeof(reuse));
+                     SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
       debug(30,2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
                      reuse);
 
       //multicast loop
       sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP, 
-                     IP_MULTICAST_LOOP, &d->domainProp.multicast.loopBackEnabled, 
+                     IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled, 
                      sizeof(d->domainProp.multicast.loopBackEnabled));
       debug(30,2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
                    d->domainProp.multicast.loopBackEnabled);
       
       /* receiving multicast port (userdata) */
       Domain2PortMulticastUserdata(d->domain,mport);
-      sock_bind(&d->taskRecvMulticastUserdata.sock,mport); 
+      if (sock_bind(&d->taskRecvMulticastUserdata.sock,(uint16_t)mport,d->domainProp.listen) == -1) {
+       goto err_sock;
+      }
       debug(30,2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
                     d->taskRecvMulticastUserdata.sock.port);
     }
@@ -362,13 +427,14 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
   /************************************************************************/
   /* Send */
   /* give me sending port */
-  sock_bind(&d->taskSend.sock,0);         
+  if (sock_bind(&d->taskSend.sock,0,d->domainProp.listen) == -1) {
+    goto err_sock;
+  }
   debug(30,2) ("ORTEDomainCreate: bind on port(Send): %u\n",
                d->taskSend.sock.port);
   if (d->domainProp.multicast.enabled) {
     //ttl
-    if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL, 
-        &d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
+    if(sock_setsockopt(&d->taskSend.sock,IPPROTO_IP,IP_MULTICAST_TTL, (const char *)&d->domainProp.multicast.ttl,sizeof(d->domainProp.multicast.ttl))>=0) {
       debug(30,2) ("ORTEDomainCreate: ttl set on: %u\n",
            d->domainProp.multicast.ttl);
     } 
@@ -385,7 +451,7 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
       (d->domainProp.multicast.enabled && 
        (d->taskRecvMulticastMetatraffic.sock.fd<0))) {
     debug(30,0) ("ORTEDomainCreate: Error creating socket(s).\n");
-    error=ORTE_TRUE;
+    goto err_sock;
   }
 
   if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) || 
@@ -397,22 +463,8 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
       (d->domainProp.multicast.enabled && !manager &&
        !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) {    //no a memory
     debug(30,0) ("ORTEDomainCreate: Error creating buffer(s).\n");
-    error=ORTE_TRUE;
+    goto err_sock;
   } 
-  /* a problem occure with resources */
-  if (error) {
-    sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
-    sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
-    sock_cleanup(&d->taskRecvUnicastUserdata.sock);
-    sock_cleanup(&d->taskRecvMulticastUserdata.sock);
-    sock_cleanup(&d->taskSend.sock);
-    CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
-    CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
-    CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
-    CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
-    CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
-    FREE(d);
-  }
 
   /************************************************************************/
   //Generates local GUID
@@ -437,6 +489,9 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
   
   //Self object data & fellow managers object data
   appParams=(AppParams*)MALLOC(sizeof(AppParams));
+  if (!appParams) {
+    goto err_sock;
+  }
   AppParamsInit(appParams);
   appParams->expirationTime=d->domainProp.baseProp.expirationTime;
   VENDOR_ID_OCERA(appParams->vendorId);
@@ -663,6 +718,40 @@ ORTEDomainCreate(int domain, ORTEDomainProp *prop,
 
   debug(30,10) ("ORTEDomainCreate: finished\n");
   return d;
+
+//err:
+  if (!errno_save) errno_save = errno;
+  /* TODO */
+  FREE(appParams);
+err_sock:
+  if (!errno_save) errno_save = errno;
+  sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
+  sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
+  sock_cleanup(&d->taskRecvUnicastUserdata.sock);
+  sock_cleanup(&d->taskRecvMulticastUserdata.sock);
+  sock_cleanup(&d->taskSend.sock);
+  pthread_rwlock_destroy(&d->typeEntry.lock);
+  if (d->domainProp.multicast.enabled) {
+    CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
+    CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
+  }
+  if (!manager) {
+    CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
+  }
+  CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
+err_domainProp:
+  if (!errno_save) errno_save = errno;
+  pthread_rwlock_init(&d->patternEntry.lock,NULL);
+  pthread_rwlock_init(&d->psEntry.subscriptionsLock,NULL);
+  pthread_rwlock_destroy(&d->psEntry.publicationsLock);
+  pthread_rwlock_destroy(&d->subscriptions.lock);
+  pthread_rwlock_destroy(&d->publications.lock);
+  pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
+  pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
+  pthread_rwlock_destroy(&d->objectEntry.objRootLock);
+  FREE(d);
+  errno = errno_save;
+  return NULL;
 }
 
 /*****************************************************************************/
@@ -688,34 +777,6 @@ ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
   ORTEDomainSendThreadStop(&d->taskSend);
   debug(30,3) ("ORTEDomainDestroy: threads stoped\n");
   
-  //Sockets
-  sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
-  sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
-  sock_cleanup(&d->taskRecvUnicastUserdata.sock);
-  sock_cleanup(&d->taskRecvMulticastUserdata.sock);
-  sock_cleanup(&d->taskSend.sock);
-
-  //Signals
-  pthread_cond_destroy(&d->objectEntry.htimSendCond);
-  pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
-
-  //rwLocks
-  pthread_rwlock_destroy(&d->objectEntry.objRootLock);
-  pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
-  pthread_rwlock_destroy(&d->publications.lock);
-  pthread_rwlock_destroy(&d->subscriptions.lock);
-  pthread_rwlock_destroy(&d->psEntry.publicationsLock);
-  pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
-
-  //TypeRegister
-  ORTETypeRegisterDestroyAll(d);
-  pthread_rwlock_destroy(&d->typeEntry.lock);
-  
-  //Pattern
-  ORTEDomainAppSubscriptionPatternDestroy(d);
-  pthread_rwlock_unlock(&d->typeEntry.lock);    
-  pthread_rwlock_destroy(&d->patternEntry.lock);
-  
   //CSTReaders and CSTWriters
   CSTWriterDelete(d,&d->writerApplicationSelf);
   CSTReaderDelete(d,&d->readerManagers);
@@ -741,13 +802,46 @@ ORTEDomainDestroy(ORTEDomain *d,Boolean manager) {
     
   //objects in objectsEntry
   objectEntryDeleteAll(d,&d->objectEntry);
+  debug(30,3) ("ORTEDomainDestroy: deleted all objects\n");
+
+  //Sockets
+  sock_cleanup(&d->taskRecvUnicastMetatraffic.sock);
+  sock_cleanup(&d->taskRecvMulticastMetatraffic.sock);
+  sock_cleanup(&d->taskRecvUnicastUserdata.sock);
+  sock_cleanup(&d->taskRecvMulticastUserdata.sock);
+  sock_cleanup(&d->taskSend.sock);
+
+
+  //Signals
+  pthread_cond_destroy(&d->objectEntry.htimSendCond);
+  pthread_mutex_destroy(&d->objectEntry.htimSendMutex);
+
+  //rwLocks
+  pthread_rwlock_destroy(&d->objectEntry.objRootLock);
+  pthread_rwlock_destroy(&d->objectEntry.htimRootLock);
+  pthread_rwlock_destroy(&d->publications.lock);
+  pthread_rwlock_destroy(&d->subscriptions.lock);
+  pthread_rwlock_destroy(&d->psEntry.publicationsLock);
+  pthread_rwlock_destroy(&d->psEntry.subscriptionsLock);
+
+  //TypeRegister
+  ORTETypeRegisterDestroyAll(d);
   
+  //Pattern
+  ORTEDomainAppSubscriptionPatternDestroy(d);
+  pthread_rwlock_destroy(&d->patternEntry.lock);
+  
+  //Release buffers  
   CDR_codec_release_buffer(&d->taskRecvUnicastMetatraffic.mb.cdrCodec);
   CDR_codec_release_buffer(&d->taskRecvMulticastMetatraffic.mb.cdrCodec);
   CDR_codec_release_buffer(&d->taskRecvUnicastUserdata.mb.cdrCodec);
   CDR_codec_release_buffer(&d->taskRecvMulticastUserdata.mb.cdrCodec);
   CDR_codec_release_buffer(&d->taskSend.mb.cdrCodec);
+  
+  //Free domain instance
   FREE(d);
+  
   debug(30,10) ("ORTEDomainDestroy: finished\n");
+  
   return ORTE_TRUE;
 }