-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
executable file
·294 lines (276 loc) · 11.1 KB
/
worker.py
File metadata and controls
executable file
·294 lines (276 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
from datetime import timedelta
from os import getpid
from pathlib import Path
from time import strftime, sleep, perf_counter

from rc_classes import Logger as Log
from rc_classes import RoboCopy, HashThread, FileHash, Size, NormString
class Copy:
    '''Copy files using RoboCopy, with optional size/hash verification.'''
    def __init__(self, src_paths, dst_path, settings, config, labels,
        simulate = False,
        echo = print,
        kill = None
    ):
        '''Store the copy job configuration; the actual work happens in run().

        src_paths: iterable of pathlib.Path sources (files and/or directories)
        dst_path: destination directory (made absolute here)
        settings, config, labels: project objects (options, limits, UI strings)
        simulate: True runs robocopy with /l = only list files, do not copy
        echo: method to show messages (print or from gui)
        kill: optional event object checked to abort the copy process
        '''
        self._settings = settings
        self._config = config
        self._labels = labels
        self._errors = list()                   # collected error messages
        self._src_paths = src_paths             # given source paths
        self._dst_path = dst_path.absolute()    # given destination path
        self._simulate = simulate               # True: robocopy /l, list only, copy nothing
        self._echo = echo                       # method to show messages (print or from gui)
        self._kill = kill                       # event to stop copy process
        self._logger = Log(self._echo, self._settings, self._config)
def _verify_by_size(self):
'''Verify copied files by size'''
Log.info(self._labels.starting_size_verification)
for cnt, (src_path, src_size, dst_path) in enumerate(self._files, start=1):
if self._check_kill_signal():
return True
self._echo_file_progress(cnt)
dst_size = Size(dst_path.stat().st_size)
if dst_size != src_size:
Log.warning(self._labels.mismatching_sizes.replace('#',
f'{src_path}: {src_size}, {dst_path}: {dst_size}')
)
self._bad_files[dst_path] = dst_size
Log.info(self._labels.size_check_finished)
def _verify_by_hash(self):
'''Verify copied files by hash'''
processed_size = Size(0)
Log.info(self._labels.starting_hash_verification)
for cnt, hash_set in enumerate(self._hash_thread.files, start=1):
if self._check_kill_signal():
return True
self._echo_size_progress(cnt, processed_size)
dst_hash = FileHash.hashsum(hash_set['dst_path'], algorithm=self._verify)
if dst_hash != hash_set[self._verify]:
Log.warning(self._labels.mismatching_hashes.replace('#',
f'{hash_set["src_path"]}: {hash_set[self._verify]}, {hash_set["dst_path"]}: {dst_hash}')
)
self._bad_files[dst_path] = dst_hash
processed_size += hash_set['src_size']
Log.info(self._labels.hash_check_finished)
def _echo_simulation(self, fh=None):
'''Show what would be copied'''
Log.info(self._labels.starting_simulation)
for src_path, size, dst_path in self._files:
if self._check_kill_signal():
return True
msg = f'{src_path} ({Size(size)}) \u2192 {dst_path}'
if dst_path.exists():
self._echo(f'\u26A0 {msg}, {self._labels.existing}')
self._bad_files[src_path] = dst_path
else:
self._echo(f'\u2713 {msg}')
Log.info(self._labels.finisjed_simulation)
def _wait_hashing(self):
'''Wait for hash thread to finish'''
index = 0
while self._hash_thread.is_alive():
if self._check_kill_signal():
return True
self._echo(f'{"|/-\\"[index]} ', end='\r')
index += 1
if index > 3:
index = 0
sleep(.25)
def _write_collisions(self, fh):
'''Write TSV with possible collisions / files that might be overwritten (simulation)'''
print('src_path\tsrc_size\tdst_path\tdst_exists', file=fh)
for src_path, src_size, dst_path in self._files:
if self._check_kill_signal():
return True
line = f'{src_path}\t{src_size}\t{dst_path}\t'
if src_path in self._bad_files:
line += 'exists'
print(line, file=fh)
def _write_sizes(self, fh):
'''Write TSV without any verification)'''
print('src_path\tsrc_size\tdst_path', file=fh)
for src_path, src_size, dst_path in self._files:
if self._check_kill_signal():
return True
print(f'{src_path}\t{src_size}\t{dst_path}', file=fh)
def _write_bad_sizes(self, fh):
'''Write TSV with files that have different sizes (simulation)'''
print('src_path\tsrc_size\tdst_path\tbad_dst_size', file=fh)
for src_path, src_size, dst_path in self._files:
if self._check_kill_signal():
return True
line = f'{src_path}\t{src_size}\t{dst_path}\t'
if src_path in self._bad_files:
line += f'{self._bad_files[src_path]}'
print(line, file=fh)
def _write_hashes(self, fh):
'''Write TSV with hashes of copied files'''
print(f'{"\t".join(self._hash_thread.keys)}', file=fh)
for hash_set in self._hash_thread.files:
if self._check_kill_signal():
return True
print(f'{"\t".join(f'{hash_set[key]}' for key in self._hash_thread.keys)}', file=fh)
def _write_bad_hashes(self, fh):
'''Write TSV with hashes and mismathing hashes in destination'''
print(f'{"\t".join(self._hash_thread.keys)}\tbad_{self._settings.verify}', file=fh)
for cnt, hash_set in enumerate(self._hash_thread.files, start=1):
if self._check_kill_signal():
return True
line = f'{"\t".join(f'{hash_set[key]}' for key in self._hash_thread.keys)}\t'
if hash_set['src_path'] in self._bad_files:
line += f'{self._bad_files[hash_set["src_path"]]}'
print(line, file=fh)
def _write_hashes_bad_sizes(self, fh):
'''Write TSV with hashes and mismatching sizes in destination'''
print(f'{"\t".join(self._hash_thread.keys)}\tbad_dst_size', file=fh)
for hash_set in self._hash_thread.files:
if self._check_kill_signal():
return True
line = f'{"\t".join(f'{hash_set[key]}' for key in self._hash_thread.keys)}'
if hash_set['src_path'] in self._bad_files:
line += f'{self._bad_files[hash_set["src_path"]]}'
print(line, file=fh)
    def _error(self, msg):
        '''Log an error and keep it in self._errors for the final report.'''
        Log.error(msg)
        self._errors.append(msg)
def _check_kill_signal(self):
'''Check if kill signal is set'''
if self._kill and self._kill.is_set():
Log.info(self._labels.aborting_by_user)
return True
return False
def _echo_file_progress(self, processed_files):
'''Show progress while processing files, percentage by nimber of files'''
self._echo(f'{processed_files} {self._labels.of} {self._total_files}, {processed_files * 100 // self._total_files} %', end='\r')
def _echo_size_progress(self, processed_files, processed_size):
'''Show progress while processing files, percentage by size'''
msg = f'{processed_files} {self._labels.of} {self._total_files}, {processed_size} {self._labels.of} {self._total_size}'
msg += f', {processed_size % self._total_size}'
self._echo(msg, end='\r')
def run(self):
'''Execute copy process (or simulation)'''
start_time = perf_counter() ### read source structure ###
src_dir_paths = set() # given directories to copy
src_file_paths = set() # given files to copy
Log.info(self._labels.reading_source)
for path in self._src_paths:
try:
src_path = path.absolute()
if src_path.is_dir():
src_dir_paths.add(src_path)
elif src_path.is_file():
src_file_paths.add(src_path)
except:
self._error(self._labels.invalid_path.replace('#', f'{path}'))
src_dir_paths = list(src_dir_paths)
src_file_paths = list(src_file_paths)
self._files = list() # all files to copy (including subdirectories): (path, size)
self._total_size = Size(0) # total size of all files to copy
bad_paths = list() # files that might be overwritten (simulation)
echo_time = int(perf_counter() * 10)
self._norm_string = NormString(self._config.max_echo_len)
for this_src_dir_path in src_dir_paths:
for path in this_src_dir_path.rglob('*'):
int_perf_counter = int(perf_counter() * 10)
if int_perf_counter > echo_time:
self._echo(self._norm_string.get(f'{path}'), end='\r')
echo_time = int_perf_counter
if self._check_kill_signal():
return
try:
if path.is_file():
size = Size(path.stat().st_size)
self._files.append((path, size, self._dst_path / path.relative_to(this_src_dir_path.parent)))
self._total_size += size
except:
bad_paths.append(path)
for path in src_file_paths:
self._echo(self._norm_string.get(f'{path}'), end='\r')
try:
size = Size(path.stat().st_size)
self._files.append((path, size, self._dst_path / path.name))
self._total_size += size
except:
bad_paths.append(path)
Log.info(f'{self._labels.done_reading}: {len(self._files)} {self._labels.files}, {self._total_size}')
if len_bad_paths := len(bad_paths):
msg = self._labels.invalid_paths.replace('#', f'{len_bad_paths}')
msg += ': ' + ', '.join(f'{path}' for path in bad_paths[:100])
if len_bad_paths > 100:
msg += ', ...'
Log.warning(msg)
if self._settings.hashes: ### start hashing ###
Log.info(self._labels.starting_hashing)
self._hash_thread = HashThread(self._files, algorithms=self._settings.hashes)
self._hash_thread.start()
robo_parameters = self._config.robocopy_base_parameters ### robocopy parameters ###
robo_parameters.extend(self._settings.options)
if self._simulate: ### add /l parameter for simulation
robo_parameters.append('/l')
for src_path in src_dir_paths: ### copy directories ###
dst_path = self._dst_path / src_path.name
robocopy = RoboCopy(src_path, dst_path, parameters=robo_parameters)
Log.info(self._labels.executing.replace('#', f'{robocopy}'))
returncode = robocopy.run(echo=self._echo, max_len=self._config.max_echo_len, kill=self._kill)
if self._check_kill_signal():
return
if returncode >= 8:
self._error(self._labels.robocopy_error.replace('#', f'{returncode}'))
for src_path in src_file_paths: ### copy files ###
robocopy = RoboCopy(src_path.parent, self._dst_path, file=src_path.name, parameters=robo_parameters)
Log.info(self._labels.executing.replace('#', f'{robocopy}'))
returncode = robocopy.run(echo=self._echo, max_len=self._config.max_echo_len, kill=self._kill)
if self._check_kill_signal():
return
if returncode >= 8:
self._error(self._labels.robocopy_error.replace('#', f'{returncode}'))
Log.info(self._labels.robocopy_finished)
self._total_files = len(self._files) ### post robocopy, hashing might run in parallel ###
self._bad_files = dict()
if self._simulate: ### simulation ###
if self._echo_simulation():
return
elif self._settings.verify == 'size': ### check sizes but not hen simulating ###
if self._verify_by_size():
return
if self._settings.hashes: ### wait until hashing is finished ###
if self._hash_thread.is_alive():
Log.info(self._labels.hashing_in_progress)
if self._wait_hashing():
return
Log.info(self._labels.hashing_finished)
with self._logger.open_tsv() as fh:
if self._settings.verify:
if self._settings.verify == 'size':
if self._settings.hashes:
if self._write_hashes_bad_sizes(fh):
return
else:
if self._write_bad_sizes(fh):
return
else:
if self._write_bad_hashes(fh):
return
elif self._settings.hashes:
if self._write_hashes(fh):
return
else:
if self._write_sizes(fh):
return
end_time = perf_counter()
delta = end_time - start_time
self._info(self._labels.all_done.replace('#', f'{timedelta(seconds=delta)}'))
if warnings:= len(self._bad_files):
returncode = self._labels.warnings_occured.replace('#', f'{warnings}')
self._warning(returncode)
else:
returncode = True
logging.shutdown()
if self._copy_log_path:
self._logger.copy_log_into(self._copy_log_path)
if self._settings.log_dir_path:
self._logger.copy_tsv_into(self._settings.log_dir_path)
return returncode