]> rtime.felk.cvut.cz Git - fpga/lx-cpu1/lx-dad.git/commitdiff
Simple tool to capture scans written in Python.
authorPavel Pisa <pisa@cmp.felk.cvut.cz>
Tue, 27 Oct 2015 20:58:13 +0000 (21:58 +0100)
committerPavel Pisa <pisa@cmp.felk.cvut.cz>
Tue, 27 Oct 2015 20:58:13 +0000 (21:58 +0100)
Signed-off-by: Pavel Pisa <pisa@cmp.felk.cvut.cz>
host/app/dadpyscan/dadpyscan.py [new file with mode: 0755]

diff --git a/host/app/dadpyscan/dadpyscan.py b/host/app/dadpyscan/dadpyscan.py
new file mode 100755 (executable)
index 0000000..f1170cc
--- /dev/null
@@ -0,0 +1,168 @@
+#!/usr/bin/python
+
+import sys
+import serial
+import time
+import numpy
+import argparse
+import random
+
+class rdevice(object):
+  def __init__(self, rcon = None):
+    self.rcon = rcon;
+    self.wait_stamp_sqn = random.randint(1, 1000)
+
+  def set_rcon(self, rcon):
+    if self.rcon is not None:
+      self.rcon.close();
+      self.rcon = None;
+    self.rcon = rcon;
+
+  def init_communication(self):
+    s = self.rcon.read(1024)
+
+    self.rcon.write("InvBuff\n")
+    s = self.rcon.read(1024)
+
+    self.rcon.write("ECHO:0\n")
+    s = self.rcon.read(1024)
+    self.rcon.write("VER?\n")
+    s = self.rcon.read(1024)
+    print(s);
+
+    # Purge
+    s = self.rcon.read(2000)
+    print(s);
+
+  def set_int_param(self, param, val):
+    valstr=str(int(val))
+    self.rcon.write(param+':'+valstr+'\n')
+
+  def wait_stamp(self):
+    s = self.rcon.read(1024)
+    self.wait_stamp_sqn = self.wait_stamp_sqn + 1
+    sqnstr = str(int(self.wait_stamp_sqn))
+    self.rcon.write("STAMP:"+sqnstr+"\n")
+    while True:
+      s = self.rcon.read(1024)
+      for sl in s.splitlines():
+        if (sl == "STAMP="+sqnstr):
+          return
+      time.sleep(1)
+
+  def get_multiline(self, last_line = None, try_cycles = 10):
+    res = ''
+    try_cnt = 0;
+    while True:
+      s = self.rcon.read(10*1024)
+      if len(s) == 0:
+        if last_line is None:
+          return res
+        else:
+           if try_cnt > try_cycles:
+             return None
+           try_cnt = try_cnt + 1
+      try_cnt = 0
+      res = res + s
+      if last_line is not None:
+        for sl in res.splitlines():
+          if (sl == last_line):
+            return res
+
+def rdevice_com_test(rcon):
+  rcon.write("init\n");
+  rcon.write("run\n");
+  wait_stamp(rcon)
+  print("Wait stamp done")
+
+
+if __name__ == '__main__':
+  help_msg = '''SYNOPSIS: dadpyscan.py [-d /dev/ttyXXX]
+        Control LX DAD'''
+
+  parser = argparse.ArgumentParser(description='BlueBot rdevice commander')
+  parser.add_argument('-s', '--skip-setup', dest='skip_setup', action='store_true',
+                      default=False, help='skip hard-home inicialization of rdevice')
+  parser.add_argument('-d', '--tty-device', dest='tty_dev', type=str,
+                    default='/dev/ttyACM0', help='tty line/device to rdevice')
+  parser.add_argument('-a', '--action', dest='action', type = str,
+                      default=None, help='action to run')
+
+  args = parser.parse_args()
+
+  tty_dev = args.tty_dev
+  skip_setup = args.skip_setup
+  action = args.action
+  scan_count = 20
+
+  if action is None:
+    action = 'cyclic'
+
+  random.seed()
+
+  r = rdevice();
+
+  if True:
+    print("Opening %s ...\n" % tty_dev)
+    ser = serial.Serial(tty_dev,
+          baudrate=57600,
+          bytesize=serial.EIGHTBITS,
+          parity=serial.PARITY_NONE,
+          stopbits=serial.STOPBITS_ONE,
+          timeout=0.1)
+
+    r.set_rcon(ser)
+
+    #ser.open()
+
+    #ser.write("SPIMST0:4(%02X,%02X)\n"%((output_data>>8)&0xFF,(output_data)&0xFF))
+    #s = ser.read(1000)
+    #print(s)
+
+    r.init_communication()
+
+  if not skip_setup or (action == 'home'):
+    print("Setup")
+  else:
+    print("Setup skipped")
+
+  print('action:'+action)
+
+  if action == 'cyclic':
+    r.wait_stamp()
+    r.rcon.write("init\n");
+    scan_cnt = 0
+    scan_all=numpy.ndarray(shape=(0,0), dtype=float)
+    while True:
+      r.wait_stamp()
+      r.rcon.write("run\n");
+      s = r.get_multiline(last_line = 'end')
+      if s is None:
+        print("scan read timeout")
+        continue
+      si = s.find('bank')
+      ei = s.find('end')
+      if (si == -1) or (ei == -1):
+        print("bank, end not found")
+        continue
+      si = s.find('\n', si, ei)
+      scan_act=numpy.fromstring(s[si:ei], dtype=float, count=-1, sep=' ')
+      print(scan_act);
+      if scan_all.size == 0:
+        scan_all=numpy.ndarray(shape=(scan_count,scan_act.size), dtype=float)
+      scan_all[scan_cnt,:]=scan_act
+      scan_cnt = scan_cnt + 1
+      if scan_cnt >= scan_count:
+        F=open('scan.dat', mode='wt')
+        numpy.savetxt(F, scan_all, fmt='%8g')
+        F.write('min:')
+        numpy.savetxt(F, numpy.amin(scan_all, axis = 0), fmt='%8g', newline = ' ')
+        F.write('\n')
+        F.write('max:')
+        numpy.savetxt(F, numpy.amax(scan_all, axis = 0), fmt='%8g', newline = ' ')
+        F.write('\n')
+        F.write('std:')
+        numpy.savetxt(F, numpy.std(scan_all, axis = 0), fmt='%8g', newline = ' ')
+        F.write('\n')
+        F.close()
+        exit(0)