This repository was archived by the owner on Nov 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathflight_interface_worker.py
More file actions
80 lines (64 loc) · 2.15 KB
/
flight_interface_worker.py
File metadata and controls
80 lines (64 loc) · 2.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
Gets odometry information from drone.
"""
import inspect
import os
import pathlib
import queue
import time
from utilities.workers import queue_proxy_wrapper
from utilities.workers import worker_controller
from . import flight_interface
from ..logger import logger
def flight_interface_worker(
address: str,
timeout: float,
baud_rate: int,
period: float,
output_queue: queue_proxy_wrapper.QueueProxyWrapper,
most_recent_odometry_queue: queue_proxy_wrapper.QueueProxyWrapper,
controller: worker_controller.WorkerController,
) -> None:
"""
Worker process.
address, timeout is initial setting.
period is minimum period between loops.
output_queue is the data queue.
controller is how the main process communicates to this worker process.
"""
if most_recent_odometry_queue.maxsize != 1:
print("ERROR: most_recent_odometry_queue must have a maximum size of 1")
return
# TODO: Error handling
worker_name = pathlib.Path(__file__).stem
process_id = os.getpid()
result, local_logger = logger.Logger.create(f"{worker_name}_{process_id}", True)
if not result:
print("ERROR: Worker failed to create logger")
return
# Get Pylance to stop complaining
assert local_logger is not None
frame = inspect.currentframe()
local_logger.info("Logger initialized", frame)
result, interface = flight_interface.FlightInterface.create(
address, timeout, baud_rate, local_logger
)
if not result:
frame = inspect.currentframe()
local_logger.error("Worker failed to create class object", frame)
return
# Get Pylance to stop complaining
assert interface is not None
while not controller.is_exit_requested():
controller.check_pause()
time.sleep(period)
result, value = interface.run()
if not result:
continue
# Replace any existing odometry data with the latest odometry data
try:
most_recent_odometry_queue.queue.get_nowait()
except queue.Empty:
pass
most_recent_odometry_queue.queue.put(value)
output_queue.queue.put(value)