* $Id: ORTESubscription.c,v 0.0.0.1 2003/11/21
*
* DEBUG: section 33 Functions working over subscription
- * AUTHOR: Petr Smolik petr.smolik@wo.cz
*
- * ORTE - OCERA Real-Time Ethernet http://www.ocera.org/
+ * -------------------------------------------------------------------
+ * ORTE
+ * Open Real-Time Ethernet
+ *
+ * Copyright (C) 2001-2006
+ * Department of Control Engineering FEE CTU Prague, Czech Republic
+ * http://dce.felk.cvut.cz
+ * http://www.ocera.org
+ *
+ * Author: Petr Smolik petr@smoliku.cz
+ * Advisor: Pavel Pisa
+ * Project Responsible: Zdenek Hanzalek
* --------------------------------------------------------------------
*
* This program is free software; you can redistribute it and/or modify
*
*/
-#include "orte.h"
+#include "orte_all.h"
GAVL_CUST_NODE_INT_IMP(SubscriptionList,
PSEntry, ObjectEntryOID, GUID_RTPS,
ORTESubscriptionCreate(ORTEDomain *d,SubscriptionMode mode,SubscriptionType sType,
const char *topic,const char *typeName,void *instance,NtpTime *deadline,
NtpTime *minimumSeparation,ORTERecvCallBack recvCallBack,
- void *recvCallBackParam) {
+ void *recvCallBackParam, IPAddress multicastIPAddress) {
GUID_RTPS guid;
CSTReader *cstReader;
CSTReaderParams cstReaderParams;
pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
printf("before call ORTESubscriptionCreateBestEffort is necessary to register \n\
ser./deser. function for a given typeName!!!\n");
+ FREE(cstReader);
return NULL;
}
pthread_rwlock_wrlock(&d->subscriptions.lock);
+ // join to multicast group
+ if (IN_MULTICAST(multicastIPAddress)) {
+ char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
+ struct ip_mreq mreq;
+
+ mreq.imr_multiaddr.s_addr=htonl(multicastIPAddress);
+ mreq.imr_interface.s_addr=htonl(INADDR_ANY);
+ if(sock_setsockopt(&d->taskRecvMulticastUserdata.sock,IPPROTO_IP,
+ IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq))>=0) {
+ debug(33,2) ("ORTESubscriptionCreate: listening to mgroup %s\n",
+ IPAddressToString(multicastIPAddress,sIPAddress));
+ }
+ }
//generate new guid of publisher
d->subscriptions.counter++;
guid.hid=d->guid.hid;guid.aid=d->guid.aid;
guid.oid=(d->subscriptions.counter<<8)|OID_SUBSCRIPTION;
sp=(ORTESubsProp*)MALLOC(sizeof(ORTESubsProp));
memcpy(sp,&d->domainProp.subsPropDefault,sizeof(ORTESubsProp));
- strcpy(sp->topic,topic);
- strcpy(sp->typeName,typeName);
+ strcpy((char *)sp->topic,topic);
+ strcpy((char *)sp->typeName,typeName);
sp->deadline=*deadline;
sp->minimumSeparation=*minimumSeparation;
+ sp->multicast=multicastIPAddress;
switch (sType) {
case BEST_EFFORTS:
sp->reliabilityRequested=PID_VALUE_RELIABILITY_BEST_EFFORTS;
parameterUpdateCSChangeFromSubscription(csChange,sp);
csChange->guid=guid;
csChange->alive=ORTE_TRUE;
- csChange->cdrStream.buffer=NULL;
+ CDR_codec_init_static(&csChange->cdrCodec);
CSTWriterAddCSChange(d,&d->writerSubscriptions,csChange);
pthread_rwlock_unlock(&d->writerSubscriptions.lock);
pthread_rwlock_unlock(&d->subscriptions.lock);
pthread_rwlock_unlock(&d->typeEntry.lock);
pthread_rwlock_unlock(&d->objectEntry.objRootLock);
pthread_rwlock_unlock(&d->objectEntry.htimRootLock);
+
return cstReader;
}
CSChangeAttributes_init_head(csChange);
csChange->guid=cstReader->guid;
csChange->alive=ORTE_FALSE;
- csChange->cdrStream.buffer=NULL;
+ csChange->cdrCodec.buffer=NULL;
CSTWriterAddCSChange(cstReader->domain,
&cstReader->domain->writerSubscriptions,
csChange);
parameterUpdateCSChangeFromSubscription(csChange,sp);
csChange->guid=cstReader->guid;
csChange->alive=ORTE_TRUE;
- csChange->cdrStream.buffer=NULL;
+ csChange->cdrCodec.buffer=NULL;
CSTWriterAddCSChange(cstReader->domain,
&cstReader->domain->writerSubscriptions,csChange);
pthread_rwlock_unlock(&cstReader->lock);
htimerUnicastCommon_get_expire(&cstReader->deadlineTimer))>=0) {
memset(&info,0,sizeof(info));
info.status=DEADLINE;
- info.topic=sp->topic;
- info.type=sp->typeName;
+ info.topic=(char*)sp->topic;
+ info.type=(char*)sp->typeName;
cstReader->objectEntryOID->recvCallBack(&info,
cstReader->objectEntryOID->instance,
cstReader->objectEntryOID->callBackParam);
pthread_rwlock_unlock(&cstReader->domain->objectEntry.objRootLock);
return ORTE_OK;
}
+
+
+/*****************************************************************************/
+inline void *
+ORTESubscriptionGetInstance(ORTESubscription *cstReader) {
+ return cstReader->objectEntryOID->instance;
+}
+