-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathresponse.py
More file actions
181 lines (146 loc) · 5.92 KB
/
response.py
File metadata and controls
181 lines (146 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from serial import Serial
from signal import signal, SIGINT, SIGTERM
import sys
from time import time, sleep
from typing import Final
from utils_mac import contsruct_payload_from_json, get_json_from_packet, PACKET_HEADER, PACKET_FOOTER
from serial.tools.list_ports import comports
_RW_DO_EXIT = False
_RW_SERIAL_CONN : Serial = None
# IMPORTANT: at one point, around software version < 13.10, we changed the code on the
# InterceptIR that no longer required us to set the IS_OLD_INTERCEPTIR flag to true.
IS_OLD_INTERCEPTIR = False
PORT_DESCRIPTION = 'Gadget Serial'
PORT_NAME = None
def exit_handler( sig, frame ) -> None:
global _RW_DO_EXIT
print( f'Signal caught: {sig}' )
_RW_DO_EXIT = True
sys.exit(0)
def openSerial():
global PORT_NAME, _RW_SERIAL_CONN
_RW_SERIAL_CONN = Serial(
PORT_NAME,
baudrate=115200,
timeout=None
)
def closeSerial() -> None:
global _RW_SERIAL_CONN
if _RW_SERIAL_CONN and _RW_SERIAL_CONN.is_open:
print( 'Closing serial connection...' )
_RW_SERIAL_CONN.close()
def readSerial():
global _RW_SERIAL_CONN, _RW_DO_EXIT
buffer:bytes = b''
timeFirstByte = None
# Local function for handling invalid packet structure
def _readError( msg: str ):
print( msg )
_RW_SERIAL_CONN.read_all() # flush read buffer
# Read serial buffer one byte at a time.
# We explicitly AVOID use of `Serial.read_until(...)` due to existing bug(s) which lead to undefined behavior.
while not _RW_DO_EXIT:
# Read one byte
byte:bytes = _RW_SERIAL_CONN.read( 1 ) # 0.2s timeout (set upon Serial obj construction)
# If read timed out, read again
if len( byte ) == 0:
continue
# Ensure first two bytes are the packet header
elif ( buffer == b'' ) and ( byte != PACKET_HEADER[0].to_bytes(1, "little", signed=False) ): # Accessing an array of bytes by index returns an int, so we re-cast to bytes for comparison
_readError( f'FIRST BYTE WAS {byte}, NOT {PACKET_HEADER[0].to_bytes(1, "little", signed=False)}' )
return None
elif ( buffer == PACKET_HEADER[0].to_bytes(1, "little", signed=False) ) and ( byte != PACKET_HEADER[1].to_bytes(1, "little", signed=False) ):
_readError( f'SECOND BYTE WAS {byte}, NOT {PACKET_HEADER[1].to_bytes(1, "little", signed=False)}' )
return None
# Append byte to packet buffer
buffer += byte
# If the packet has taken longer than 2 seconds to receive, consider it invalid and exit loop
if timeFirstByte == None:
timeFirstByte = time()
elif time() - timeFirstByte > 3.0:
_readError( 'PACKET STILL INCOMPLETE AFTER 3.0 SECONDS' )
return None
# If we've received the packet footer, stop reading
elif buffer.endswith( PACKET_FOOTER ):
break
if _RW_DO_EXIT:
return None
# Parse JSON message from packet
return get_json_from_packet( buffer, do_print=False )
def wait_for_app():
print('wait_for_app')
# Decided to give user 1/2s interval feedback that script is still running
wait_str = 'Waiting for app and API to load. Please wait... '
start : Final[float] = time()
cutoff : int = -3
while True:
print( ' ' * len(wait_str), end='\r' ) # erase line
print( wait_str[:cutoff], end='\r' ) # update line
sleep( 0.5 )
if _RW_DO_EXIT:
closeSerial()
return 0
elif time() - start >= 60.0:
print() # newline
break
elif cutoff == -1:
cutoff = -3
else:
cutoff += 1
def main():
global PORT_NAME
print('main')
signal( SIGINT, exit_handler )
signal( SIGTERM, exit_handler )
json:str|None = None
while PORT_NAME is None:
ports = comports()
for port, desc, hwid in ports:
if PORT_DESCRIPTION in desc:
PORT_NAME = port
break
sleep(0.5)
print('Waiting for port information...')
if IS_OLD_INTERCEPTIR:
print( 'Waiting for InterceptIR to open serial connection...' )
# Wait for IIR to open serial port.
# Once open, do not send any commands but instead listen for API mode request
while True:
try:
openSerial()
break
except IOError as e:
continue
# Wait for API mode request
print( 'Waiting for API mode request...' )
json = readSerial()
if _RW_DO_EXIT:
closeSerial()
return 0
elif json == '{"request":"which_mode"}':
# Choose USB mode
_RW_SERIAL_CONN.write( contsruct_payload_from_json('{"mode":"usb"}') )
print( 'Selecting USB API mode...' )
else:
print( f'Expected API mode request but instead received: "{json}"\nQuitting...' )
closeSerial()
return 1
# # Give the app and API time to load...
# # print( 'Waiting for app and API to load. Please wait...' )
wait_for_app()
else:
openSerial()
if _RW_SERIAL_CONN == None:
print('Could not open port')
return False
print('Ready to receive API commands...')
while True:
if _RW_SERIAL_CONN.in_waiting > 0:
resp_packet = _RW_SERIAL_CONN.read_until(PACKET_FOOTER)
print('- - - Response from API - - -')
print(f'packet (raw): {resp_packet}')
print('- - - - - - - - - -')
resp_json = get_json_from_packet(resp_packet)
print(resp_json)
if __name__ == '__main__':
main()