* $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
*
* DEBUG: section 33 Functions working over subscription
- * AUTHOR: Petr Smolik petr.smolik@wo.cz
*
- * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
+ * -------------------------------------------------------------------
+ * ORTE
+ * Open Real-Time Ethernet
+ *
+ * Copyright (C) 2001-2006
+ * Department of Control Engineering FEE CTU Prague, Czech Republic
+ * http://dce.felk.cvut.cz
+ * http://www.ocera.org
+ *
+ * Author: Petr Smolik petr.smolik@wo.cz
+ * Advisor: Pavel Pisa
+ * Project Responsible: Zdenek Hanzalek
* --------------------------------------------------------------------
*
* This program is free software; you can redistribute it and/or modify
*
*/
-#include "orte.h"
+#include "orte_all.h"
GAVL_CUST_NODE_INT_IMP(SubscriptionList,
PSEntry, ObjectEntryOID, GUID_RTPS,
ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
const char *topic,const char *typeName,void *instance,NtpTime *deadline,
NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
- void *recvCallBackParam) {
+ void *recvCallBackParam, IPAddress multicastIPAddress) {
GUID_RTPS guid;
CSTReader *cstReader;
CSTReaderParams cstReaderParams;
return NULL;
}
pthread_rwlock_wrlock(&d->subscriptions.lock);
+ // join the multicast group
+ if (IN_MULTICAST(multicastIPAddress)) {
+ char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+ struct ip_mreq mreq;
+
+ mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
+ mreq.imr_interface.s_addr=htonl(INADDR_ANY);
+ if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
+ IP_ADD_MEMBERSHIP,(void *) &mreq, sizeof(mreq))>=0) {
+ debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
+ IPAddressToString(multicastIPAddress,sIPAddress));
+ }
+ }
//generate new guid of subscription
d->subscriptions.counter++;
guid.hid=d->guid.hid;guid.aid=d->guid.aid;
strcpy(sp->typeName,typeName);
sp->deadline=*deadline;
sp->minimumSeparation=*minimumSeparation;
+ sp->multicast=multicastIPAddress;
switch (sType) {
case BEST_EFFORTS:
sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
parameterUpdateCSChangeFromSubscription(csChange,sp);
csChange->guid=guid;
csChange->alive=ORTE_TRUE;
- csChange->cdrStream.buffer=NULL;
+ CDR_codec_init_static(&csChange->cdrCodec);
CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
pthread_rwlock_unlock(&d->writerSubscriptions.lock);
pthread_rwlock_unlock(&d->subscriptions.lock);
pthread_rwlock_unlock(&d->typeEntry.lock);
pthread_rwlock_unlock(&d->objectEntry.objRootLock);
pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
+
return cstReader;
}
CSChangeAttributes_init_head(csChange);
csChange->guid=cstReader->guid;
csChange->alive=ORTE_FALSE;
- csChange->cdrStream.buffer=NULL;
+ csChange->cdrCodec.buffer=NULL;
CSTWriterAddCSChange(cstReader->domain,
&cstReader->domain->writerSubscriptions,
csChange);
parameterUpdateCSChangeFromSubscription(csChange,sp);
csChange->guid=cstReader->guid;
csChange->alive=ORTE_TRUE;
- csChange->cdrStream.buffer=NULL;
+ csChange->cdrCodec.buffer=NULL;
CSTWriterAddCSChange(cstReader->domain,
&cstReader->domain->writerSubscriptions,csChange);
pthread_rwlock_unlock(&cstReader->lock);
ORTESubscriptionWaitForPublications(ORTESubscription *cstReader,NtpTime wait,
unsigned int retries,unsigned int noPublications) {
unsigned int wPublications;
- u_int32_t sec,ms;
+ uint32_t sec,ms;
if (!cstReader) return ORTE_BAD_HANDLE;
NtpTimeDisAssembToMs(sec,ms,wait);
pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
return ORTE_OK;
}
+
+
+/*****************************************************************************
+ * ORTESubscriptionGetInstance - return the instance pointer of a subscription.
+ *
+ * cstReader: subscription handle; it is dereferenced without a NULL check,
+ *            as is its objectEntryOID member.
+ *
+ * Returns the value of cstReader->objectEntryOID->instance unchanged.
+ *
+ * NOTE(review): ORTESubscriptionWaitForPublications rejects a NULL handle
+ * with ORTE_BAD_HANDLE, whereas this accessor does not guard against NULL --
+ * confirm that callers always pass a valid handle.
+ *****************************************************************************/
+inline void *
+ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
+ return cstReader->objectEntryOID->instance;
+}
+