]> rtime.felk.cvut.cz Git - orte.git/blobdiff - orte/liborte/ORTESubscription.c
changed name to Open Real-time Ethernet, some source header arranging
[orte.git] / orte / liborte / ORTESubscription.c
index 0886b64835f2a554cd420560915378e27ff16555..7d8a5cdce27417e58fc690b95b49c5875ff824ba 100644 (file)
@@ -2,9 +2,19 @@
  *  $Id: ORTESubscription.c,v 0.0.0.1     2003/11/21
  *
  *  DEBUG:  section 33                  Functions working over subscription
- *  AUTHOR: Petr Smolik                 petr.smolik@wo.cz
  *
- *  ORTE - OCERA Real-Time Ethernet     http://www.ocera.org/
+ *  -------------------------------------------------------------------  
+ *                                ORTE                                 
+ *                      Open Real-Time Ethernet                       
+ *                                                                    
+ *                      Copyright (C) 2001-2006                       
+ *  Department of Control Engineering FEE CTU Prague, Czech Republic  
+ *                      http://dce.felk.cvut.cz                       
+ *                      http://www.ocera.org                          
+ *                                                                    
+ *  Author:             Petr Smolik    petr.smolik@wo.cz             
+ *  Advisor:            Pavel Pisa                                   
+ *  Project Responsible: Zdenek Hanzalek                              
  *  --------------------------------------------------------------------
  *
  *  This program is free software; you can redistribute it and/or modify
@@ -19,7 +29,7 @@
  *  
  */ 
 
-#include "orte.h"
+#include "orte_all.h"
 
 GAVL_CUST_NODE_INT_IMP(SubscriptionList, 
                        PSEntry, ObjectEntryOID, GUID_RTPS,
@@ -30,7 +40,7 @@ ORTESubscription *
 ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
     const char *topic,const char *typeName,void *instance,NtpTime *deadline,
     NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
-    void *recvCallBackParam) {
+    void *recvCallBackParam, IPAddress multicastIPAddress) {
   GUID_RTPS             guid;
   CSTReader             *cstReader;
   CSTReaderParams       cstReaderParams;
@@ -53,6 +63,19 @@ ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sTyp
     return NULL;
   }  
   pthread_rwlock_wrlock(&d->subscriptions.lock);
+  // join to multicast group
+  if (IN_MULTICAST(multicastIPAddress)) {
+    char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+    struct ip_mreq mreq;
+
+    mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
+    mreq.imr_interface.s_addr=htonl(INADDR_ANY);
+    if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
+         IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
+        debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
+                      IPAddressToString(multicastIPAddress,sIPAddress));
+    }
+  }
   //generate new guid of subscription
   d->subscriptions.counter++;
   guid.hid=d->guid.hid;guid.aid=d->guid.aid;
@@ -63,6 +86,7 @@ ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sTyp
   strcpy(sp->typeName,typeName);
   sp->deadline=*deadline;
   sp->minimumSeparation=*minimumSeparation;
+  sp->multicast=multicastIPAddress;
   switch (sType) {
     case BEST_EFFORTS:
       sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
@@ -94,13 +118,14 @@ ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sTyp
   parameterUpdateCSChangeFromSubscription(csChange,sp);
   csChange->guid=guid;
   csChange->alive=ORTE_TRUE;
-  csChange->cdrStream.buffer=NULL;
+  CDR_codec_init_static(&csChange->cdrCodec);
   CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
   pthread_rwlock_unlock(&d->writerSubscriptions.lock);
   pthread_rwlock_unlock(&d->subscriptions.lock);
   pthread_rwlock_unlock(&d->typeEntry.lock);    
   pthread_rwlock_unlock(&d->objectEntry.objRootLock);
   pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
+
   return cstReader;
 }
 
@@ -115,7 +140,7 @@ ORTESubscriptionDestroyLocked(ORTESubscription *cstReader) {
   CSChangeAttributes_init_head(csChange);
   csChange->guid=cstReader->guid;
   csChange->alive=ORTE_FALSE;
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   CSTWriterAddCSChange(cstReader->domain,
                        &cstReader->domain->writerSubscriptions,
                        csChange);
@@ -166,7 +191,7 @@ ORTESubscriptionPropertiesSet(ORTESubscription *cstReader,ORTESubsProp *sp) {
   parameterUpdateCSChangeFromSubscription(csChange,sp);
   csChange->guid=cstReader->guid;
   csChange->alive=ORTE_TRUE;
-  csChange->cdrStream.buffer=NULL;
+  csChange->cdrCodec.buffer=NULL;
   CSTWriterAddCSChange(cstReader->domain,
       &cstReader->domain->writerSubscriptions,csChange);
   pthread_rwlock_unlock(&cstReader->lock);
@@ -181,7 +206,7 @@ int
 ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
     unsigned int retries,unsigned int noPublications) {
   unsigned int wPublications;
-  u_int32_t sec,ms;
+  uint32_t sec,ms;
 
   if (!cstReader) return ORTE_BAD_HANDLE;
   NtpTimeDisAssembToMs(sec,ms,wait);
@@ -254,3 +279,11 @@ ORTESubscriptionPull(ORTESubscription *cstReader) {
   pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
   return ORTE_OK;
 }
+
+
+/*****************************************************************************/
+inline void * /* returns the 'instance' pointer held by the reader's object entry */
+ORTESubscriptionGetInstance(ORTESubscription *cstReader) { /* cstReader is not NULL-checked */
+  return cstReader->objectEntryOID->instance; /* plain field read; no lock is taken in this function */
+} /* NOTE(review): non-static 'inline' in a .c file - confirm a non-inline prototype is in scope so an external definition is emitted */
+