]> rtime.felk.cvut.cz Git - frescor/streamer.git/blobdiff - streamer.c
Proper CPU contracts in streamer
[frescor/streamer.git] / streamer.c
index 4185c1689f8e3808311de72c43ff158d8d1de3cf..33df5bcf4a7d1c7a59d6924cfa86a649a8d87712 100644 (file)
@@ -5,6 +5,8 @@
  */
 #include <unistd.h>
 #include <stdlib.h>
+#include <signal.h>
+#include <pthread.h>
 
 #include <libavformat/avformat.h>
 #include <libavdevice/avdevice.h>
 #include "codec.h"
 #include "rt.h"
 
+#include "streamer_config.h"
+
+#ifdef CONFIG_OC_ULUT
+#include <ul_log.h>
+#include <ul_logreg.h>
+#endif
+
+#ifdef CONFIG_FFMPEG_WITH_FRSH
+#include <frsh.h>
+
+/* temporary solution to pass network parameters */
+extern long int frsh_rtp_budget, frsh_rtp_period_ms, frsh_rtp_deadline_ms;
+#endif /*CONFIG_FFMPEG_WITH_FRSH*/
+
+/*
+ * Run-time settings.  The initializers are the defaults; args_parse()
+ * overrides them from the command line.
+ *   sdp_file  (-s) name of the file the SDP description is written to
+ *   vdev      (-d) video device path
+ *   dst       destination address handed to open_output_stream()
+ *             (-m according to usage())
+ *   dport     (-p) destination port
+ *   width     (-w) image width
+ *   height    image height (-h according to usage();
+ *             NOTE(review): the matching case is outside the visible hunks)
+ *   bitrate   (-b) bit rate in b/s, copied into the output codec context
+ *   fps       (-r) frame rate; not static, so other files can reference it
+ *   impform   (-i) input video device format name
+ */
 static const char *sdp_file = "sdp.txt";
 static const char *vdev = "/dev/video0";
 static const char *dst = "127.0.0.1";
 static int dport = 20000;
-static int width = 352;
-static int height = 288;
-int fps = 25;
+static int width = 320;
+static int height = 240;
+static int bitrate = 1000000;
+int fps = 30;
 static const char *impform = "video4linux2";
+/*
+ * Input (s) and output (os) format contexts.  They live at file scope
+ * because both main() and the streamer_run() thread function use them.
+ */
+AVFormatContext *s, *os;
 
 static void sdp_print(AVFormatContext *s, const char *fname)
 {
@@ -35,12 +53,36 @@ static void sdp_print(AVFormatContext *s, const char *fname)
     fclose(f);
 }
 
