rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTESubscription.c
Add missing FREE()
[orte.git] / orte / liborte / ORTESubscription.c
index 5f0e041f429424f821374b0f7be81994f32f806c..3cbc8596a20b7cea3902b2940291f4110e5e18ee 100644 (file)
@@ -2,9 +2,19 @@
  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
  *
  *  DEBUG:  section 33                  Functions working over subscription
- *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
  *
- *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
+ *  -------------------------------------------------------------------  
+ *                                ORTE                                 
+ *                      Open Real-Time Ethernet                       
+ *                                                                    
+ *                      Copyright (C) 2001-2006                       
+ *  Department of Control Engineering FEE CTU Prague, Czech Republic  
+ *                      http://dce.felk.cvut.cz                       
+ *                      http://www.ocera.org                          
+ *                                                                    
+ *  Author:             Petr Smolik    petr@smoliku.cz             
+ *  Advisor:            Pavel Pisa                                   
+ *  Project Responsible: Zdenek Hanzalek                              
  *  --------------------------------------------------------------------
  *
  *  This program is free software; you can redistribute it and/or modify
@@ -19,7 +29,7 @@
  *  
  */ 
 
-#include "orte.h"
+#include "orte_all.h"
 
 GAVL_CUST_NODE_INT_IMP(SubscriptionList, 
                        PSEntry, ObjectEntryOID, GUID_RTPS,
@@ -30,7 +40,7 @@ ORTESubscription *
 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
     const char *topic,const char *typeName,void *instance,NtpTime *deadline,
     NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
-    void *recvCallBackParam) {
+    void *recvCallBackParam, IPAddress multicastIPAddress) {
   GUID_RTPS             guid;
   CSTReader             *cstReader;
   CSTReaderParams       cstReaderParams;
@@ -50,19 +60,34 @@ ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sTyp
     pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
     printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
             ser./deser. function for a given typeName!!!\n");
+    FREE(cstReader);
     return NULL;
   }  
   pthread_rwlock_wrlock(&d->subscriptions.lock);
+  // join to multicast group
+  if (IN_MULTICAST(multicastIPAddress)) {
+    char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+    struct ip_mreq mreq;
+
+    mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
+    mreq.imr_interface.s_addr=htonl(INADDR_ANY);
+    if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
+         IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
+        debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
+                      IPAddressToString(multicastIPAddress,sIPAddress));
+    }
+  }
   //generate new guid of publisher
   d->subscriptions.counter++;
   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
   guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
   sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
   memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
-  strcpy(sp->topic,topic);
-  strcpy(sp->typeName,typeName);
+  strcpy((char *)sp->topic,topic);
+  strcpy((char *)sp->typeName,typeName);
   sp->deadline=*deadline;
   sp->minimumSeparation=*minimumSeparation;
+  sp->multicast=multicastIPAddress;
   switch (sType) {
     case BEST_EFFORTS:
       sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
@@ -94,13 +119,14 @@ ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sTyp
   parameterUpdateCSChangeFromSubscription(csChange,sp);
   csChange->guid=guid;
   csChange->alive=ORTE_TRUE;
-  csChange->cdrStream.buffer=NULL;
+  CDR_codec_init_static(&csChange->cdrCodec);
   CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
   pthread_rwlock_unlock(&d->subscriptions.lock);
   pthread_rwlock_unlock(&d->typeEntry.lock);    
   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
+
   return cstReader;
 }
 
@@ -115,7 +141,7 @@ ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
   CSChangeAttributes_init_head(csChange);
   csChange->guid=cstReader->guid;
   csChange->alive=ORTE_FALSE;
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   CSTWriterAddCSChange(cstReader->domain,
                        &cstReader->domain->writerSubscriptions,
                        csChange);
@@ -166,7 +192,7 @@ ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
   parameterUpdateCSChangeFromSubscription(csChange,sp);
   csChange->guid=cstReader->guid;
   csChange->alive=ORTE_TRUE;
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   CSTWriterAddCSChange(cstReader->domain,
       &cstReader->domain->writerSubscriptions,csChange);
   pthread_rwlock_unlock(&cstReader->lock);
@@ -236,8 +262,8 @@ ORTESubscriptionPull(ORTESubscription *cstReader) {
           htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
       memset(&info,0,sizeof(info));
       info.status=DEADLINE;
-      info.topic=sp->topic;
-      info.type=sp->typeName;
+      info.topic=(char*)sp->topic;
+      info.type=(char*)sp->typeName;
       cstReader->objectEntryOID->recvCallBack(&info,
           cstReader->objectEntryOID->instance,
           cstReader->objectEntryOID->callBackParam);
@@ -254,3 +280,11 @@ ORTESubscriptionPull(ORTESubscription *cstReader) {
   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
   return ORTE_OK;
 }
+
+/*****************************************************************************/
+/* Return the instance pointer held in the subscription's objectEntryOID. */
+inline void *
+ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
+  return cstReader->objectEntryOID->instance;
+}
+