rtime.felk.cvut.cz Git - frescor/forb.git/blobdiff - src/iop.c
Handle EINTR in epoll
[frescor/forb.git] / src / iop.c
index 15835b78a6466a5b044c56368298f80b5bb3ef45..f6dca49e57319e83b43b12865963f697447ea549 100644 (file)
--- a/src/iop.c
+++ b/src/iop.c
@@ -107,6 +107,10 @@ forb_iop_prepare_request(forb_request_t *req,
        if (ret) {
                /* Request body is 8 byte aligned */
                ret = FORB_CDR_put_align(&req->cdr_request, 8);
+               char str[50];
+               ul_logdeb("preparing request: id=%d  dest=%s  iface=%s method=%d\n", req->request_id,
+                         forb_server_id_to_string(str, &req->obj->server, sizeof(str)),
+                         iface, method_ind);
        }
        return ret;
 }
@@ -145,6 +149,8 @@ forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *cod
                                case forb_iop_HELLO:
                                        break;
                                default:
+                                       ul_logerr("rcvd wrong message type: %d\n",
+                                                 mh->message_type);
                                        return false;
                        }
                        codec->data_endian = (mh->flags && forb_iop_LITTLE_ENDIAN) ?
@@ -153,6 +159,8 @@ forb_iop_process_message_header(forb_iop_message_header *mh, FORB_CDR_Codec *cod
                        return true;
                        break;                  
                default:
+                       ul_logerr("rcvd wrong protocol versio: %d.%d\n",
+                                 mh->proto_version.major, mh->proto_version.minor);
                        return false;
        }
 }
@@ -209,8 +217,14 @@ forb_iop_send_reply(forb_t *forb,
 
        peer = forb_get_next_hop(forb, dest, &timeout);
        if (!peer) {
-               ul_logerr("Reply destination not found\n");
+               char str[60];
+               forb_server_id_to_string(str, dest, sizeof(str));
+               ul_logerr("Reply destination not found: %s\n", str);
                goto err;
+       } else {
+               char str[60];
+               forb_server_id_to_string(str, dest, sizeof(str));
+               ul_logdeb("sending reply: dest=%s, id=%u\n", str, request_id);
        }
        forb_proto_send(peer, codec);
        forb_peer_put(peer);
@@ -267,6 +281,7 @@ process_request(forb_port_t *port, FORB_CDR_Codec *codec, uint32_t message_size)
                        forb_peer_put(port->new_peer);
                        port->new_peer = NULL;
                } else {
+                       ul_logdeb("discovered new_peer from incomming connection\n");
                        peer = port->new_peer;
                        port->new_peer = NULL;
                        forb_new_peer_discovered(port, peer, request_header.source,
@@ -492,7 +507,8 @@ void *forb_iop_receiver_thread(void *arg)
        forb_port_t *port = arg;
        const forb_proto_t *proto = port->desc.proto;
        FORB_CDR_Codec *c = &port->codec;
-       size_t rcvd, len;
+       ssize_t rcvd;
+       size_t len;
        forb_iop_message_header mh;
        bool header_received = false;
 
@@ -508,7 +524,7 @@ void *forb_iop_receiver_thread(void *arg)
                                   &c->buffer[c->wptr],
                                   c->wptr_max - c->wptr);
                if (rcvd < 0) {
-                       ul_logmsg("recv returned error %d\n", rcvd);
+                       ul_logerr("recv returned error %d (%s), exiting\n", rcvd, strerror(errno));
                        return NULL;
                }
                c->wptr += rcvd;
@@ -524,6 +540,10 @@ void *forb_iop_receiver_thread(void *arg)
                                                ul_logerr("Header doesn't start at 8 byte bounday\n");
                                        }
                                        header_received = forb_iop_process_message_header(&mh, c);
+                                       if (!header_received) {
+                                               ul_logerr("Wrong header received\n");
+                                               /* TODO: We should probably reset the buffer here */
+                                       }
                                        len = FORB_CDR_data_size(c);
                                } else {
                                        break; /* Wait for more data to arrive*/
@@ -604,4 +624,76 @@ void *forb_iop_discovery_thread(void *arg)
        return NULL;
 }
 
+/**
+ * Sends REQUEST message to another FORB.
+ *
+ * The request @a req has to be prepared by
+ * forb_iop_prepare_request(). Then, this function adds a message
+ * header, obtains the next-hop peer via forb_get_next_hop() (deadline:
+ * now + 1000 ms), registers the request and sends it to that peer.
+ *
+ * If no exception is reported, then the caller must wait for response
+ * by calling forb_wait_for_reply().
+ *
+ * @param req A request prepared by forb_iop_prepare_request()
+ * @param env Environment for returning exceptions: FORB_EX_INTERNAL
+ *            (no local FORB, header or registration error) or
+ *            FORB_EX_COMM_FAILURE (no peer found, incomplete send)
+ */
+void
+forb_request_send(forb_request_t *req, CORBA_Environment *env)
+{
+       int ret; /* int, so a negative forb_request_insert() result is not lost */
+       forb_peer_t *peer;
+       ssize_t size;
+       size_t len;
+       fosa_abs_time_t timeout;
+       char str[50]; /* Scratch buffer for the server id in log messages */
+       forb_t *forb = forb_object_to_forb(req->obj);
+
+       if (!forb) {
+               env->major = FORB_EX_INTERNAL;
+               return;
+       }
 
+       req->env = env;     /* Remember, where to return exceptions */
+
+       ret = forb_iop_prepend_message_header(&req->cdr_request, forb_iop_REQUEST);
+       if (!ret) {
+               /* This should never happen */
+               env->major = FORB_EX_INTERNAL;
+               return;
+       }
+
+       fosa_clock_get_time(FOSA_CLOCK_ABSOLUTE, &timeout);
+       timeout = fosa_abs_time_incr(timeout,
+                                    fosa_msec_to_rel_time(1000));
+       peer = forb_get_next_hop(forb, &req->obj->server, &timeout);
+       if (!peer) {
+               ul_logerr("Cannot find peer to send request for server %s\n",
+                         forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
+               env->major = FORB_EX_COMM_FAILURE;
+               return;
+       }
+       /* Register the request with forb so we can match incoming
+        * reply to this request. */
+       ret = forb_request_insert(forb, req);
+       if (ret <= 0) {
+               ul_logerr("Insert request error %d\n", ret);
+               env->major = FORB_EX_INTERNAL;
+               goto err_peer_put;
+       }
+
+       ul_logdeb("sending request: id=%d  dest=%s\n", req->request_id,
+                 forb_server_id_to_string(str, &req->obj->server, sizeof(str)));
+       len = FORB_CDR_data_size(&req->cdr_request);
+       fosa_mutex_lock(&peer->send_lock);
+       size = forb_proto_send(peer, &req->cdr_request);
+       fosa_mutex_unlock(&peer->send_lock);
+       if (size <= 0 || (size_t)size != len) {
+               env->major = FORB_EX_COMM_FAILURE;
+               /* Request is deleted when the stub calls forb_request_destroy() */
+       }
+ err_peer_put:
+       forb_peer_put(peer);
+}