-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
467 lines (385 loc) · 16.3 KB
/
main.py
File metadata and controls
467 lines (385 loc) · 16.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
#!/usr/bin/env python3
"""
SC2 Driver IO - Main Application (Skeleton)
Multi-threaded coordinator for Solar Car 2 driver IO system
Threading Architecture:
- Core 0: CAN Reception & Processing (high priority, real-time)
- Core 1: GPS & Lap Counter (medium priority, 10Hz updates)
- Core 2: CSV Data Logging (low priority, I/O heavy)
- Core 3: System Management & Telemetry Coordination (variable priority)
External Module Integration:
- lap_counter/: GPS-based lap counting module
- data_logger/: CSV logging to USB drive module
- can_snooper/: CAN communication module
- telemetry/: Integration with existing C++ backend
"""
import threading
import time
import queue
import signal
import sys
import os
import logging
import json
import subprocess
from typing import Optional, Dict, Any
from pathlib import Path
# External module imports (to be implemented separately)
try:
# These modules will be created in separate folders
from lap_counter import LapCounter
from data_logger import CSVDataLogger
from can_interface import CANInterface
# Note: Integration with existing C++ telemetry system TBD
EXTERNAL_MODULES_AVAILABLE = True
except ImportError:
print("WARNING: External modules not available, running in skeleton mode")
EXTERNAL_MODULES_AVAILABLE = False
# Custom log handler for dashboard integration
class DashboardLogHandler(logging.Handler):
"""Custom log handler that writes to shared file for dashboard"""
def __init__(self, log_file_path):
super().__init__()
self.log_file_path = Path(log_file_path)
self.messages = []
self.max_messages = 100
def emit(self, record):
try:
log_entry = {
'timestamp': record.created,
'level': record.levelname,
'message': self.format(record)
}
# Keep only last max_messages
self.messages.append(log_entry)
if len(self.messages) > self.max_messages:
self.messages = self.messages[-self.max_messages:]
# Write to file atomically
with open(self.log_file_path, 'w') as f:
json.dump({'messages': self.messages}, f)
except Exception:
pass
# Configure logging
log_file = Path(__file__).parent / 'dashboard_logs.json'
dashboard_handler = DashboardLogHandler(log_file)
dashboard_handler.setLevel(logging.DEBUG)
dashboard_handler.setFormatter(logging.Formatter('%(name)s - %(message)s'))
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[dashboard_handler]
)
logger = logging.getLogger(__name__)
class DriverIOSystem:
"""Main driver IO system coordinator - simplified skeleton"""
def __init__(self):
self.running = False
self.threads = {}
self.dashboard_process = None
# Initialize external modules (if available)
if EXTERNAL_MODULES_AVAILABLE:
self.can_interface = CANInterface()
self.lap_counter = LapCounter()
self.csv_logger = CSVDataLogger()
else:
# Placeholder objects for skeleton mode
self.can_interface = None
self.lap_counter = None
self.csv_logger = None
# Inter-thread communication queues
self.can_message_queue = queue.Queue(maxsize=1000)
self.gps_data_queue = queue.Queue(maxsize=100)
self.lap_data_queue = queue.Queue(maxsize=50)
self.telemetry_queue = queue.Queue(maxsize=200)
# Setup signal handlers for clean shutdown
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle shutdown signals"""
logger.info(f"Received signal {signum}, initiating shutdown...")
self.shutdown()
def can_reception_thread(self):
"""
Thread 0: CAN message reception and processing
High priority, real-time constraints
"""
logger.info("CAN reception thread started")
# Set high priority for real-time processing
try:
os.nice(-10) # Higher priority (Linux)
except:
pass
while self.running:
try:
if self.can_interface:
# Call external CAN interface module
can_message = self.can_interface.receive_message(timeout=0.01)
if can_message:
# Queue message for other threads
try:
self.can_message_queue.put_nowait(can_message)
except queue.Full:
logger.warning("CAN message queue full, dropping message")
else:
# Skeleton mode - simulate CAN messages
self._simulate_can_message()
time.sleep(0.01)
except Exception as e:
logger.error(f"CAN reception thread error: {e}")
time.sleep(0.1)
logger.info("CAN reception thread stopped")
def gps_lap_counter_thread(self):
"""
Thread 1: GPS processing and lap counting
Medium priority, 10Hz updates
"""
logger.info("GPS/Lap counter thread started")
last_update = time.time()
update_interval = 0.1 # 10Hz
while self.running:
try:
current_time = time.time()
if current_time - last_update >= update_interval:
if self.lap_counter:
# Call external lap counter module
# Assume lap_counter.update() returns lap data if section changed
lap_data = self.lap_counter.update_position()
if lap_data:
# Queue lap data for logging and telemetry
try:
self.lap_data_queue.put_nowait(lap_data)
self.telemetry_queue.put_nowait(('lap_data', lap_data))
except queue.Full:
logger.warning("Lap data queue full")
# Send lap data via CAN (if CAN interface available)
if self.can_interface:
self.can_interface.send_lap_data(lap_data)
else:
# Skeleton mode - simulate lap counting
self._simulate_lap_counting()
last_update = current_time
time.sleep(0.01) # Prevent busy waiting
except Exception as e:
logger.error(f"GPS/Lap counter thread error: {e}")
time.sleep(0.1)
logger.info("GPS/Lap counter thread stopped")
def csv_logging_thread(self):
"""
Thread 2: CSV data logging
Low priority, I/O heavy operations
"""
logger.info("CSV logging thread started")
# Set lower priority for I/O operations
try:
os.nice(10) # Lower priority (Linux)
except:
pass
while self.running:
try:
if self.csv_logger:
# Collect data from queues
can_messages = []
lap_data = None
# Batch process CAN messages
try:
while len(can_messages) < 50: # Process in batches
can_msg = self.can_message_queue.get_nowait()
can_messages.append(can_msg)
except queue.Empty:
pass
# Get latest lap data
try:
lap_data = self.lap_data_queue.get_nowait()
except queue.Empty:
pass
# Call external CSV logger module
if can_messages or lap_data:
self.csv_logger.log_data(can_messages, lap_data)
else:
# Skeleton mode - simulate CSV logging
self._simulate_csv_logging()
time.sleep(0.05) # 20Hz processing rate
except Exception as e:
logger.error(f"CSV logging thread error: {e}")
time.sleep(0.1)
logger.info("CSV logging thread stopped")
def system_management_thread(self):
"""
Thread 3: System management and telemetry coordination
Variable priority, health monitoring
"""
logger.info("System management thread started")
last_stats_update = time.time()
stats_interval = 10.0 # 10 second intervals
while self.running:
try:
current_time = time.time()
# Performance monitoring
if current_time - last_stats_update >= stats_interval:
self._log_performance_stats()
last_stats_update = current_time
# System health checks
self._perform_health_checks()
# Process telemetry queue and coordinate with C++ backend
self._process_telemetry_queue()
time.sleep(1.0) # Low frequency management tasks
except Exception as e:
logger.error(f"System management thread error: {e}")
time.sleep(1.0)
logger.info("System management thread stopped")
def _simulate_can_message(self):
"""Skeleton mode: simulate CAN message reception"""
# Create dummy CAN message for testing
dummy_message = {
'id': 0x123,
'data': b'\x01\x02\x03\x04',
'timestamp': time.time()
}
try:
self.can_message_queue.put_nowait(dummy_message)
except queue.Full:
pass
def _simulate_lap_counting(self):
"""Skeleton mode: simulate lap counting"""
# Create dummy lap data for testing
dummy_lap_data = {
'lap_count': 1,
'current_section': 2,
'section_time': 15.5,
'timestamp': time.time()
}
try:
self.lap_data_queue.put_nowait(dummy_lap_data)
except queue.Full:
pass
def _simulate_csv_logging(self):
"""Skeleton mode: simulate CSV logging"""
logger.debug("Simulating CSV logging operation")
# In real implementation, this would call external CSV logger
def _log_performance_stats(self):
"""Log system performance statistics"""
stats = {
'uptime': time.time() - getattr(self, 'start_time', time.time()),
'can_queue_size': self.can_message_queue.qsize(),
'lap_queue_size': self.lap_data_queue.qsize(),
'telemetry_queue_size': self.telemetry_queue.qsize(),
'threads_alive': sum(1 for t in self.threads.values() if t.is_alive())
}
logger.info(f"Performance Stats: {stats}")
def _perform_health_checks(self):
"""Perform system health checks"""
# Check queue sizes
if self.can_message_queue.qsize() > 800:
logger.warning("CAN message queue getting full")
if self.telemetry_queue.qsize() > 150:
logger.warning("Telemetry queue getting full")
# Check thread health
for name, thread in self.threads.items():
if not thread.is_alive():
logger.error(f"Thread {name} is not alive!")
def _process_telemetry_queue(self):
"""Process telemetry data and coordinate with C++ backend"""
try:
while True:
try:
data_type, data = self.telemetry_queue.get_nowait()
# TODO: Integration with existing C++ telemetry system
# This is where we'd call into the preserved C++ backend
logger.debug(f"Processing telemetry: {data_type}")
except queue.Empty:
break
except Exception as e:
logger.error(f"Telemetry processing error: {e}")
def start(self):
"""Start all system threads"""
logger.info("Starting Driver IO System...")
# Launch the textual dashboard in a separate process
dashboard_script = Path(__file__).parent / 'textual_frontend' / 'textual_dashboard.py'
if dashboard_script.exists():
logger.info("Launching Textual Dashboard...")
try:
self.dashboard_process = subprocess.Popen(
[sys.executable, str(dashboard_script)],
cwd=str(dashboard_script.parent)
)
logger.info("Dashboard launched successfully")
time.sleep(1) # Give dashboard time to start
except Exception as e:
logger.error(f"Failed to launch dashboard: {e}")
else:
logger.warning("Dashboard script not found, running without UI")
self.running = True
self.start_time = time.time()
# Create and start threads
self.threads['can_reception'] = threading.Thread(
target=self.can_reception_thread,
name="CAN-RX",
daemon=True
)
self.threads['gps_lap_counter'] = threading.Thread(
target=self.gps_lap_counter_thread,
name="GPS-LAP",
daemon=True
)
self.threads['csv_logging'] = threading.Thread(
target=self.csv_logging_thread,
name="CSV-LOG",
daemon=True
)
self.threads['system_management'] = threading.Thread(
target=self.system_management_thread,
name="SYS-MGR",
daemon=True
)
# Start all threads
for thread in self.threads.values():
thread.start()
logger.info("All threads started successfully")
logger.info("Driver IO System is running...")
def shutdown(self):
"""Shutdown all system components"""
logger.info("Shutting down Driver IO System...")
self.running = False
# Terminate dashboard process
if self.dashboard_process:
logger.info("Shutting down dashboard...")
try:
self.dashboard_process.terminate()
self.dashboard_process.wait(timeout=5)
except Exception as e:
logger.warning(f"Error shutting down dashboard: {e}")
try:
self.dashboard_process.kill()
except:
pass
# Wait for threads to finish
for name, thread in self.threads.items():
logger.info(f"Waiting for {name} thread to stop...")
thread.join(timeout=5.0)
if thread.is_alive():
logger.warning(f"{name} thread did not stop gracefully")
# Cleanup external modules
if self.can_interface:
self.can_interface.shutdown()
if self.csv_logger:
self.csv_logger.shutdown()
logger.info("Driver IO System shutdown complete")
def main():
"""Main entry point"""
logger.info("SC2 Driver IO System Starting...")
# Create and start the system
system = DriverIOSystem()
try:
system.start()
# Keep main thread alive
while system.running:
time.sleep(1.0)
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
except Exception as e:
logger.error(f"Unexpected error: {e}")
finally:
system.shutdown()
logger.info("SC2 Driver IO System stopped")
if __name__ == "__main__":
main()