+/*
+ * Print the list of command-line options to stdout.  Options that have a
+ * bracketed value show the current content of the corresponding setting.
+ */
+static void
+usage(void)
+{
+       printf("usage: streamer [ options ]\n");
+       printf("  -w <number>    send image width\n");
+       printf("  -h <number>    send image height\n");
+       printf("  -r <number>    refresh rate\n");
+       /* The device is selected with -d in args_parse(); -r is the refresh rate. */
+       printf("  -d <path>      video device [%s]\n", vdev);
+       printf("  -m <addr>      destination IP address\n");
+       printf("  -i <string>    input video device format [%s]\n", impform);
+       printf("  -p <port>      destination port [%d]\n", dport);
+       printf("  -b <bitrate>   bitrate in b/s [%d]\n", bitrate);
+       printf("  -s <sdp_file>  name of output sdp file [%s]\n", sdp_file);
+      #ifdef CONFIG_OC_ULUT
+       printf("  -l <number>|<domain>=<number>,...\n");
+      #endif /*CONFIG_OC_ULUT*/
+}
+
 static int args_parse(int argc, char *argv[])
 {
   int v;
 
-  while ((v = getopt(argc, argv, "w:h:r:d:m:i:")) >= 0) {
+  while ((v = getopt(argc, argv, "w:h:r:d:m:i:l:b:p:s:")) >= 0) {
     switch (v) {
+      case 's':
+       sdp_file = optarg;
+       break;
+      case 'p':
+        dport = atoi(optarg);
+        break;
       case 'w':
         width = atoi(optarg);
         break;
@@ -50,6 +92,9 @@ static int args_parse(int argc, char *argv[])
       case 'r':
         fps = atoi(optarg);
         break;
+      case 'b':
+       bitrate = atoi(optarg);
+       break;
       case 'd':
         vdev = optarg;
         break;
@@ -63,8 +108,15 @@ static int args_parse(int argc, char *argv[])
        else if(!strcmp(impform, "v4l2"))
          impform = "video4linux2";
         break;
-      default: /* ’?’ */
+      #ifdef CONFIG_OC_ULUT
+      case 'l':
+       ul_log_domain_arg2levels(optarg);
+       break;
+      #endif /*CONFIG_OC_ULUT*/
+
+      default: /* unknown option */
         fprintf(stderr, "%s: illegal option %c\n", argv[0], v);
+       usage();
         exit(-1);
     }
   }
@@ -72,10 +124,127 @@ static int args_parse(int argc, char *argv[])
   return 0;
 }
 
-int main(int argc, char *argv[])
+/*
+ * Stop request for streamer_run().  main() sets it to 1 once
+ * wait_for_ending_command() returns; the streaming loop reads it on every
+ * iteration.
+ * NOTE(review): this is a plain int shared between two threads with no
+ * atomic or volatile qualifier -- confirm that is acceptable here.
+ */
+int streamer_run_done_rq;
+
+/*
+ * Compute *result = *x - *y.
+ *
+ * result: receives the difference; its tv_nsec is never negative.
+ * x:      minuend, only read.
+ * y:      subtrahend, only read.  The carry normalisation is done on a
+ *         private copy, so the caller's value is left as it was.
+ *
+ * Returns 1 if the difference is negative, 0 otherwise.  The return value
+ * is determined before *result is written, so result may alias x or y.
+ */
+int
+timespec_subtract (struct timespec *result,
+                  struct timespec *x,
+                  struct timespec *y)
+{
+  struct timespec sub = *y;
+  int negative;
+
+  /* Perform the carry for the later subtraction by updating the copy. */
+  if (x->tv_nsec < sub.tv_nsec) {
+    long num_sec = (sub.tv_nsec - x->tv_nsec) / 1000000000L + 1;
+    sub.tv_nsec -= 1000000000L * num_sec;
+    sub.tv_sec += num_sec;
+  }
+  if (x->tv_nsec - sub.tv_nsec > 1000000000L) {
+    long num_sec = (x->tv_nsec - sub.tv_nsec) / 1000000000L;
+    sub.tv_nsec += 1000000000L * num_sec;
+    sub.tv_sec -= num_sec;
+  }
+
+  /* Decide the sign first: the stores below may overwrite x or y. */
+  negative = x->tv_sec < sub.tv_sec;
+
+  /* After the carry above the nanosecond difference is not negative. */
+  result->tv_sec = x->tv_sec - sub.tv_sec;
+  result->tv_nsec = x->tv_nsec - sub.tv_nsec;
+
+  return negative;
+}
+
+
+/*
+ * Thread body of the streamer: read packets from the global input context
+ * `s`, decode each one, re-encode it for the global output context `os`,
+ * send it, and print one statistics line per sent packet.
+ *
+ * The loop ends when read_input_packet() returns NULL or when
+ * streamer_run_done_rq becomes non-zero.
+ *
+ * args: unused.
+ * Returns NULL.
+ */
+void* streamer_run(void* args)
 {
-  AVFormatContext *s, *os;
   int done;
+  int frame = 0;
+  int fps_avg = 0;              /* smoothed frame rate, 16.16 fixed point */
+  struct timespec start, end, d;
+  static unsigned max_size = 0, min_size = -1;
+  static double avg_size = 0;
+  clock_gettime(CLOCK_MONOTONIC, &start);
+  start.tv_sec--;              /* Keep the first measured interval away from zero */
+  done = 0;
+  /*
+   * Test both conditions separately: assigning the stop flag to `done`
+   * here would discard the end-of-input mark set inside the loop.
+   */
+  while (!done && !streamer_run_done_rq) {
+    AVPacket *pkt;
+    long elapsed_ms;
+    int fps_now;
+
+    pkt = read_input_packet(s);
+
+    clock_gettime(CLOCK_MONOTONIC, &end);
+    timespec_subtract(&d, &end, &start);
+    start = end;
+
+    /* Frames less than 1 ms apart would otherwise divide by zero. */
+    elapsed_ms = d.tv_sec*1000 + d.tv_nsec/1000000;
+    if (elapsed_ms <= 0)
+      elapsed_ms = 1;
+    fps_now = (1000<<16)/elapsed_ms;
+    fps_avg += (fps_now - fps_avg) >> 4;
+
+    if (pkt == NULL) {
+      done = 1;
+    } else {
+      AVFrame *f;
+      AVPacket *opkt;
+
+      pkt->pts += s->streams[pkt->stream_index]->start_time;
+      //rt_job_start(pkt->pts);
+      f = pkt_decode(s, pkt);
+      if (f) {
+        opkt = pkt_encode(os, f);
+        if (opkt) {
+          /* Look at the coded frame only after encoding produced a packet. */
+          int keyframe = os->streams[0]->codec->coded_frame->key_frame;
+          unsigned size = opkt->size;
+
+          pkt_send(os, opkt);
+
+          if (size > max_size)
+            max_size = size;
+          if (size < min_size)
+            min_size = size;
+          /* Running mean over all packets sent so far. */
+          avg_size = avg_size*frame/(frame+1) + (double)opkt->size/(frame+1);
+          printf("%5d%c: %2d (%4.1f) fps  opkt size: %5d b  max=%5u b min=%5u b avg=%5.0f\n",
+                 frame, keyframe ? '*':' ',
+                 fps_avg>>16, 1000.0/elapsed_ms,
+                 opkt->size, max_size, min_size, avg_size);
+          /* The maximum is restarted every 100 frames. */
+          if (frame % 100 == 0) max_size=0;
+          frame++;
+        }
+      }
+      //rt_job_end();
+      av_free_packet(pkt);
+    }
+  }
+
+  return NULL;
+}
+
+/*
+ * Suspend the calling thread until SIGINT or SIGTERM is received.
+ *
+ * Both signals are expected to be blocked already; main() calls
+ * block_signals() before anything else for that purpose.
+ */
+void wait_for_ending_command(void) {
+  sigset_t sigset;
+  sigemptyset(&sigset);
+  sigaddset(&sigset, SIGINT);
+  sigaddset(&sigset, SIGTERM);
+  /*
+   * sigwaitinfo() returns -1 when the wait is interrupted before one of
+   * the requested signals arrives.  That is not a request to stop, so
+   * wait again instead of returning.
+   */
+  while (sigwaitinfo(&sigset, NULL) < 0)
+    ;
+}
+/*
+ * Add SIGINT and SIGTERM to the set of blocked signals.  main() calls this
+ * first, before the streamer thread is created; wait_for_ending_command()
+ * later waits for the same two signals.
+ */
+static void block_signals(void)
+{
+  sigset_t term_signals;
+
+  sigemptyset(&term_signals);
+  sigaddset(&term_signals, SIGTERM);
+  sigaddset(&term_signals, SIGINT);
+
+  /* The same set goes through the process-level call and the pthread call. */
+  sigprocmask(SIG_BLOCK, &term_signals, NULL);
+  pthread_sigmask(SIG_BLOCK, &term_signals, NULL);
+}
+
+
+int main(int argc, char *argv[])
+{
+
+  //long int cpu_budget, cpu_period;
+  int ret;
+
+  block_signals();
+
+#ifdef CONFIG_FFMPEG_WITH_FRSH
+  ret = frsh_init();
+  if (ret) PERROR_AND_EXIT(ret, "frsh_init1");
+
+  /* fill default network contract params */
+  frsh_rtp_budget = 100*bitrate/8/100; 
+  frsh_rtp_period_ms = 1000;
+  frsh_rtp_deadline_ms = 1000/fps;
+#endif /*CONFIG_FFMPEG_WITH_FRSH*/
 
   avcodec_register_all();
   av_register_all();
@@ -90,7 +259,7 @@ int main(int argc, char *argv[])
     return -1;
   }
   codec_open(s);
-  os = open_output_stream(dst, dport, CODEC_TYPE_VIDEO);
+  os = open_output_stream(dst, dport, CODEC_TYPE_VIDEO, fps);
   if (os == NULL) {
     fprintf(stderr, "Cannot open output stream\n");
 
@@ -99,33 +268,78 @@ int main(int argc, char *argv[])
   os->streams[0]->codec->width = s->streams[0]->codec->width;
   os->streams[0]->codec->height = s->streams[0]->codec->height;
   os->streams[0]->codec->time_base = s->streams[0]->codec->time_base;
+  os->streams[0]->codec->bit_rate = bitrate;
   codec_connect(s->streams[0]->codec, os->streams[0]->codec);
   out_codec_open(os);
   dump_format(os, 0, os->filename, 1);
   sdp_print(os, sdp_file);
-  done = 0;
-  while (!done) {
-    AVPacket *pkt;
-    pkt = read_input_packet(s);
-    if (pkt == NULL) {
-      done = 1;
-    } else {
-      AVFrame *f;
-      AVPacket *opkt;
 
-      pkt->pts += s->streams[pkt->stream_index]->start_time;
-      //rt_job_start(pkt->pts);
-      f = pkt_decode(s, pkt);
-      if (f) {
-        opkt = pkt_encode(os, f);
-        if (opkt) {
-          pkt_send(os, opkt);
-        }
-      }
-      //rt_job_end();
-      av_free_packet(pkt);
-    }
-  }
+#if CONFIG_FFMPEG_WITH_FRSH
+  frsh_thread_attr_t frsh_attr;
+  frsh_thread_id_t thread;
+  frsh_vres_id_t cpu_vres;
+  frsh_contract_t cpu_contract;
+  frsh_rel_time_t cpu_budget, cpu_period;
+  cpu_budget = fosa_msec_to_rel_time(5);
+  cpu_period = fosa_msec_to_rel_time(1000/fps);
+  /* Contract negotiation for CPU */
+  ret = frsh_contract_init(&cpu_contract);
+  if (ret) PERROR_AND_EXIT(ret, "CPU:frsh_contract_init");
+  ret = frsh_contract_set_basic_params(&cpu_contract,
+                                            &cpu_budget,
+                                            &cpu_period,
+                                            FRSH_WT_BOUNDED,
+                                            FRSH_CT_REGULAR);
+       if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_basic_params");
+       ret = frsh_contract_set_resource_and_label(&cpu_contract, 
+                       FRSH_RT_PROCESSOR, FRSH_CPU_ID_DEFAULT, "camera_ctrl");
+       if (ret) PERROR_AND_EXIT(ret, "frsh_contract_set_resource_and_label");
+
+       ret = frsh_contract_negotiate(&cpu_contract, &cpu_vres);
+       if (ret) PERROR_AND_EXIT(ret, "frsh_contract_negotiate");
+       printf("Aqcpu vres negotiated\n");
+
+       pthread_attr_init(&frsh_attr);
+       ret = frsh_thread_create_and_bind(cpu_vres, &thread, &frsh_attr, 
+                               streamer_run, (void*) NULL);
+       if (ret) PERROR_AND_EXIT(ret, "frsh_thread_create_and_bind");
+
+        wait_for_ending_command();
+
+       streamer_run_done_rq = 1;
+       
+       pthread_join(thread.pthread_id, (void**) NULL); 
+
+       printf("Ending contracts\n");
+
+       ret = frsh_contract_cancel(cpu_vres);
+       if (ret) PERROR_AND_EXIT(ret, "frsh_contract_cancel");
+
+       printf("Finishing\n");
+
+        close_output_stream(os);
+#else
+       pthread_attr_t attr;
+       pthread_t streamer_th;
+
+       pthread_attr_init(&attr);
+
+       ret = pthread_create(&streamer_th, &attr, streamer_run, (void*) NULL);
+       if (ret) 
+                       printf("Failed to create streamer thread\n.");
+
+        wait_for_ending_command();
+
+       streamer_run_done_rq = 1;
+
+       pthread_join(streamer_th, (void**) NULL);
+
+       printf("Finishing\n");
+
+        close_output_stream(os);
+#endif
 
   return 0;
 }