From: Pavel Pisa Date: Tue, 27 Oct 2015 20:58:13 +0000 (+0100) Subject: Simple tool to capture scans written in Python. X-Git-Url: https://rtime.felk.cvut.cz/gitweb/fpga/lx-cpu1/lx-dad.git/commitdiff_plain/83356113bc7f2c5d09dfcb8ceaa47240201f9f49 Simple tool to capture scans written in Python. Signed-off-by: Pavel Pisa --- diff --git a/host/app/dadpyscan/dadpyscan.py b/host/app/dadpyscan/dadpyscan.py new file mode 100755 index 0000000..f1170cc --- /dev/null +++ b/host/app/dadpyscan/dadpyscan.py @@ -0,0 +1,168 @@ +#!/usr/bin/python + +import sys +import serial +import time +import numpy +import argparse +import random + +class rdevice(object): + def __init__(self, rcon = None): + self.rcon = rcon; + self.wait_stamp_sqn = random.randint(1, 1000) + + def set_rcon(self, rcon): + if self.rcon is not None: + self.rcon.close(); + self.rcon = None; + self.rcon = rcon; + + def init_communication(self): + s = self.rcon.read(1024) + + self.rcon.write("InvBuff\n") + s = self.rcon.read(1024) + + self.rcon.write("ECHO:0\n") + s = self.rcon.read(1024) + self.rcon.write("VER?\n") + s = self.rcon.read(1024) + print(s); + + # Purge + s = self.rcon.read(2000) + print(s); + + def set_int_param(self, param, val): + valstr=str(int(val)) + self.rcon.write(param+':'+valstr+'\n') + + def wait_stamp(self): + s = self.rcon.read(1024) + self.wait_stamp_sqn = self.wait_stamp_sqn + 1 + sqnstr = str(int(self.wait_stamp_sqn)) + self.rcon.write("STAMP:"+sqnstr+"\n") + while True: + s = self.rcon.read(1024) + for sl in s.splitlines(): + if (sl == "STAMP="+sqnstr): + return + time.sleep(1) + + def get_multiline(self, last_line = None, try_cycles = 10): + res = '' + try_cnt = 0; + while True: + s = self.rcon.read(10*1024) + if len(s) == 0: + if last_line is None: + return res + else: + if try_cnt > try_cycles: + return None + try_cnt = try_cnt + 1 + try_cnt = 0 + res = res + s + if last_line is not None: + for sl in res.splitlines(): + if (sl == last_line): + return res + +def rdevice_com_test(rcon): + rcon.write("init\n"); + rcon.write("run\n"); + wait_stamp(rcon) + print("Wait stamp done") + + +if __name__ == '__main__': + help_msg = '''SYNOPSIS: dadpyscan.py [-d /dev/ttyXXX] + Control LX DAD''' + + parser = argparse.ArgumentParser(description='BlueBot rdevice commander') + parser.add_argument('-s', '--skip-setup', dest='skip_setup', action='store_true', + default=False, help='skip hard-home inicialization of rdevice') + parser.add_argument('-d', '--tty-device', dest='tty_dev', type=str, + default='/dev/ttyACM0', help='tty line/device to rdevice') + parser.add_argument('-a', '--action', dest='action', type = str, + default=None, help='action to run') + + args = parser.parse_args() + + tty_dev = args.tty_dev + skip_setup = args.skip_setup + action = args.action + scan_count = 20 + + if action is None: + action = 'cyclic' + + random.seed() + + r = rdevice(); + + if True: + print("Opening %s ...\n" % tty_dev) + ser = serial.Serial(tty_dev, + baudrate=57600, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=0.1) + + r.set_rcon(ser) + + #ser.open() + + #ser.write("SPIMST0:4(%02X,%02X)\n"%((output_data>>8)&0xFF,(output_data)&0xFF)) + #s = ser.read(1000) + #print(s) + + r.init_communication() + + if not skip_setup or (action == 'home'): + print("Setup") + else: + print("Setup skipped") + + print('action:'+action) + + if action == 'cyclic': + r.wait_stamp() + r.rcon.write("init\n"); + scan_cnt = 0 + scan_all=numpy.ndarray(shape=(0,0), dtype=float) + while True: + r.wait_stamp() + r.rcon.write("run\n"); + s = r.get_multiline(last_line = 'end') + if s is None: + print("scan read timeout") + continue + si = s.find('bank') + ei = s.find('end') + if (si == -1) or (ei == -1): + print("bank, end not found") + continue + si = s.find('\n', si, ei) + scan_act=numpy.fromstring(s[si:ei], dtype=float, count=-1, sep=' ') + print(scan_act); + if scan_all.size == 0: + scan_all=numpy.ndarray(shape=(scan_count,scan_act.size), dtype=float) + scan_all[scan_cnt,:]=scan_act + scan_cnt = scan_cnt + 1 + if scan_cnt >= scan_count: + F=open('scan.dat', mode='wt') + numpy.savetxt(F, scan_all, fmt='%8g') + F.write('min:') + numpy.savetxt(F, numpy.amin(scan_all, axis = 0), fmt='%8g', newline = ' ') + F.write('\n') + F.write('max:') + numpy.savetxt(F, numpy.amax(scan_all, axis = 0), fmt='%8g', newline = ' ') + F.write('\n') + F.write('std:') + numpy.savetxt(F, numpy.std(scan_all, axis = 0), fmt='%8g', newline = ' ') + F.write('\n') + F.close() + exit(0)