]> rtime.felk.cvut.cz Git - fpga/lx-cpu1/lx-dad.git/blob - host/app/dadpyscan/dadpyscan.py
Graph plot added into Python test software.
[fpga/lx-cpu1/lx-dad.git] / host / app / dadpyscan / dadpyscan.py
1 #!/usr/bin/python
2
3 import sys
4 import serial
5 import time
6 import numpy
7 import argparse
8 import random
9 import time
10 import matplotlib
11 matplotlib.use('GTKAgg')
12 #matplotlib.interactive(True)
13 from matplotlib import pyplot as plt
14
15 class rdevice(object):
16   def __init__(self, rcon = None):
17     self.rcon = rcon;
18     self.wait_stamp_sqn = random.randint(1, 1000)
19
20   def set_rcon(self, rcon):
21     if self.rcon is not None:
22       self.rcon.close();
23       self.rcon = None;
24     self.rcon = rcon;
25
26   def init_communication(self):
27     s = self.rcon.read(1024)
28
29     self.rcon.write("InvBuff\n")
30     s = self.rcon.read(1024)
31
32     self.rcon.write("ECHO:0\n")
33     s = self.rcon.read(1024)
34     self.rcon.write("VER?\n")
35     s = self.rcon.read(1024)
36     print(s);
37
38     # Purge
39     s = self.rcon.read(2000)
40     print(s);
41
42   def set_int_param(self, param, val):
43     valstr=str(int(val))
44     self.rcon.write(param+':'+valstr+'\n')
45
46   def wait_stamp(self):
47     s = self.rcon.read(1024)
48     self.wait_stamp_sqn = self.wait_stamp_sqn + 1
49     sqnstr = str(int(self.wait_stamp_sqn))
50     self.rcon.write("STAMP:"+sqnstr+"\n")
51     while True:
52       s = self.rcon.read(1024)
53       for sl in s.splitlines():
54         if (sl == "STAMP="+sqnstr):
55           return
56       time.sleep(1)
57
58   def get_multiline(self, last_line = None, try_cycles = 10):
59     res = ''
60     try_cnt = 0;
61     while True:
62       s = self.rcon.read(10*1024)
63       if len(s) == 0:
64         if last_line is None:
65           return res
66         else:
67            if try_cnt > try_cycles:
68              return None
69            try_cnt = try_cnt + 1
70       try_cnt = 0
71       res = res + s
72       if last_line is not None:
73         for sl in res.splitlines():
74           if (sl == last_line):
75             return res
76
77   def get_multiline_vector(self, command = None, first_line = 'begin',
78                           last_line = 'end', try_cycles = 10):
79     self.wait_stamp()
80     self.rcon.write(command);
81     s = self.get_multiline(last_line = last_line)
82     if s is None:
83       print("scan read timeout")
84       return None
85     si = s.find(first_line)
86     ei = s.find(last_line)
87     if (si == -1) or (ei == -1):
88       print("begin, end not found")
89       return None
90     si = s.find('\n', si, ei)
91     return numpy.fromstring(s[si:ei], dtype=float, count=-1, sep=' ')
92
93 def rdevice_com_test(rcon):
94   rcon.write("init\n");
95   rcon.write("run\n");
96   wait_stamp(rcon)
97   print("Wait stamp done")
98
99
100 if __name__ == '__main__':
101   help_msg = '''SYNOPSIS: dadpyscan.py [-d /dev/ttyXXX]
102         Control LX DAD'''
103
104   parser = argparse.ArgumentParser(description='BlueBot rdevice commander')
105   parser.add_argument('-s', '--skip-setup', dest='skip_setup', action='store_true',
106                       default=False, help='skip hard-home inicialization of rdevice')
107   parser.add_argument('-d', '--tty-device', dest='tty_dev', type=str,
108                     default='/dev/ttyACM0', help='tty line/device to rdevice')
109   parser.add_argument('-a', '--action', dest='action', type = str,
110                       default=None, help='action to run')
111
112   args = parser.parse_args()
113
114   tty_dev = args.tty_dev
115   skip_setup = args.skip_setup
116   action = args.action
117   scan_count = 20
118
119   if action is None:
120     action = 'cyclic'
121
122   random.seed()
123
124   r = rdevice();
125
126   if True:
127     print("Opening %s ...\n" % tty_dev)
128     ser = serial.Serial(tty_dev,
129           baudrate=57600,
130           bytesize=serial.EIGHTBITS,
131           parity=serial.PARITY_NONE,
132           stopbits=serial.STOPBITS_ONE,
133           timeout=0.1)
134
135     r.set_rcon(ser)
136
137     #ser.open()
138
139     #ser.write("SPIMST0:4(%02X,%02X)\n"%((output_data>>8)&0xFF,(output_data)&0xFF))
140     #s = ser.read(1000)
141     #print(s)
142
143     r.init_communication()
144
145   if not skip_setup or (action == 'home'):
146     print("Setup")
147   else:
148     print("Setup skipped")
149
150   print('action:'+action)
151
152   if action == 'cyclic':
153     r.wait_stamp()
154     r.rcon.write("init\n");
155     scan_cnt = 0
156     scan_all=numpy.ndarray(shape=(0,0), dtype=float)
157     while True:
158       scan_act=r.get_multiline_vector(command = 'run\n',
159                         first_line = 'bank', last_line = 'end')
160       if scan_act is None:
161         continue
162       print(scan_act)
163       if scan_all.size == 0:
164         scan_all=numpy.ndarray(shape=(scan_count,scan_act.size), dtype=float)
165       scan_all[scan_cnt,:]=scan_act
166       scan_cnt = scan_cnt + 1
167       if scan_cnt >= scan_count:
168         F=open('scan.dat', mode='wt')
169         numpy.savetxt(F, scan_all, fmt='%8g')
170         F.write('min:')
171         numpy.savetxt(F, numpy.amin(scan_all, axis = 0), fmt='%8g', newline = ' ')
172         F.write('\n')
173         F.write('max:')
174         numpy.savetxt(F, numpy.amax(scan_all, axis = 0), fmt='%8g', newline = ' ')
175         F.write('\n')
176         F.write('std:')
177         numpy.savetxt(F, numpy.std(scan_all, axis = 0), fmt='%8g', newline = ' ')
178         F.write('\n')
179         F.close()
180         exit(0)
181
182   if action == 'plot':
183     plt_block = True
184
185     if not plt_block:
186       fig, ax = plt.subplots(1, 1)
187       plt.show(False)
188       plt.draw()
189     r.wait_stamp()
190     r.rcon.write("init\n");
191     scan_cnt = 0
192     scan_last = None;
193     plt_first = True
194
195     while True:
196       scan_act=r.get_multiline_vector(command = 'run\n',
197                         first_line = 'bank', last_line = 'end')
198       if scan_act is None:
199         continue
200       print(scan_act)
201       if scan_last is None:
202         scan_last = scan_act
203       if plt_block or plt_first:
204         fig, ax = plt.subplots(1, 1)
205         last_line = ax.plot(range(len(scan_last)), scan_last, 'y')[0]
206         act_line = ax.plot(range(len(scan_act)), scan_act, 'r')[0]
207         plt_first = False
208       else:
209         last_line.set_data(range(len(scan_last)), scan_last)
210         act_line.set_data(range(len(scan_act)), scan_act)
211         #act_line.set_xdata(scan_act)
212       fig.canvas.draw()
213       plt.show(plt_block)
214
215       scan_last = scan_act
216       time.sleep(1)