-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcomm.py
More file actions
141 lines (105 loc) · 3.87 KB
/
comm.py
File metadata and controls
141 lines (105 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""Serial port comm routines
$Id: comm.py 861 2018-03-15 23:44:30Z rnee $
"""
import time
from typing import Optional, TextIO
class Comm:
"""Serial port wrapper that supports logging and mocking"""
def __init__(self, ser, logf: Optional[TextIO]=None):
self.ser = ser
self.logf = logf
self.time0 = time.time()
self.read_count = 0
self.write_count = 0
# log initial state of lines
self._log(b'\x01' if ser.dtr else b'\x00', "DTR")
self._log(b'\x01' if ser.dsr else b'\x00', "DSR")
self._log(b'\x01' if ser.cts else b'\x00', "CTS")
self._log(b'\x01' if ser.rts else b'\x00', "RTS")
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
@property
def port(self):
return self.ser.port
def read(self, request=None):
""" read a requested number of bytes. If request is missing or zero read available """
if not request:
request = self.avail()
data = self.ser.read(request)
total = len(data)
self.read_count += total
if total > 0:
self._log(data, "READ")
else:
self._log(data, "TIME")
return total, data
def read_line(self):
""""read line up to newline character or timeout"""
data = self.ser.readline()
self._log(data, "RDLN")
return data
def write(self, data):
"""Write bytes"""
self.ser.write(data)
self.write_count += len(data)
self._log(data, "WRIT")
def avail(self):
""" return number of bytes available to read """
return self.ser.in_waiting
def flush(self):
"""flush input buffer"""
waiting = self.avail()
if waiting > 0:
(count, data) = self.read(waiting)
self._log(data, "DISC")
def close(self):
"""close port and log file"""
self.ser.close()
self.ser = None
if self.logf:
self.logf.write("\nbytes read: {} bytes writen: {}".format(
self.read_count, self.write_count))
def dtr_active(self, state):
"""set state of DTR line"""
self.ser.dtr = state
self._log(b'high' if state else b'low', "ADTR")
def pulse_dtr(self, duration: float=0.001):
"""pulse DTR line duration in seconds"""
self.ser.dtr = True
time.sleep(duration)
self.ser.dtr = False
self._log(bytes("{0}".format(duration), 'utf-8'), "PDTR")
def pulse_rts(self, duration: float=0.001):
"""pulse DTR line duration in seconds. Autodetects polarity"""
init_state = self.ser.rts
self.ser.rts = not init_state
time.sleep(duration)
self.ser.rts = init_state
self._log(bytes("{0}".format(duration), 'utf-8'), "PRTS")
def pulse_break(self, duration):
"""send a break"""
self.ser.send_break(duration)
self._log(bytes("{0}".format(duration), 'utf-8'), "PBRK")
def set_timeout(self, duration):
"""set timeout in seconds"""
self.ser.timeout = duration
self._log(bytes("{0}".format(duration), 'utf-8'), "STTO")
def _log(self, data: bytes, desc: str):
"""write a log string"""
if self.logf:
timex = time.time() - self.time0
self.logf.write("%8.3f %4s %3d : " %
(timex, desc, len(data)))
for n, c in enumerate(data):
if ord(' ') <= c <= ord('~'):
self.logf.write("%02x[%s] " % (c, chr(c)))
else:
self.logf.write("%02x[ ] " % c)
if (n + 1) % 16 == 0:
self.logf.write("\n : ")
self.logf.write("\n")
avail = self.avail()
if avail > 0:
self.logf.write("%8.3f INBF %3d\n" % (timex, avail))