-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpreprocessing_emulator.py
More file actions
736 lines (553 loc) · 34.5 KB
/
preprocessing_emulator.py
File metadata and controls
736 lines (553 loc) · 34.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
"""
Created on Fri Feb 25 2022
@author:Tristan
Fichier réalisant la préparation des données d'entrainement
du réseau émulateur.
"""
#%% Imports
# - Third-party
import numpy as np
import pandas as pd
import os
import json
# path = "C:/Users/trist/Documents/Stage/perso/git_code/cobiopro_tristan" # office path
# path = "C:/Users/trist/Documents/ENSAM/Stage INRIA/cobiopro_tristan" # home path
# os.chdir(path)
# - Local
import rot_quat_utils as rot
import remap_utils as remap
#%% 1. Lecture des données du csv
def read_df(n):
""" Fonction qui ouvre le corpus s8 si n = 0
et le corpus complet sinon (~15sec) """
if n == 0:
df = pd.read_csv('datasets/s8_corpus_ProxyDistal_RightHanded_Train.csv') # df contenant le corpus de s8
print("Loaded corpus s8 of 58844 ech")
else:
df = pd.read_csv('datasets/corpus_ProxyDistal_RightHanded_Train.csv') # df contenant le corpus complet
print("-> Loaded full corpus of 933925 ech")
df0 = df[["subjectId","timestamp","tgtNumber","tgtRed"]]
df1 = df.loc[:,"tgtPos_x":"endEffCustQuat_w"]
df = pd.concat([df0, df1], axis=1) # df contenant data nécessaires pour le controleur et l'émulateur
return df
# on choisit quel corpus on veut lire, prend 15s pour corpus complet
# df = read_df(n = 1)
#%% 2. Obtention des configs de 7angles, du corpus remappé et construction du corpus de tgt remappées
def compute_angles(df, ref):
""" Fonction qui renvoie les angles d'euler associés au quaternions
dans le repère de notre choix """
## quaternions associés a chaque articulations de la prothèse
quats_upper = np.array(df.loc[:,"shouVirtQuat_x":"shouVirtQuat_w"])
quats_fore = np.array(df.loc[:,"elbVirtQuat_x":"elbVirtQuat_w"])
quats_hand = np.array(df.loc[:,"wriVirtQuat_x":"wriVirtQuat_w"])
if ref == "refArm": # configuration calculée avec refArm
quats_ref = np.array(df.loc[:,"refArmQuat_x":"refArmQuat_w"])
if ref == "refTgt": # configuration calculée avec refTgt (!Attention: ne marche que si state_tgt_remapped.csv en Quat)
if 'tgt_states' not in locals(): # corpus des tgt a load
tgt_states = np.array(pd.read_csv('corpus_state_tgt_remapped.csv' ))
quats_ref = tgt_states[:,3:] # quat non impactés par remappage pos
if ref == "refInit": # configuration calculée avec refInit
quats_ref = np.array(df.loc[:,"refInitQuat_x":"refInitQuat_w"])
## on utilise rot_quat_utils.py pour avoir les angles d'euler a partir des quaternions
mat_angles = rot.quats2config(quats_upper, quats_fore, quats_hand,
quats_ref, unit="rad", arm_side='R')
print("-> Angles d'Euler créé et dans le repère", ref)
## on recupere les quaternions de la main qu'on exprime en angle d'Euler dans refInit (Rajouté pour controleur)
hand_quats = np.array(df.loc[:,"endEffVirtQuat_x":"endEffVirtQuat_w"])
hand_angles = rot.quat2angles( quats = hand_quats,
order_of_rots = "XZY",
quats_prev=quats_ref, unit="rad",
arm_side='R')[: , :2] # angle Pitch et Roll seulement
print("-> Angles d'Euler de la main dans le repère", ref)
## UNCOMMENT IF you want to extract corpus euler angles to csv
# if not os.path.exists('datasets/euler_angles_refInit.csv'):
# df_mat_angles = pd.DataFrame(mat_angles, columns = ["ShP", "ShR","ArY",
# "ElbP","FoY","WrP","WrR"])
# df_mat_angles.to_csv(path + "/datasets/euler_angles_refInit.csv", index = True)
# if not os.path.exists('datasets/hand_angles_refInit.csv'):
# df_hand_angles = pd.DataFrame(hand_angles, columns = ["handP", "handR"])
# df_hand_angles.to_csv(path + "/datasets/hand_angles_refInit.csv", index = True)
return mat_angles, hand_angles
def remap_pos(df, mat_angles, remappage):
""" Fonction qui renvoie les positions du corpus remappées par rapport
aux dimensions de référence seg_dims """
if remappage == True:
configs = mat_angles # autrefois: doute sur ce point, wriAngles or handAngles?
seg_dims = np.array([[0, -0.280, 0], # position of elbow in shoulder ref
[0, -0.25, 0], # position of wrist in elbow ref
[0, 0, 0]]) # position of end effector in wrist ref (! A MODIFER pour CONTROLEUR )
# seg_dims = np.array([[0, -0.186, 0], # 1 m tall subject
# [0, -0.146, 0],
# [0, 0, 0]])
arrow_offset = None # mis à None pour l'instant (ce qui était convenu avec Vincent)
urdf_path = ""
hand_pos = remap.get_remapped_block(configs, seg_dims, arrow_offset, urdf_path)
print("-> Le corpus a été remappé avec ", seg_dims)
## UNCOMMENT IF you want to extract corpus handpos to csv
## extract handpos corpus remmapped
# if not os.path.exists('datasets/hand_pos_remapped.csv'):
# df_hand_pos = pd.DataFrame(hand_pos, columns = ["handPos_x", "handPos_y","handPos_z"])
# df_hand_pos.to_csv(path + "/datasets/hand_pos_remapped.csv", index = True)
return hand_pos
else:
hand_pos = np.array( df.loc[ : ,"endEffVirtPos_x":"endEffVirtPos_z" ]) # position de main brute, non remappée
print("-> Le corpus n'a pas été remappé ")
return hand_pos
def create_corpus_tgt(df, hand_angles, hand_pos, n, remappage): # la grandeur indice_cible renvoyée n'est pas exploitée
"""" Fonction qui renvoie le corpus des cibles remappées, les index
de troncatures des cibles et les indices des cibles du corpus d'origine
(inconvénient ~175s) """
## on créé les variables nécessaires et on initialise les listes
vect_tgt = np.array(df[["tgtRed"]]).squeeze() # vecteur comportant la donnée tgtRed
vect_cible = np.array(df[["tgtNumber"]]).squeeze() # vecteur comportant le nb de cible atteintes a chaque configuration
nb_ech = len(vect_tgt)
tgtRed = list() # liste qui contiendra l'ensemble des configs a tgtRed = True
indice_c = 0 # incrément pour voir les variations dans vect_cible
j = 0 # indice pour incrémenter les cibles != indice_c
indice_cible = list() # liste qui contiendra les index des cibles
index_Red = list() # liste qui contiendra les index des tgtRed
index_tronc = list() # liste qui contiendra les index des 1ere tgtRed
tgt_angles = hand_angles
if n == 0 : # corpus s8
tgt_statesRed = np.zeros((290,5)) # array qui contiendra l'ensemble des configs des tgts que l'on veut
tgt_states = np.zeros((58844,5))
nb_cible = 290
elif n == 1: # corpus complet
tgt_statesRed = np.zeros((4305,5))
tgt_states = np.zeros((939925,5))
nb_cible = 4305
## on construit la liste de tgt et le corpus avec chaque config de tgt qui se repete sur sa meme trajectoire
for k in range( nb_ech ): # boucle parcourant tout les échantillons OU tgtRed = [df.loc[ k , "wriVirtQuat_x" : "wriVirtQuat_w" ] for k in range(len(vect_tgt)) if vect_tgt[k] == True]
boolean_tgt = vect_tgt[k] # boolean
indice_c_k = vect_cible[k] # int
if boolean_tgt == True: # on ajoute la config a tgtRed True
tgt_pos = hand_pos[k]
tgt_ang = tgt_angles[k]
mat_Red = np.concatenate((tgt_pos,tgt_ang))
tgtRed.append( mat_Red ) # données pour l'émulateur
index_Red.append(k) # on stocke l'index de la tgt
elif indice_c_k != indice_c:
tgt_statesRed[j] = tgtRed[0] # on garde seulement la 1ere config où tgtRed True
j += 1
index_tronc.append( index_Red[-1] - index_Red[0] ) # on garde seulement le premier index tgtRed True
index_Red = list()
indice_c = indice_c_k # l'incrément prend la valeur de i
tgtRed = list() # on réinitialise tgtRed après chaque cible
indice_cible.append(k) # on stocke l'index où la cible change
tgt_statesRed[j] = tgtRed[0] # on ajoute la dernière tgt (pour i = i_c : numéro derniere tgt)
index_tronc.append(index_Red[-1] - index_Red[0]) # idem
## on construit le corpus avec chaque config de tgt qui se repète sur sa meme trajectoire
i_c = 0 # incrément pour voir les variations dans vect_cible
ind = 0 # incrément qui parcours les tgts dans tgt_config
for k in range(nb_ech) :
i = vect_cible[k]
if i != i_c:
i_c = i
ind += 1
if ind <= nb_cible:
tgt_states[k] = tgt_statesRed[ind]
if remappage == True:
print("-> Le corpus de tgt remappées a été créé")
else:
print("-> Le corpus de tgt non remappées a été créé")
## UNCOMMENT IF you want to extract corpus tgt remapped to csv
# if not os.path.exists('datasets/tgt_states_remapped_refInit.csv'):
# # create new df with new tgts
# df_tgt_states = pd.DataFrame(tgt_states, columns = ["tgtPos_x", "tgtPos_y","tgtPos_z",
# "tgtPitch","tgtRoll"]) # on pourrait surement recup les noms du df
# df_tgt_states.to_csv(path + "/datasets/tgt_states_remapped_refInit.csv", index = True)
return tgt_states, index_tronc, indice_cible
## 1. on créé l'ensemble des config de 7 angles d'Euler dans le refInit
## 2. on remap le corpus pour obtenir les nouvelles positions
## 3. on créé le corpus de Tgt remappées
# mat_angles, hand_angles = compute_angles(df, ref = "refInit")
# hand_pos = remap_pos(df, mat_angles, remappage = True)
# tgt_states, index_tronc, indice_cible = create_corpus_tgt(df, hand_angles, hand_pos, n = 1, remappage = True)
#%% 3. Obtention du vecteur d'état z(t) remappé (RUN IT ONCE ONLY)
## pratique pour lancement par cellules #%% (permet de commencer ici)
# if 'mat_angles' not in locals():
# mat_angles = np.array(pd.read_csv('datasets/euler_angles_refInit.csv', index_col = 0))
# if 'hand_angles' not in locals():
# hand_angles = np.array(pd.read_csv('datasets/hand_angles_refInit.csv', index_col = 0))
# if 'hand_pos' not in locals():
# hand_pos = np.array(pd.read_csv('datasets/hand_pos_remapped.csv', index_col = 0))
# if 'tgt_states' not in locals():
# tgt_states = np.array(pd.read_csv('datasets/tgt_states_remapped_refInit.csv' , index_col = 0)) # matrice contenant le corpus de tgt, PB: taille (n_sample , 8)
def compute_position(df, hand_pos, tgt_states, ref): # !ATTTENTION si hand_pos ne provient pas du 2. ou importé du repertoire, ~remappe a l'infini
""" Fonction qui renvoie les positions de la main dans le repère
de notre choix """
pos_in_W = hand_pos # on récupére la position de la main (et la position de la main remappée.. ? )
ref_quat_in_W = np.array(df.loc[:,"refInitQuat_x":"refInitQuat_w"]) # le plus cohérent pr l'instant, autre possibilité est tgtQuat
if ref == "refArm": # position calculée avec refArm
ref_pos_in_W = np.array(df.loc[:,"refArmPos_x":"refArmPos_z"])
print("-> Position dans le repère refArm")
if ref == "refTgt": # position calculée avec refTgt
ref_pos_in_W = tgt_states[:, : 3] # on récupère les positions seulement
print("-> Position dans le repère refTgt")
## on fait appel a rot_quat_utils.py pour avoir les positions main dans refTgt
hand_pos = rot.get_pos_in_given_ref(pos_in_W, ref_pos_in_W, ref_quat_in_W)
# if not os.path.exists('datasets/hand_pos_remapped_refInit.csv'):
# df_hand_pos = pd.DataFrame(hand_pos, columns = ["handPos_x", "handPos_y","handPos_z"])
# df_hand_pos.to_csv(path + "/datasets/hand_pos_remapped_refInit.csv", index = True)
return hand_pos
def compute_state(df, mat_angles, hand_angles, hand_pos, tgt_states, ref):
""" Fonction qui renvoie l'état du système dans le ref hybride:
ref_tgtPos_InitAng (0.13s) """
mat_states = np.concatenate((mat_angles, hand_angles, hand_pos), axis = 1) # matrice contenant l'état du système angle+pos
## UNCOMMENT IF you want to extract full corpus remapped to csv
# if not os.path.exists("datasets/state_remapped_refInit.csv"):
# ## on créé un df contenant l'état (a modifier pour controleur car incomplet)
# df_state = pd.DataFrame(mat_states, columns = ["ShouPitch","ShouRoll",
# "ArmYaw","ElbPitch", "ForearmYaw","WriPitch",
# "WriRoll", "HandPitch", "HandRoll", "PosX", "PosY", "PosZ"]) # ordre = choix arbitraire
# df_state.to_csv(path + "/datasets/state_remapped_refInit.csv", index = True)
print("-> Le corpus est dans un référentiel avec l'orientation de refInit et refTgt comme origine")
return mat_states
# hand_pos = compute_position(df, hand_pos, tgt_states, ref = "refTgt" )
# mat_states = compute_state(df, mat_angles, hand_angles, hand_pos, tgt_states, ref = "refTgt" )
#%% 4a. Preparation des données pour le réseau émulateur
## pratique pour lancement par cellules #%% (permet de commencer ici)
# if "index_tronc.csv" not in locals():
# index_tronc = np.array(pd.read_csv('datasets/index_tronc.csv', index_col = 0))
# if 'mat_states' not in locals():
# mat_states = np.array(pd.read_csv('datasets/state_remapped_refInit.csv' , index_col = 0)) # matrice contenant le corpus d'états,
# if 'tgt_states' not in locals():
# tgt_states = np.array(pd.read_csv('datasets/tgt_states_remapped_refInit.csv', index_col =0 )) # matrice contenant le corpus de tgt
def split_truncate_traj_e(df, index_tronc, mat_states, tgt_states, n):
""" Fonction qui renvoie les trajectoires tronquées et séparées
par cibles de l'émulateur """
## on réorganise le dataset pour dissocier les trajectoires par cible
vect_cibles = np.array(df[["tgtNumber"]]) # vecteur comportant le nb de cible atteintes a chaque configuration
# initialisation de matrices/vecteurs contenant les trajectoires selon le corpus
if n == 1:
vect_subj = np.array(df[["subjectId"]]) # array contenant les subj id associé a chaque ech
mat_states = np.concatenate((mat_states, vect_subj), axis = 1) # on ajoute la donnée subj id
mat_tgts = np.zeros((4305,), dtype = object) # matrice contenant les cibles pour le corpus complet
elif n == 0: # PAS FAITE
mat_tgts = np.zeros((290,), dtype = object) # matrice contenant les cibles pour s8
# on construit la matrice
mat_cible = mat_states[0] # on initialise la matrice contenant une trajectoires de cible
n_ech = 1 # incrément parcourant les échantillons
nb_cible = 0 # nb de cible réellement parcourues
i_c = 0 # incrément pour voir les variations dans vect_cible
for i in vect_cibles[1:]: # boucle parcourant tgtNumber
if i != i_c:
mat_tgts[nb_cible] = mat_cible # on ajoute la trajectoire de cible
i_c = i
nb_cible += 1
mat_cible = mat_states[n_ech]
else:
mat_cible = np.vstack(( mat_cible, mat_states[n_ech] )) # on ajoute les ech d'une trajectoire
n_ech += 1
mat_tgts[-1] = mat_cible # on ajoute la dernière cible pour i == i_c
len_tgts = [len(c) for c in mat_tgts] # matrice contenant la longueur des tgt
## on tronque le corpus avec le choix du corpus de cible qu'on a fait précédémment
mat_tgts = np.array([ mat_tgts[k][: len(mat_tgts[k]) - int(index_tronc[k]) ] for k in range( len(mat_tgts) )] , dtype = object)
## on tronque le corpus de cibles (autre manière de tronquer que faite dans preprocessing controleur)
tgt_state0 = tgt_states[ : sum(len_tgts[ : 1 ]) - int(index_tronc[0])]
for k in range(1 , len(len_tgts)):
ech_start = sum(len_tgts[ : k])
ech_stop = sum(len_tgts[ : k + 1 ]) - int(index_tronc[k])
tgt_state0 = np.vstack((tgt_state0 , tgt_states[ ech_start : ech_stop ] ))
len_tgts = [len(c) for c in mat_tgts]
# on tronque les inputs (mat_states PAS UTILISÉ)
mat_states = mat_tgts[0][0, : 12] # tout sauf vect_subj
for c in mat_tgts:
mat_states = np.vstack((mat_states , c[: , :12]))
tgt_states = tgt_state0
nb_ech_tronc = sum(len_tgts)
print("-> Corpus has been truncated and is now of ", nb_ech_tronc, " ech")
if n == 1 : # corpus complet
# on tronque les données qu'on récup du corpus
vect_subj = mat_tgts[0][: , -1]
for c in mat_tgts[1 : ]:
vect_subj = np.concatenate((vect_subj , c[: , -1]))
vect_subj = list(vect_subj)
# on remet mat_tgts
mat_tgts = np.array([ mat_tgts[k][: , :12] for k in range(len(mat_tgts))], dtype = object)
## UNCOMMENT IF you want to extract corpus tgt_state_remmapped_truncated
# if not os.path.exists("datasets/tgt_states_preprocessed.csv"):
# df_tgt_states = pd.DataFrame(tgt_states, columns = ["tgtPos_x", "tgtPos_y","tgtPos_z",
# "tgtPitch","tgtRoll"]) # on pourrait surement recup les noms du df
# df_tgt_states.to_csv(path + "/datasets/tgt_states_preprocessed.csv", index = True)
# if not os.path.exists("datasets/states_preprocessed.csv"):
# ## on créé un df contenant l'état (a modifier pour controleur car incomplet)
# df_state = pd.DataFrame(mat_states, columns = ["ShouPitch","ShouRoll",
# "ArmYaw","ElbPitch", "ForearmYaw","WriPitch",
# "WriRoll", "HandPitch", "HandRoll",
# "PosX", "PosY", "PosZ"]) # ordre = choix arbitraire
# df_state.to_csv(path + "/datasets/states_preprocessed.csv", index = True)
# if not(os.path.exists("/datasets/mat_tgts_e.npy")): # sauvegarde de mat_tgts
# np.save('./datasets/mat_tgts_e.npy', mat_tgts)
return mat_tgts , len_tgts, tgt_states, vect_subj
# mat_tgts, len_tgts, tgt_states, vect_subj = split_truncate_traj_e(df, index_tronc, mat_states, tgt_states, n = 1)
#%% 4b. Preparation des données pour le réseau émulateur
# if 'mat_tgts' not in locals():
# mat_tgts = np.load("./datasets/mat_tgts_e.npy", allow_pickle = True)
## old was for prepare_set e1
# if 'vect_subj' not in locals(): # vect_subj a enregistrer pour pouvoir le load avant
# f = open("datasets/vect_subj.json")
# vect_subj = json.load(f)
# f.close()
# if "Index_subj" not in locals():
# f = open("datasets/Index des sujets.json")
# Index_subj = json.load(f)
# f.close()
def prepare_sets_e1(mat_tgts, vect_subj, ID, n): # NOT USED ANYMORE
""" Fonction qui renvoie le dataset d'entrainement et de test
de l'émulateur avec k=17 fold et un dico avec les index de départ
du sujet test ID extrait (échantillon et trajectoire)
Elle nécessite le fichier vect_subj.json donnant les index des sujets par cible.
Inconvénient: la compilation est longue + les configs
sont approximées après conversion np.array()
(THIS FUNCTION WAS NOT RUNNED RECENTLY ; previous compilation time: ~50sec) """
if n == 1 :
len_tgts = [len(c) for c in mat_tgts]
subj_id = [10] # liste qui contiendra les numéros des 17 sujets
k = subj_id[0]
for j in vect_subj: # boucle parcourant les ID de sujets
if k != j:
subj_id.append(j) # on ajoute les numéros de sujets
k = j
# on détermine le 1er et dernier échantillon du set de test
ech_start = vect_subj.index(subj_id[ID]) # indice signalant a quel ech commence le sujet choisit ex sujet 11: 51051; suj13 : 99085
if ID == 16: # cas pour le dernier sujet
ech_stop = vect_subj.index(subj_id[-1])
if ID != 16:
ech_stop = vect_subj.index(subj_id[ ID + 1 ]) # indice signalant a quel ech finis le sujet
# on détermine l'indice de départ et de fin sur les cibles car on a séparé les tgt par cibles independamment des sujets
nb_ech = 0
cible_start = 0 # indice qui sera celui de la 1ere cible DU SUJET extrait
# on détermine les indices pour séparer le train et test set
while nb_ech < ech_start:
nb_ech += len_tgts[cible_start]
cible_start += 1
cible_stop = cible_start # indice qui sera celui de la derniere cible du sujet
while nb_ech < ech_stop:
nb_ech += len_tgts[cible_stop]
cible_stop += 1
if ID == 16: # cas pour le dernier sujet
cible_stop = len(mat_tgts)
elif n == 0: # corpus s8 (peut etre pas au point pour derniere cible )
cible_start = ID
cible_stop = ID + 1
# on commence à l'ech 0
if ID == 0:
ech_start = 0
ech_stop = len(mat_tgts[0]) - 1 # la trajectoire commence a 0
# on commence à l'ech où commence la cible
else:
ech_start = sum(len_tgts[ : ID])
ech_stop = sum(len_tgts[ : ID + 1])
print("La cible extraite du training data est la cible n°", ID + 1 ) # car on commence a cible 0
# on construit les matrices train, test
train_states_init = mat_tgts[0][0] # initialisation des vecteurs d'état z(t) et z(t+1) du train set
train_states_final = mat_tgts[0][1]
if ID == 0 or cible_start == 0 :
train_states_init = mat_tgts[cible_stop][0] # initialisation des vecteurs d'état z(t) et z(t+1) du train set
train_states_final = mat_tgts[cible_stop][1]
for c in mat_tgts[ : cible_start ]: # boucle parcourant les trajectoires par cible
if np.array_equal(c, mat_tgts[0]):
train_states_init = np.vstack((train_states_init, c[1 : len(c) - 1]))
train_states_final = np.vstack((train_states_final, c[2 : len(c)]))
else:
train_states_init = np.vstack((train_states_init, c[ : len(c) - 1]))
train_states_final = np.vstack((train_states_final, c[1 : len(c)]))
train_control = train_states_final - train_states_init
# on conserve un sujet (n°10) ou une cible pour le test
test_states_init = mat_tgts[cible_start][0]
test_states_final = mat_tgts[cible_start][1]
for c in mat_tgts[ cible_start : cible_stop ]:
if np.array_equal(c, mat_tgts[cible_start]):
test_states_init = np.vstack((test_states_init, c[1 : len(c) - 1]))
test_states_final = np.vstack((test_states_final, c[2 : len(c)]))
else:
test_states_init = np.vstack((test_states_init, c[ : len(c) - 1]))
test_states_final = np.vstack((test_states_final, c[1 : len(c)]))
test_control = test_states_final - test_states_init
for c in mat_tgts[ cible_stop : ]:
if np.array_equal(c, mat_tgts[cible_stop]):
train_states_init = np.vstack((train_states_init, c[1 : len(c) - 1]))
train_states_final = np.vstack((train_states_final, c[2 : len(c)]))
else:
train_states_init = np.vstack((train_states_init, c[ : len(c) - 1]))
train_states_final = np.vstack((train_states_final, c[1 : len(c)]))
train_control = train_states_final - train_states_init
if n == 1 :
print("-> Le sujet extrait du training data est le sujet n°", subj_id[ID])
elif n == 0:
print("-> La cible extraite du training data est la cible n°", ID + 1 ) # car on commence a cible 0
# on reforme le dataset pour plus de simplicite
x_train = np.concatenate((train_states_init, train_control), axis = 1)
y_train = train_states_final
x_test = np.concatenate((test_states_init, test_control), axis = 1)
y_test = test_states_final
# on stock les index de départs
dic = dict(ech_start = ech_start, ech_stop= ech_stop, cible_start=cible_start, cible_stop=cible_stop)
## UNCOMMENT IF you want to extract previous emulator training and test data
# if not os.path.exists("datasets/previous_training_data/corpus_xtrain_e.csv"):
# column_names1 = ["ShouPitch","ShouRoll","ArmYaw","ElbPitch", "ForearmYaw","WriPitch",
# "WriRoll", "PosX", "PosY", "PosZ", "u0","u1",
# "u2","u3","u4","u5","u6","u7","u8","u9"]
# column_names2 = column_names1[:10]
# df_xtrain = pd.DataFrame(x_train, columns = column_names1)
# df_ytrain = pd.DataFrame(y_train, columns = column_names2)
# df_xtest = pd.DataFrame(x_test, columns = column_names1)
# df_ytest = pd.DataFrame(y_test, columns = column_names2)
# df_xtrain.to_csv(path + "/corpus_xtrain_e.csv", index = True)
# df_ytrain.to_csv(path + "/corpus_ytrain_e.csv", index = True)
# df_xtest.to_csv(path + "/corpus_xtest_e.csv", index = True)
# df_ytest.to_csv(path + "/corpus_ytest_e.csv", index = True)
return ((x_train, y_train), (x_test, y_test)), dic
def prepare_sets_e2(mat_tgts, Index_subj, ID): # PAS UTILISÉ à ce jour (pas d'utilité notable)
"""Fonction qui construit la matrice contenant les trajectoires
par sujets """
mat_subj = [mat_tgts[Index_subj[k]["cible_start"] :
Index_subj[k]["cible_stop"]] for k in range(17)] # matrice contenant les trajectoires par sujet
return mat_subj
def prepare_sets_e3(mat_tgts, Index_subj, kfold):
"""Fonction qui construit les train, val et test set avec 5 Fold
elle est très proche de prepare_sets_e1 mais elle nécessite le fichier
Index des sujets.json qui contient les indices d'échantillons et
trajectoires de chaque sujet (ou: d'avoir lancé
prepare_sets_e1 pour les 17 sujets et récupéré tout les vect_subj.json)
temps d'exécution ~ 30 sec """
k = kfold
## creation des mat_tgts train, test et val
test_index_start, test_index_stop = Index_subj[-2]["cible_start"], Index_subj[-1]["cible_stop"]
mat_tgts_test = mat_tgts[test_index_start : test_index_stop]
mat_tgts = mat_tgts[ : test_index_start]
Index = [i["cible_start"] for i in Index_subj]
Index = Index[: -1] # on enleve le dernier sujet test
if k == 1 :
k -= 1
elif k == 2:
k += 1
elif k == 3 :
k += 3
elif k == 4 :
k += 5
elif k == 5:
k += 7
index_start, index_stop =Index[k] , Index[k + 3]
mat_tgts_train = np.concatenate((mat_tgts[ : index_start] , mat_tgts[ index_stop : ]))
mat_tgts_val = mat_tgts[ index_start : index_stop ]
## construction des train, val et test set pour le réseau
train_states_init = mat_tgts_train[0][0] # initialisation des vecteurs d'état z(t) et z(t+1) du train set
train_states_final = mat_tgts_train[0][1]
for c in mat_tgts_train: # boucle parcourant les trajectoires par cible
if np.array_equal(c, mat_tgts_train[0]):
train_states_init = np.vstack((train_states_init, c[1 : len(c) - 1]))
train_states_final = np.vstack((train_states_final, c[2 : len(c)]))
else:
train_states_init = np.vstack((train_states_init, c[ : len(c) - 1]))
train_states_final = np.vstack((train_states_final, c[1 : len(c)]))
train_control = train_states_final - train_states_init
x_train = np.concatenate((train_states_init, train_control), axis = 1)
y_train = train_states_final
val_states_init = mat_tgts_val[0][0]
val_states_final = mat_tgts_val[0][1]
for c in mat_tgts_val:
if np.array_equal(c, mat_tgts_val[0]):
val_states_init = np.vstack((val_states_init, c[1 : len(c) - 1]))
val_states_final = np.vstack((val_states_final, c[2 : len(c)]))
else:
val_states_init = np.vstack((val_states_init, c[ : len(c) - 1]))
val_states_final = np.vstack((val_states_final, c[1 : len(c)]))
val_control = val_states_final - val_states_init
x_val = np.concatenate((val_states_init, val_control), axis = 1)
y_val = val_states_final
test_states_init = mat_tgts_test[0][0]
test_states_final = mat_tgts_test[0][1]
for c in mat_tgts_test:
if np.array_equal(c, mat_tgts_test[0]):
test_states_init = np.vstack((test_states_init, c[1 : len(c) - 1]))
test_states_final = np.vstack((test_states_final, c[2 : len(c)]))
else:
test_states_init = np.vstack((test_states_init, c[ : len(c) - 1]))
test_states_final = np.vstack((test_states_final, c[1 : len(c)]))
test_control = test_states_final - test_states_init
x_test = np.concatenate((test_states_init, test_control), axis = 1)
y_test = test_states_final
## UNCOMMENT IF you want to extract training, val and test data
if not os.path.exists("datasets/training_data/xtrain_e_k="+str(k)+".npy"):
if k == 0 :
k = 1
elif k == 3:
k = 2
elif k == 6 :
k = 3
elif k == 9 :
k = 4
elif k == 12:
k = 5
np.save("./datasets/training_data/x_train_e_k="+str(k)+".npy", x_train)
np.save("./datasets/training_data/y_train_e_k="+str(k)+".npy", y_train)
np.save("./datasets/training_data/x_val_e_k="+str(k)+".npy", x_val)
np.save("./datasets/training_data/y_val_e_k="+str(k)+".npy", y_val)
np.save("./datasets/training_data/x_test_e_k="+str(k)+".npy", x_test)
np.save("./datasets/training_data/y_test_e_k="+str(k)+".npy", y_test)
print("-> Le Fold ",str(k), " du train, val set et test set a été créé (cross val 5Fold)")
return ((x_train, y_train), (x_val, y_val), (x_test, y_test))
## utilisation de prepareset_1 :
## choix du sujet du test set dans [10, 11, 13, 14, 3, 5, 6, 7, 8, 9, 27, 28, 29, 30, 31, 32, 33]
## sujet ID = 0 est le sujet n°10
# dataset_e , dic = prepare_sets_e1(mat_tgts, vect_subj, ID = 0, n = 1)
## utilisation de prepareset_3
## choose between k = 1 to k = 5 depending on the crossval set you want
# for k in range(2,6):
# dataset_e = prepare_sets_e3(mat_tgts, Index_subj, kfold = 1) # if cross validation put kfold = k in loop
#%% MAIN FUNCTION
def prepare_data_e():
""" Fonction qui fait le preprocessing du dataset de l'émulateur avec un découpage
pour cross validation fixée à 5fold (12 train, 3 val et 2 sujets test).
Elle necessite au minimum le .csv du corpus complet et le .json des indices de début de
trajectoires de chaque sujet "Index_subj".
Temps de compilation ~ 152sec
"""
print("Pour extraire le training data de l'émulateur dans le repertoire datasets/training_data , vérifier que la partie réservée dans prepare_sets_e3 est decommentée.")
if False: # Mettre a True si on part du corpus brut
## choix du corpus
df = read_df(n = 1)
## construction des vecteur d'états (angle et pos) dans la configuration recherchée
mat_angles, hand_angles = compute_angles(df , ref = "refInit")
hand_pos = remap_pos(df , mat_angles, remappage = True)
tgt_states , index_tronc , _ = create_corpus_tgt(df, hand_angles, hand_pos, n = 1, remappage = True)
hand_pos = compute_position(df, hand_pos, tgt_states, ref = "refTgt" )
mat_states = compute_state(df, mat_angles, hand_angles, hand_pos, tgt_states, ref = "refTgt" )
## construction des données d'entrainements et choix du kfold
mat_tgts, _, _ , _ = split_truncate_traj_e(df, index_tronc, mat_states, tgt_states, n = 1)
f = open("datasets/Index des sujets.json")
Index_subj = json.load(f)
f.close()
## choix du kfold
dataset_e = prepare_sets_e3(mat_tgts, Index_subj, kfold = 1)
return dataset_e
if False: # Mettre a True si une partie du preprocessing est faite (quasiment équivalente : temps de compilation : 149 s )
## choix du corpus
df = read_df(n = 1)
## construction des données d'entrainements et choix du kfold
index_tronc = np.array(pd.read_csv('datasets/index_tronc.csv', index_col = 0))
mat_states = np.array(pd.read_csv('datasets/state_remapped_refInit.csv' , index_col = 0))
tgt_states = np.array(pd.read_csv('datasets/tgt_states_remapped_refInit.csv', index_col =0 ))
mat_tgts, _ , _ , _ = split_truncate_traj_e(df, index_tronc,
mat_states, tgt_states, n = 1)
f = open("datasets/Index des sujets.json")
Index_subj = json.load(f)
f.close()
## choix du kfold
dataset_e = prepare_sets_e3(mat_tgts, Index_subj, kfold = 1)
return dataset_e
if True: # Mettre a True si mat_tgts_e déja enregistrée
## construction des données d'entrainements et choix du kfold
mat_tgts = np.load("datasets/mat_tgts_e.npy", allow_pickle = True)
f = open("datasets/Index des sujets.json")
Index_subj = json.load(f)
f.close()
## choix du kfold
dataset_e = prepare_sets_e3(mat_tgts, Index_subj, kfold = 1)
return dataset_e
def main():
prepare_data_e()
if __name__=="__main__":
main()