-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDownload_GoPro.py
More file actions
executable file
·365 lines (306 loc) · 13.8 KB
/
Download_GoPro.py
File metadata and controls
executable file
·365 lines (306 loc) · 13.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# FIXME: Does not work with GoPro MAX and 5Ghz WiFi band
# FIXME: Not deleting images from Hero 10 when transfering via WiFi?
#
# Changes:
# - Upgraded GoProCam to 4.2.0, can now connect WiFi on Hero 10
# - Replaced ble subprocess execution with direct python calls
# - Updated exif_latlon.py
# - WIP: improve modularisation
# - Also support direct transfer of MTP mounted GoPro
# - Run from any directory and write files to "work_dir" from config file
# - Retry on Nominatum failures
import sys
import os
from pathlib import Path
import shutil
import subprocess
import json
from goprocam import GoProCamera, constants
#from exif import Image
from geopy.geocoders import Nominatim
import time
from datetime import datetime
import asyncio
import argparse
import re
from tqdm import tqdm
import exif_latlon
from gopro_ble import main as ble
configFile = Path.home() / ".config" / "goprotransfer.json"
sequences=[]
def RenameSequenceDirectories(sequences):
# Rename directories based on location of first image in each sequence
print("Renaming sequence directories")
print("-------------------------------------------------------\n")
geolocator = Nominatim(user_agent="GoPro_Transfer")
for dirName, fileName in sequences:
fullName=os.path.join(dirName,fileName)
print(f"Renaming {dirName} based on {fileName} ...")
locName=GetLocation(geolocator, fullName)
os.rename(dirName, f"{dirName}_{locName}")
time.sleep(2) # Delay so as to be a good citizen and not abuse nominatim
def GetLocation(geolocator, fullName):
locName="UNKNOWN"
lat,lon = exif_latlon.get_lat_lon(fullName)
if lat is None:
print(f"ERROR: No lat/lon for '{fullName}'")
else:
done = False
retries = 0
locName="UNKNOWN"
while not done:
try:
location = geolocator.reverse((lat, lon))
if 'hamlet' in location.raw['address']:
locName=location.raw['address']['hamlet']
elif 'village' in location.raw['address']:
locName=location.raw['address']['village']
elif 'suburb' in location.raw['address']:
locName=location.raw['address']['suburb']
elif 'town' in location.raw['address']:
locName=location.raw['address']['town']
elif 'city' in location.raw['address']:
locName=location.raw['address']['city']
elif 'county' in location.raw['address']:
locName=location.raw['address']['county']
elif 'state' in location.raw['address']:
locName=location.raw['address']['state']
elif 'name' in location.raw['address']:
locName=location.raw['address']['name']
else:
print(f"WARNING: '{fullName}' No location from '{location.raw}'")
done = True
except Exception as inst:
retries = retries + 1
if retries > 5:
print(f"ERROR: Unable to reverse geocode even after retrying")
done=True
else:
print(f"WARNING: Unable to reverse geocode, exception {type(inst)}, retrying")
time.sleep(2) # Delay so as to be a good citizen and not abuse nominatim
locName=locName.replace(" ","_")
return(locName)
def CreateDir(directory):
if not os.path.exists(directory):
os.makedirs(directory)
#==============================================================================
parser = argparse.ArgumentParser(
prog='Download_GoPro',
description='Transfer files from GoPro to computer')
parser.add_argument('-dm', '--dont-move', help="Don't move files from camera (copy instead), for development use",
action='store_true')
parser.add_argument('Camera', help="Name of camera from which to transfer")
args = parser.parse_args()
if args.dont_move:
print(f"\nCopying instead of moving files\n")
mtp_transfer=shutil.copy
mtp_cleanup=lambda x: print(f"MTP: Not removing dir {x}")
else:
mtp_transfer=shutil.move
mtp_cleanup=os.rmdir
print(f"\n\nTransferring files from GoPro '{args.Camera}'")
print("=======================================================\n")
if not os.path.exists(configFile):
print(f"ERROR: Config file '{configFile}' does not exist")
quit()
# Get camera details from config file
camera = None
config = json.load(open(configFile, "r"))
# WIP: put all files relative to workDir (see dirName below)
workDir = config['work_dir']
for i in config['cameras']:
if i['camera'] == args.Camera:
gopro_bt = i['bt']
gopro_wifi = i['wifi']
gopro_mtp = i['mtp']
camera = i['camera']
#print(f"{camera} {gopro_bt} {gopro_wifi}")
if camera is None:
print(f"No entry for camera '{args.Camera}' in '{configFile}'")
sys.exit(1)
now = datetime.now().strftime("%Y-%m-%d")
dest_dir=os.path.join(workDir,f"{now}_{camera}")
if not os.path.exists(dest_dir):
os.mkdir(dest_dir)
print(f"Output directory '{dest_dir}' created.")
else:
count=1;
created=False;
while not created:
dest_dir=os.path.join(workDir,f"{now}_{camera}_{count}")
if os.path.isdir(dest_dir):
count = count+1;
else:
os.mkdir(dest_dir)
print(f"Output directory '{dest_dir}' created.")
created=True
if os.path.exists(gopro_mtp):
print(f"{camera} connected via USB/MTP.")
src_dir=f"{gopro_mtp}/GoPro MTP Client Disk Volume/DCIM"
if os.path.exists(src_dir):
dest_still_dir=os.path.join(dest_dir,f"Stills")
dest_video_dir=os.path.join(dest_dir,f"Video")
dest_seq_root_dir=os.path.join(dest_dir,f"Seq")
sequence_codes=[]
geolocator = Nominatim(user_agent="GoPro_Transfer")
# TODO: Get count of files and print
print(f"Transferring files from beneath '{src_dir}' to '{dest_dir}'...")
# TODO: Lots of efficiency stuff:
# - Compile regexps
# - Keep track of directories created and don't continually check if they exist
# FIXME: Put flat and spherical files into separate directories
# Regexs observed:
# Type Form Camera Regex
# a. Still Flat Max GP__.*\.JPG
# b. Seq Flat Max GP.*\.JPG
# a. Still Spherical Max GS__.*\.JPG
# b. Seq Spherical Max GS.*\.JPG
# c. Video Max GS.*\.360
# - Video Max GS.*\.THM
# - Video Max GS.*\.LRV
#-------------------------------------------
# a. Still Flat Hero10 GOPR.*\.JPG
# b. Seq Flat Hero10 GO.*\.JPG
# c. Video Flat Hero10 GX.*\.MP4
# - Video Flat Hero10 GX.*\.THM
# - Video Flat Hero10 GX.*\.LRV
for root, dirs, files in os.walk(src_dir, topdown=False):
num_files=len(files)
for file in tqdm(files):
src_file=os.path.join(root,file)
# a. Handle individual still image files
# TODO: Put in separate flat and spherical directories
if re.match("GP__.*\\.JPG",file) or re.match("GS__.*\\.JPG",file) or re.match("GOPR.*\\.JPG",file):
CreateDir(dest_still_dir)
tqdm.write(f"Still Image {file} -> {dest_still_dir}")
mtp_transfer(f"{src_file}", dest_still_dir)
# b. Handle image sequence files
# TODO: Put in separate flat and spherical directories
elif re.match("G..*\\.JPG",file):
seq_code="Seq_"+file[:4]
if seq_code not in sequence_codes:
sequence_codes.append(seq_code)
location=GetLocation(geolocator, src_file)
dest_seq_dir=os.path.join(dest_seq_root_dir,seq_code+"_"+location)
tqdm.write(f"Sequence '{seq_code}' at '{location}'")
CreateDir(dest_seq_dir)
mtp_transfer(f"{src_file}", dest_seq_dir)
# c. Handle video files
elif re.match(".*\\.(MP4|360|LRV|THM)",file):
CreateDir(dest_video_dir)
if re.match(".*\\.(MP4|360)",file):
tqdm.write(f"Video {file} -> {dest_video_dir}")
mtp_transfer(f"{src_file}", dest_video_dir)
for directory in dirs:
mtp_cleanup(os.path.join(root,directory))
print(f"MTP Transfer done.")
else:
print(f"ERROR: Could not find GoPro MTP source directory'{src_dir}'")
quit()
# Couldn't connnect via MTP, try BT/Wifi
else:
print(f"'{gopro_mtp} does not exist")
print("Camera not connected via USB/MTP, trying Bluetooth/WiFi")
# Save SSID for currently connected WiFi
results=subprocess.run(["iwgetid","-r"], capture_output=True, text=True)
ssid=results.stdout.rstrip("\n")
# Try connecting to GoPro via Bluetooth and turning on Wifi,
print(f"Establishing BlueTooth connection to GoPro '{camera}'...")
print("-------------------------------------------------------\n")
bt_connected=False
bt_tried=0
while ( (not bt_connected) and (bt_tried < 2)):
try:
bt_tried = bt_tried+1
print(f" Connection attempt {bt_tried}")
asyncio.run(ble.run(gopro_bt, "wifi on"))
# If BT connection fails, prompt user to turn GoPro on and try again...
except:
if (bt_tried < 2):
print(f" Unable to connect via Bluetooth")
input(f" Ensure remote is not connected.\n Power on GoPro '{camera}' and press <ENTER>: ")
else:
bt_connected=True
if (not bt_connected):
print(f"Unable to connect to '{camera}' via Bluetooth")
sys.exit(1)
# Connect computer to GoPro WiFi
print(f"Connecting to GoPro '{camera}' Wifi network '{gopro_wifi}'...")
print("-------------------------------------------------------\n")
results=subprocess.run(["nmcli","c","up", "id", gopro_wifi])
print(f"Status = {results.returncode}")
try:
gpCam = GoProCamera.GoPro()
except:
print(f"Unable to connect to GoPro: {sys.exc_info()[0]}");
else:
# Report camera overview
gpCam.overview()
# Create and change to destination directory
if not os.path.exists(dest_dir):
print(f"Creating directory '{dest_dir}'")
os.makedirs(dest_dir)
else:
print(f"Using existing directory '{dest_dir}'")
print(f"GoPro files will be transferred to {dest_dir}")
print("-------------------------------------------------------\n")
os.chdir(dest_dir)
gopro_media = json.loads(gpCam.listMedia())
for directory in gopro_media["media"]:
src_dir = directory["d"]
for mediaFile in directory["fs"]:
filename = mediaFile["n"];
# If file has a 'b' (begin?) entry, it represents a sequence (timelapse or burst)
if 'b' in mediaFile:
seq_code="Seq_"+filename[2:4]
print(f"DBG: filename {filename} code {seq_code}")
base=filename[:4]
rel_dir_name=seq_code
# Place each sequence in it's own directory
if not os.path.exists(rel_dir_name):
os.makedirs(rel_dir_name)
os.chdir(rel_dir_name)
start=int(mediaFile["b"])
end=int(mediaFile["l"])
print(f"---Download sequence of {end-start} images")
for i in range(start,end+1):
seq_image_filename=f"{base}{i:04d}.JPG"
if i==start:
sequences.append((rel_dir_name, seq_image_filename))
print(f" ---Download sequence image {i-start}/{end-start} ",end=" ")
# FIXME: Stop downloadMedia from printing so a tqdm progress bar can be used instead
gpCam.downloadMedia(src_dir,seq_image_filename)
gpCam.deleteFile(src_dir, seq_image_filename)
os.chdir("..")
else:
# Place non-timelapse files in their own directory
rel_dir_name=f"NonSeq"
if not os.path.exists(rel_dir_name):
os.makedirs(rel_dir_name)
os.chdir(rel_dir_name)
print(f"---Download non-timelapse file ",end=" ")
gpCam.downloadMedia(src_dir,filename)
gpCam.deleteFile(src_dir, filename)
os.chdir("..")
print("Turning off GoPro...")
print("-------------------------------------------------------\n")
gpCam.power_off()
# FIXME: ssid as reported by iwgetid is not always the same as name/id used by nmcli (sometimes it has "Auto" pre-pended)
ssid = f"Auto {ssid}"
if ssid != gopro_wifi:
print(f"Re-connecting to previous WiFi network '{ssid}'...")
print("-------------------------------------------------------\n")
subprocess.run(["nmcli","c","up", "id", ssid])
# FIXME: Wait for network reconnection instead of just waiting
time.sleep(10) # Delay to ensure network reconnection is complete
print("Renaming directories...")
print("-------------------------------------------------------\n")
RenameSequenceDirectories(sequences)
print("-------------------------------------------------------\n")
print(f"Opening file explorer on '{dest_dir}'")
subprocess.Popen(["nemo",dest_dir], start_new_session=True)
print(f"Running gonme-terminal in '{dest_dir}'")
os.chdir(dest_dir)
subprocess.Popen(["gnome-terminal"], start_new_session=True)
print(f"Done.")
time.sleep(30)