GUID_PRINTF(d->guid));
}
-static void
+static int /* Prepare a TaskProp for domain d; -1 if the codec buffer is NULL after allocation, otherwise whatever sock_init_udp() returns. */
initTaskProp(TaskProp *tp, ORTEDomain *d, int buffSize)
{
tp->d = d;
tp->terminate = ORTE_TRUE;
tp->sock.port = 0;
CDR_codec_init_static(&tp->mb.cdrCodec);
- if (buffSize)
+ if (buffSize) { /* a zero buffSize skips buffer allocation altogether */
CDR_buffer_init(&tp->mb.cdrCodec, buffSize);
- sock_init_udp(&tp->sock);
+ if (!tp->mb.cdrCodec.buffer) /* buffer pointer still NULL after CDR_buffer_init */
+ return -1; /* NOTE(review): sock_init_udp() has not run on this path -- confirm the caller's cleanup tolerates an un-initialised tp->sock */
+ }
+ return sock_init_udp(&tp->sock); /* result is passed through unchanged; callers compare it against -1 */
}
static int
return 0;
}
-ORTEDomain *
-ORTEDomainCreate(int domain, ORTEDomainProp *prop,
- ORTEDomainAppEvents *events, Boolean manager)
+static void /* Initialise a CSTPublications: zero its counter, run CSTWriter_init_root_field on it, create its rwlock. */
+publicationsInit(CSTPublications *p)
{
- ORTEDomain *d;
- ObjectEntryOID *objectEntryOID;
- AppParams *appParams;
- CSTWriterParams cstWriterParams;
- CSTReaderParams cstReaderParams;
- char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH];
- char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
- char sbuff[128];
- int i;
- uint16_t port = 0;
- int errno_save = 0;
-
- debug(30, 2) ("ORTEDomainCreate: %s compiled: %s,%s\n",
- ORTE_PACKAGE_STRING, __DATE__, __TIME__);
-
- debug(30, 10) ("ORTEDomainCreate: start\n");
- //Create domainApplication
- d = MALLOC(sizeof(ORTEDomain));
- if (!d)
- return NULL; //no memory
- //initialization local values
- d->domain = domain;
-
- //create domainProp
- if (prop != NULL) {
- memcpy(&d->domainProp, prop, sizeof(ORTEDomainProp));
- } else {
- if (!ORTEDomainPropDefaultGet(&d->domainProp)) {
- goto err_domainProp;
- }
- }
- ORTEDomainProp *dp = &d->domainProp;
-
- initTaskProp(&d->taskRecvUnicastMetatraffic, d, dp->recvBuffSize);
- initTaskProp(&d->taskRecvUnicastUserdata, d, !manager ? dp->recvBuffSize : 0);
- initTaskProp(&d->taskRecvMulticastMetatraffic, d, !manager && dp->multicast.enabled ? dp->recvBuffSize : 0);
- initTaskProp(&d->taskRecvMulticastUserdata, d, !manager && dp->multicast.enabled ? dp->recvBuffSize : 0);
+ p->counter = 0; /* start with no publications counted */
+ CSTWriter_init_root_field(p); /* presumably resets the CSTWriter container root -- confirm against its definition */
+ pthread_rwlock_init(&p->lock, NULL); /* NULL attr = default lock attributes; the return value is not checked */
+}
- initTaskProp(&d->taskSend, d, dp->sendBuffSize);
- d->taskSend.mb.cdrCodec.wptr_max = dp->wireProp.metaBytesPerPacket;
- assert(d->taskSend.mb.cdrCodec.wptr_max <= d->taskSend.mb.cdrCodec.buf_len);
- d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
+static void /* Initialise a CSTSubscriptions: zero its counter, run CSTReader_init_root_field on it, create its rwlock. */
+subscriptionsInit(CSTSubscriptions *s)
+{
+ s->counter = 0; /* start with no subscriptions counted */
+ CSTReader_init_root_field(s); /* presumably resets the CSTReader container root -- confirm against its definition */
+ pthread_rwlock_init(&s->lock, NULL); /* NULL attr = default lock attributes; the return value is not checked */
+}
- if (objectEntryInit(&d->objectEntry) == -1)
- goto err_sock;
+static void /* Initialise a PSEntry: both list roots (publications, subscriptions) and the rwlock paired with each. */
+psEntryInit(PSEntry *e)
+{
+ PublicationList_init_root_field(e); /* publications list root */
+ pthread_rwlock_init(&e->publicationsLock, NULL); /* default attributes; return value not checked */
+ SubscriptionList_init_root_field(e); /* subscriptions list root */
+ pthread_rwlock_init(&e->subscriptionsLock, NULL); /* default attributes; return value not checked */
+}
- //publication,subscriptions
- d->publications.counter = d->subscriptions.counter = 0;
- CSTWriter_init_root_field(&d->publications);
- CSTReader_init_root_field(&d->subscriptions);
- pthread_rwlock_init(&d->publications.lock, NULL);
- pthread_rwlock_init(&d->subscriptions.lock, NULL);
- //publication,subscriptions lists
- PublicationList_init_root_field(&d->psEntry);
- pthread_rwlock_init(&d->psEntry.publicationsLock, NULL);
- SubscriptionList_init_root_field(&d->psEntry);
- pthread_rwlock_init(&d->psEntry.subscriptionsLock, NULL);
-
- //pattern
+static void /* Initialise d->patternEntry (lock, then list head) and register the default check/match callbacks. */
+patternEntryInit(ORTEDomain *d)
+{
pthread_rwlock_init(&d->patternEntry.lock, NULL);
- ORTEPatternRegister(d, ORTEPatternCheckDefault, ORTEPatternMatchDefault, NULL);
Pattern_init_head(&d->patternEntry);
+ ORTEPatternRegister(d, ORTEPatternCheckDefault, ORTEPatternMatchDefault, NULL); /* now issued after Pattern_init_head(); the removed line registered before the head was initialised */
+}
+
+static void /* Debug-log the domain's local interface addresses, or a "no active interface card" notice when IFCount is zero. */
+printLocalAddresses(ORTEDomain *d)
+{
+ char iflocal[MAX_INTERFACES*MAX_STRING_IPADDRESS_LENGTH]; /* text handed to the "%s" of the address log line */
+ char sIPAddress[MAX_STRING_IPADDRESS_LENGTH]; /* presumably per-address scratch for the loop body elided from this hunk -- confirm */
+ int i; /* interface index, 0 .. IFCount-1 */
- //print local IP addresses
iflocal[0] = 0;
if (d->domainProp.IFCount) {
for (i = 0; i < d->domainProp.IFCount; i++)
debug(30, 2) ("ORTEDomainCreate: localIPAddres(es) %s\n", iflocal);
} else {
debug(30, 2) ("ORTEDomainCreate: no active interface card\n");
- if (d->domainProp.multicast.enabled) {
- debug(30, 0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
- goto err_domainProp;
- }
}
+} /* logging only: the multicast-without-interface check is removed from this branch, so the helper has no failure path */
- //DomainEvents
- if (events != NULL) {
- memcpy(&d->domainEvents, events, sizeof(ORTEDomainAppEvents));
- } else {
- memset(&d->domainEvents, 0, sizeof(ORTEDomainAppEvents));
- }
+static void /* Initialise a TypeEntry: run ORTEType_init_root_field on it and create its rwlock. */
+typeEntryInit(TypeEntry *e)
+{
+ ORTEType_init_root_field(e); /* presumably resets the registered-type container root -- confirm against its definition */
+ pthread_rwlock_init(&e->lock, NULL); /* NULL attr = default lock attributes; the return value is not checked */
+}
- //TypeRegister
- ORTEType_init_root_field(&d->typeEntry);
- pthread_rwlock_init(&d->typeEntry.lock, NULL);
+static int32_t /* Bind the unicast metatraffic receive socket; returns -1 on failure, otherwise the local 'port' value. */
+bindUnicastMetatrafficSock(ORTEDomain *d, Boolean manager)
+{ /* NOTE(review): sIPAddress is used in the manager branch but no declaration is visible in this hunk -- confirm one exists in the elided context */
+ uint16_t port = 0; /* passed to Domain2Port() below (presumably assigned there); a 16-bit value, so it can never collide with the -1 error return */
- /************************************************************************/
- /* UnicastMetatraffic */
Domain2Port(d->domain, port);
if (manager) {
if (d->domainProp.multicast.enabled) {
debug(30, 2) ("ORTEDomainCreate: joint to mgroup %s\n",
IPAddressToString(d->domainProp.multicast.ipAddress, sIPAddress));
} else
- goto err_sock;
+ return -1; /* was a jump to err_sock; cleanup is now the caller's job */
}
if (sock_bind(&d->taskRecvUnicastMetatraffic.sock, port, d->domainProp.listen) == -1) {
- goto err_sock;
+ return -1; /* manager: bind to the domain port failed */
}
} else {
/* give me receiving port (metatraffic) */
if (sock_bind(&d->taskRecvUnicastMetatraffic.sock, 0, d->domainProp.listen) == -1) {
- goto err_sock;
+ return -1; /* non-manager: bind with port argument 0 failed */
}
}
debug(30, 2) ("ORTEDomainCreate: bind on port(RecvUnicastMetatraffic): %u\n",
d->taskRecvUnicastMetatraffic.sock.port);
- /************************************************************************/
- /* MulticastMetatraffic */
- if (d->domainProp.multicast.enabled && !manager) {
- char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
- struct ip_mreq mreq;
+ return port; /* the local variable, not sock.port -- also returned on the non-manager path, which bound with port argument 0 */
+}
+
+static int /* Configure, join and bind the multicast metatraffic receive socket; returns 0 on success, -1 only when sock_bind() fails. */
+bindMulticastMetatrafficSock(ORTEDomain *d)
+{
+ char sIPAddress[MAX_STRING_IPADDRESS_LENGTH]; /* scratch buffer for IPAddressToString() in the join log line */
+ struct ip_mreq mreq; /* argument for IP_ADD_MEMBERSHIP */
+ Port mport; /* passed to Domain2PortMulticastMetatraffic() below */
+ int reuse = 1; /* SO_REUSEADDR option value */
+
+ // allow address reuse
+ sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET,
+ SO_REUSEADDR, (const char *)&reuse, sizeof(reuse)); /* result not checked */
+ debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
+ reuse);
+
+ // multicast loopback, taken from the domain properties
+ sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
+ IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
+ sizeof(d->domainProp.multicast.loopBackEnabled)); /* result not checked */
+ debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
+ d->domainProp.multicast.loopBackEnabled);
+
+ // join the multicast group
+ mreq.imr_multiaddr.s_addr = htonl(d->domainProp.multicast.ipAddress); /* group address from the domain properties */
+ mreq.imr_interface.s_addr = htonl(INADDR_ANY); /* interface left unspecified (INADDR_ANY) */
+ if (sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
+ IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq)) >= 0) {
+ debug(30, 2) ("ORTEDomainCreate: joint to mgroup %s\n",
+ IPAddressToString(d->domainProp.multicast.ipAddress, sIPAddress));
+ } /* a failed join only suppresses the log line; execution continues to the bind */
+
+ /* receiving multicast port (metatraffic) */
+ Domain2PortMulticastMetatraffic(d->domain, mport);
+ if (sock_bind(&d->taskRecvMulticastMetatraffic.sock, (uint16_t)mport, d->domainProp.listen) == -1) {
+ return -1; /* the only failure this function reports */
+ }
+ debug(30, 2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
+ d->taskRecvMulticastMetatraffic.sock.port);
+ return 0;
+}
+
+static int /* Bind the unicast userdata receive socket and, when multicast is enabled, configure and bind the multicast one; 0 on success, -1 if either sock_bind() fails. */
+bindUserDataSockets(ORTEDomain *d)
+{
+ if (sock_bind(&d->taskRecvUnicastUserdata.sock, 0, d->domainProp.listen) == -1) { /* port argument is 0 */
+ return -1; /* unicast userdata bind failed */
+ }
+ debug(30, 2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
+ d->taskRecvUnicastUserdata.sock.port);
+
+ if (d->domainProp.multicast.enabled) { /* multicast userdata socket is only touched when multicast is on */
Port mport;
int reuse = 1;
//reuseaddr
- sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, SOL_SOCKET,
+ sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET, /* now the userdata socket; the call's result is not checked */
SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
reuse);
//multicast loop
- sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
+ sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP, /* now the userdata socket; the call's result is not checked */
IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
sizeof(d->domainProp.multicast.loopBackEnabled));
debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
d->domainProp.multicast.loopBackEnabled);
- //joint to multicast group
- mreq.imr_multiaddr.s_addr = htonl(d->domainProp.multicast.ipAddress);
- mreq.imr_interface.s_addr = htonl(INADDR_ANY);
- if (sock_setsockopt(&d->taskRecvMulticastMetatraffic.sock, IPPROTO_IP,
- IP_ADD_MEMBERSHIP, (const char *)&mreq, sizeof(mreq)) >= 0) {
- debug(30, 2) ("ORTEDomainCreate: joint to mgroup %s\n",
- IPAddressToString(d->domainProp.multicast.ipAddress, sIPAddress));
- }
-
- /* receiving multicast port (metatraffic) */
- Domain2PortMulticastMetatraffic(d->domain, mport);
- if (sock_bind(&d->taskRecvMulticastMetatraffic.sock, (uint16_t)mport, d->domainProp.listen) == -1) {
- goto err_sock;
- }
- debug(30, 2) ("ORTEDomainCreate: bind on port(RecvMulticastMetatraffic): %u\n",
- d->taskRecvMulticastMetatraffic.sock.port);
- }
-
- /************************************************************************/
- /* UserData */
- if (!manager) {
- /* give me receiving port (userdata) */
- if (sock_bind(&d->taskRecvUnicastUserdata.sock, 0, d->domainProp.listen) == -1) {
- goto err_sock;
- }
- debug(30, 2) ("ORTEDomainCreate: bind on port(RecvUnicatUserdata): %u\n",
- d->taskRecvUnicastUserdata.sock.port);
-
- if (d->domainProp.multicast.enabled) {
- Port mport;
- int reuse = 1;
-
- //reuseaddr
- sock_setsockopt(&d->taskRecvMulticastUserdata.sock, SOL_SOCKET,
- SO_REUSEADDR, (const char *)&reuse, sizeof(reuse));
- debug(30, 2) ("ORTEDomainCreate: set value SO_REUSEADDR: %u\n",
- reuse);
-
- //multicast loop
- sock_setsockopt(&d->taskRecvMulticastUserdata.sock, IPPROTO_IP,
- IP_MULTICAST_LOOP, (const char *)&d->domainProp.multicast.loopBackEnabled,
- sizeof(d->domainProp.multicast.loopBackEnabled));
- debug(30, 2) ("ORTEDomainCreate: set value IP_MULTICAST_LOOP: %u\n",
- d->domainProp.multicast.loopBackEnabled);
-
- /* receiving multicast port (userdata) */
- Domain2PortMulticastUserdata(d->domain, mport);
- if (sock_bind(&d->taskRecvMulticastUserdata.sock, (uint16_t)mport, d->domainProp.listen) == -1) {
- goto err_sock;
- }
- debug(30, 2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
- d->taskRecvMulticastUserdata.sock.port);
+ /* receiving multicast port (userdata) */
+ Domain2PortMulticastUserdata(d->domain, mport);
+ if (sock_bind(&d->taskRecvMulticastUserdata.sock, (uint16_t)mport, d->domainProp.listen) == -1) {
+ return -1; /* multicast userdata bind failed; the unicast socket bound above is left for the caller to clean up */
}
+ debug(30, 2) ("ORTEDomainCreate: bind on port(RecvMulticastUserdata): %u\n",
+ d->taskRecvMulticastUserdata.sock.port);
}
+ return 0; /* every requested bind succeeded */
+}
- /************************************************************************/
- /* Send */
- /* give me sending port */
+static int /* Bind the sending socket (port argument 0); returns -1 if sock_bind() fails, 0 otherwise. Part of the body is elided from this hunk. */
+bindSendSock(ORTEDomain *d)
+{
if (sock_bind(&d->taskSend.sock, 0, d->domainProp.listen) == -1) {
- goto err_sock;
+ return -1; /* was a jump to err_sock; cleanup is now the caller's job */
}
debug(30, 2) ("ORTEDomainCreate: bind on port(Send): %u\n",
d->taskSend.sock.port);
d->domainProp.multicast.ttl);
}
}
+ return 0;
+}
- /************************************************************************/
- /* tests for valid resources */
- if ((d->taskRecvUnicastMetatraffic.sock.fd < 0) ||
- (d->taskSend.sock.fd < 0) ||
- (d->domainProp.multicast.enabled &&
- (d->taskRecvUnicastUserdata.sock.fd < 0)) ||
- (d->domainProp.multicast.enabled &&
- (d->taskRecvMulticastUserdata.sock.fd < 0)) ||
- (d->domainProp.multicast.enabled &&
- (d->taskRecvMulticastMetatraffic.sock.fd < 0))) {
- debug(30, 0) ("ORTEDomainCreate: Error creating socket(s).\n");
- goto err_sock;
- }
-
- if ((!d->taskRecvUnicastMetatraffic.mb.cdrCodec.buffer) ||
- (!d->taskSend.mb.cdrCodec.buffer) ||
- (d->domainProp.multicast.enabled && !manager &&
- !d->taskRecvUnicastUserdata.mb.cdrCodec.buffer) ||
- (d->domainProp.multicast.enabled && !manager &&
- !d->taskRecvMulticastUserdata.mb.cdrCodec.buffer) ||
- (d->domainProp.multicast.enabled && !manager &&
- !d->taskRecvMulticastMetatraffic.mb.cdrCodec.buffer)) { //no a memory
- debug(30, 0) ("ORTEDomainCreate: Error creating buffer(s).\n");
- goto err_sock;
- }
-
- generateLocalGUID(d, manager);
-
- //create HEADER of message for sending task
- RTPSHeaderCreate(&d->taskSend.mb.cdrCodec, d->guid.hid, d->guid.aid);
- d->taskSend.mb.needSend = ORTE_FALSE;
- d->taskSend.mb.containsInfoReply = ORTE_FALSE;
- d->taskSend.mb.cdrCodecDirect = NULL;
+static AppParams * /* Allocate an AppParams with MALLOC and fill it from the domain's properties; returns NULL when allocation fails. */
+appParamsNew(ORTEDomain *d)
+{
+ int i; /* NOTE(review): not referenced in the lines visible here -- presumably a loop index in the elided context */
+ char sbuff[128]; /* scratch text buffer handed to StringToIPAddress() below */
+ AppParams *appParams = (AppParams *)MALLOC(sizeof(AppParams)); /* ownership passes to the caller on success */
- //Self object data & fellow managers object data
- appParams = (AppParams *)MALLOC(sizeof(AppParams));
if (!appParams) {
- goto err_sock;
+ return NULL; /* out of memory; the caller decides how to unwind */
}
AppParamsInit(appParams);
appParams->expirationTime = d->domainProp.baseProp.expirationTime;
appParams->managerKeyList[appParams->managerKeyCount++] =
StringToIPAddress(sbuff);
}
- d->appParams = appParams;
+ return appParams; /* the helper no longer stores into d->appParams; the caller does that */
+}
+
+ORTEDomain *
+ORTEDomainCreate(int domain, ORTEDomainProp *prop,
+ ORTEDomainAppEvents *events, Boolean manager)
+{
+ ORTEDomain *d;
+ ObjectEntryOID *objectEntryOID;
+ AppParams *appParams;
+ CSTWriterParams cstWriterParams;
+ CSTReaderParams cstReaderParams;
+ char sbuff[128];
+ int i;
+ int errno_save = 0;
+
+ debug(30, 2) ("ORTEDomainCreate: %s compiled: %s,%s\n",
+ ORTE_PACKAGE_STRING, __DATE__, __TIME__);
+
+ debug(30, 10) ("ORTEDomainCreate: start\n");
+ //Create domainApplication
+ d = MALLOC(sizeof(ORTEDomain));
+ if (!d)
+ return NULL; //no memory
+ //initialization local values
+ d->domain = domain;
+
+ //create domainProp
+ if (prop != NULL) {
+ memcpy(&d->domainProp, prop, sizeof(ORTEDomainProp));
+ } else {
+ if (!ORTEDomainPropDefaultGet(&d->domainProp)) {
+ goto err_domainProp;
+ }
+ }
+ if (!d->domainProp.IFCount && d->domainProp.multicast.enabled) {
+ debug(30, 0) ("ORTEDomainCreate: for multicast have to be active an interface\n");
+ goto err_domainProp;
+ }
+ ORTEDomainProp *dp = &d->domainProp;
+
+ if (-1 == initTaskProp(&d->taskRecvUnicastMetatraffic, d, dp->recvBuffSize) ||
+ -1 == initTaskProp(&d->taskRecvUnicastUserdata, d, !manager ? dp->recvBuffSize : 0) ||
+ -1 == initTaskProp(&d->taskRecvMulticastMetatraffic, d, !manager && dp->multicast.enabled ? dp->recvBuffSize : 0) ||
+ -1 == initTaskProp(&d->taskRecvMulticastUserdata, d, !manager && dp->multicast.enabled ? dp->recvBuffSize : 0) ||
+ -1 == initTaskProp(&d->taskSend, d, dp->sendBuffSize))
+ goto err_domainProp;
+
+ d->taskSend.mb.cdrCodec.wptr_max = dp->wireProp.metaBytesPerPacket;
+ assert(d->taskSend.mb.cdrCodec.wptr_max <= d->taskSend.mb.cdrCodec.buf_len);
+ d->taskSend.mb.cdrCodec.data_endian = FLAG_ENDIANNESS;
+
+ if (objectEntryInit(&d->objectEntry) == -1)
+ goto err_sock;
+
+ publicationsInit(&d->publications);
+ subscriptionsInit(&d->subscriptions);
+ psEntryInit(&d->psEntry);
+
+ patternEntryInit(d);
+
+ printLocalAddresses(d);
+
+ //DomainEvents
+ if (events != NULL) {
+ memcpy(&d->domainEvents, events, sizeof(ORTEDomainAppEvents));
+ } else {
+ memset(&d->domainEvents, 0, sizeof(ORTEDomainAppEvents));
+ }
+
+ typeEntryInit(&d->typeEntry);
+
+ // Bind sockets
+ int32_t ret = bindUnicastMetatrafficSock(d, manager);
+ if (ret == -1)
+ goto err_sock;
+ uint16_t port = ret & 0xffff;
+
+ if (!manager) {
+ if (d->domainProp.multicast.enabled) {
+ if (bindMulticastMetatrafficSock(d) == -1)
+ goto err_sock;
+ }
+
+ if (bindUserDataSockets(d) == -1)
+ goto err_sock;
+ }
+
+ if (bindSendSock(d) == -1)
+ goto err_sock;
+
+ generateLocalGUID(d, manager);
+
+ //create HEADER of message for sending task
+ RTPSHeaderCreate(&d->taskSend.mb.cdrCodec, d->guid.hid, d->guid.aid);
+ d->taskSend.mb.needSend = ORTE_FALSE;
+ d->taskSend.mb.containsInfoReply = ORTE_FALSE;
+ d->taskSend.mb.cdrCodecDirect = NULL;
+
+ /* Create application object */
+ d->appParams = appParamsNew(d);
+ if (!d->appParams)
+ goto err_sock;
+
//insert object, doesn't need to be locked
- d->objectEntryOID = objectEntryAdd(d, &d->guid, (void *)appParams);
+ d->objectEntryOID = objectEntryAdd(d, &d->guid, (void *)d->appParams);
d->objectEntryOID->privateCreated = ORTE_TRUE;
guid.aid = AID_UNKNOWN;
guid.oid = OID_APP;
if (!objectEntryFind(d, &guid)) {
+ char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
appParams = (AppParams *)MALLOC(sizeof(AppParams));
AppParamsInit(appParams);
appParams->hostId = guid.hid;
guid.aid = AID_UNKNOWN;
guid.oid = OID_APP;
if (!objectEntryFind(d, &guid)) {
+ char sIPAddress[MAX_STRING_IPADDRESS_LENGTH];
appParams = (AppParams *)MALLOC(sizeof(AppParams));
AppParamsInit(appParams);
appParams->hostId = guid.hid;
cstReaderParams.delayResponceTimeMax = d->domainProp.baseProp.delayResponceTimeACKMax;
cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
if (manager) {
- cstReaderParams.ACKMaxRetries = d->domainProp.baseProp.ACKMaxRetries;
cstReaderParams.repeatActiveQueryTime = iNtpTime; //RM cann't repeatly send ACKf
} else {
cstReaderParams.repeatActiveQueryTime = d->domainProp.baseProp.repeatActiveQueryTime;