-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxillybus_accel.py
59 lines (44 loc) · 1.3 KB
/
xillybus_accel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import numpy
import threading
import time
class WriteThread(threading.Thread):
def __init__(self, data, device):
threading.Thread.__init__(self)
self.data = data
self.device = device
def run(self):
w = open(self.device,'w')
w.write(self.data)
w.close()
class XillybusPipe():
def __init__(self, nbits):
self.nbits = nbits
self.open()
def open(self):
self.r = open('/dev/xillybus_read_%d' % self.nbits,'r')
self.w = open('/dev/xillybus_write_%d' % self.nbits, 'w')
def process(self, data_in):
total_size = data_in.size * data_in.dtype.itemsize
start_time = time.time()
self.w.write(data_in)
result = self.r.read(total_size)
stop_time = time.time()
self.last_time = stop_time-start_time
return result
def read(self, n_bytes=None):
start_time = time.time()
if (n_bytes is None):
result = self.r.read()
else:
result = self.r.read(n_bytes)
stop_time = time.time()
self.last_time = stop_time-start_time
return result
def close(self):
self.r.close()
self.w.close()
def xb_accel(data_array, nbits):
r = open('/dev/xillybus_read_%d' % nbits,'r')
w = WriteThread(data_array, '/dev/xillybus_write_%d' % nbits)
w.start()
return r.read(data_array.size * data_array.dtype.itemsize)