-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathset_waypoints.py
More file actions
244 lines (209 loc) · 8.53 KB
/
set_waypoints.py
File metadata and controls
244 lines (209 loc) · 8.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
"""
This script is used to set waypoints for a vehicle using MAVLink.
It connects to the vehicle, loads waypoints from a file, uploads them to the vehicle, arms the vehicle, initiates takeoff, and starts the mission.
"""
from pymavlink import mavutil
import time
from ravage import load_config
# Connect to the vehicle
master = mavutil.mavlink_connection('udp:127.0.0.1:14550')
master.wait_heartbeat()
print("Heartbeat received, connection established!")
def load_waypoints(filename):
"""
Reads waypoints from a file and parses them into a list of dictionaries.
Args:
filename (str): The path to the waypoint file.
Returns:
list: A list of dictionaries, each representing a waypoint.
"""
waypoints = []
with open(filename, 'r') as f:
lines = f.readlines()
# Check the file header
if not lines[0].startswith('QGC WPL 110'):
raise ValueError("Invalid waypoint file format")
# Parse waypoints
for i, line in enumerate(lines[1:]):
values = line.strip().split()
if len(values) != 12:
raise ValueError(f"Invalid waypoint format at line {i+2}: {line}")
waypoint = {
'seq': int(values[0]),
'current': int(values[1]),
'frame': int(values[2]),
'command': int(values[3]),
'param1': float(values[4]),
'param2': float(values[5]),
'param3': float(values[6]),
'param4': float(values[7]),
'x_lat': float(values[8]),
'y_long': float(values[9]),
'z_alt': float(values[10]),
'autocontinue': int(values[11])
}
waypoints.append(waypoint)
return waypoints
def upload_waypoints(waypoints):
"""
Uploads a list of waypoints to the vehicle.
Args:
waypoints (list): A list of dictionaries, each representing a waypoint.
"""
print("Starting waypoint upload...")
# Step 1: Clear existing waypoints
master.waypoint_clear_all_send()
print("Cleared existing waypoints.")
# Step 2: Send total waypoint count
time.sleep(1) # Small delay for safety
master.waypoint_count_send(len(waypoints))
print(f"Sending waypoint count: {len(waypoints)}")
# Step 3: Send each waypoint
for wp in waypoints:
master.mav.mission_item_send(
master.target_system, # Target system
master.target_component, # Target component
wp['seq'], # Sequence number
wp['frame'], # Coordinate frame
wp['command'], # MAV_CMD
wp['current'], # Current waypoint
wp['autocontinue'], # Autocontinue
wp['param1'], wp['param2'], wp['param3'], wp['param4'], # Params
wp['x_lat'], # Latitude
wp['y_long'], # Longitude
wp['z_alt'] # Altitude
)
print(f"Uploaded waypoint {wp['seq']} to ({wp['x_lat']}, {wp['y_long']}, {wp['z_alt']})")
time.sleep(0.1) # Short delay between waypoints
# Step 4: Wait for MISSION_ACK
ack_received = False
while not ack_received:
ack_msg = master.recv_match(type='MISSION_ACK', blocking=True)
if ack_msg:
print(f"Mission ACK received: {ack_msg}")
ack_received = True
print("Waypoint upload complete!")
def arm_and_takeoff(altitude):
"""
Arms the vehicle, switches to GUIDED mode, and initiates takeoff to the specified altitude.
Args:
altitude (float): The target altitude for takeoff in meters.
"""
print("Setting mode to GUIDED...")
master.set_mode('GUIDED')
print("Mode set to GUIDED.")
# Wait for mode confirmation
while True:
msg = master.recv_match(type='HEARTBEAT', blocking=True)
if msg.custom_mode == master.mode_mapping()['GUIDED']:
print("Mode confirmed as GUIDED.")
break
time.sleep(1)
# Wait for home position (valid GPS lock)
print("Waiting for home position to be set (valid GPS fix)...")
while True:
msg = master.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
if msg and msg.lat != 0 and msg.lon != 0:
print(f"Home position set: Latitude={msg.lat / 1e7}, Longitude={msg.lon / 1e7}")
break
else:
print("Waiting for valid GPS fix...")
time.sleep(1)
# Arm the motors
print("Arming motors...")
master.arducopter_arm()
print("Motors armed.")
# # Send takeoff command
# print(f"Sending takeoff command to {altitude} meters...")
# master.mav.command_long_send(
# master.target_system, master.target_component,
# mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, # Command
# 0, 0, 0, 0, 0, 0, 0, altitude # Altitude
# )
# # Wait for acknowledgment of takeoff command
# while True:
# msg = master.recv_match(type='COMMAND_ACK', blocking=True)
# if msg.command == mavutil.mavlink.MAV_CMD_NAV_TAKEOFF:
# if msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
# print("Takeoff command accepted.")
# break
# else:
# print("Takeoff command failed. Retrying...")
# raise Exception("NAV_TAKEOFF command failed. Check GPS and home position.")
# time.sleep(1)
# Monitor altitude to confirm takeoff
# print("Takeoff initiated. Monitoring altitude...")
# while True:
# msg = master.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
# altitude_current = msg.relative_alt / 1000.0 # Altitude in meters
# print(f"Current altitude: {altitude_current:.2f} meters")
# if altitude_current >= altitude * 0.95: # 95% of target altitude
# print("Target altitude reached!")
# break
# time.sleep(1)
def start_mission():
"""
Starts the mission execution from the first waypoint.
"""
print("Starting mission...")
master.mav.command_long_send(
master.target_system,
master.target_component,
mavutil.mavlink.MAV_CMD_MISSION_START, # Command to start the mission
0, # Confirmation
0, # First waypoint to start from
0, 0, 0, 0, 0, 0 # Unused parameters
)
# Wait for confirmation
while True:
msg = master.recv_match(type='COMMAND_ACK', blocking=True)
if msg and msg.command == mavutil.mavlink.MAV_CMD_MISSION_START:
if msg.result == mavutil.mavlink.MAV_RESULT_ACCEPTED:
print("Mission started successfully!")
break
else:
print("Mission start command failed. Retrying...")
raise Exception("MAV_CMD_MISSION_START command failed.")
time.sleep(1)
def monitor_position():
"""
Monitors the vehicle's position (latitude, longitude, altitude) and logs when it starts moving towards a waypoint.
"""
print("Monitoring vehicle position...")
while True:
msg = master.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
if msg:
lat = msg.lat / 1e7 # Convert from int32 to degrees
lon = msg.lon / 1e7 # Convert from int32 to degrees
alt = msg.relative_alt / 1000.0 # Convert from mm to meters
print(f"Current Position - Latitude: {lat:.6f}, Longitude: {lon:.6f}, Altitude: {alt:.2f}m")
# Check if the vehicle is moving (simple check for significant latitude/longitude changes)
if lat != 0 and lon != 0:
print(f"Vehicle is moving towards a waypoint. Position - Lat: {lat}, Lon: {lon}")
break
time.sleep(1) # Adjust the sleep interval as needed
def main():
"""
Main function to execute the waypoint setting process.
"""
try:
config_path = "config/autopilot_config.yaml"
config = load_config(config_path)
wp_file = config.get('WAYPOINT_FILE', 'square_copter.txt')
# Load waypoints from file
waypoint_file = 'waypoints/' + wp_file
waypoints = load_waypoints(waypoint_file)
print(f"Loaded waypoints: {waypoints}")
# Upload waypoints to the vehicle
upload_waypoints(waypoints)
# Arm and takeoff
arm_and_takeoff(altitude=100) # Takeoff to 100 meters
# Start the mission
start_mission()
#TODO: Buggy, need to fix
#monitor_position()
print("Mission execution in progress...")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()