-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgoogle_service.py
More file actions
73 lines (62 loc) · 2.75 KB
/
google_service.py
File metadata and controls
73 lines (62 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2.service_account import Credentials
import tempfile
import os
import sys
# Funkcja do autoryzacji i budowania klienta Google Drive API
def authenticate_google_drive():
"""Autoryzuje i tworzy klienta usługi Google Drive."""
credentials_path = "credentials.json"
scopes = ["https://www.googleapis.com/auth/drive.file"]
credentials = Credentials.from_service_account_file(credentials_path, scopes=scopes)
service = build("drive", "v3", credentials=credentials)
return service
def upload_file_from_memory(file_content: bytes, file_name: str, folder_id: str = None) -> str:
"""
Zapisuje tymczasowo zawartość pliku z pamięci, wysyła na Google Drive i zwraca ID pliku.
"""
try:
# Utwórz plik tymczasowy i zapisz w nim zawartość
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{file_name}") as temp_file:
temp_file.write(file_content)
temp_file_path = temp_file.name
# Wyślij plik tymczasowy na Google Drive
file_id = upload_file_to_google_drive(temp_file_path, file_name, folder_id)
return file_id
finally:
# Upewnij się, że plik tymczasowy jest usuwany
if 'temp_file_path' in locals() and os.path.exists(temp_file_path):
os.remove(temp_file_path)
# Funkcja do wysyłania pliku na Dysk Google
def upload_file_to_google_drive(file_path, file_name, folder_id=None):
"""Wysyła plik z podanej ścieżki na Dysk Google i zwraca ID pliku."""
service = authenticate_google_drive()
file_metadata = {"name": file_name}
if folder_id:
file_metadata["parents"] = [folder_id]
media = MediaFileUpload(file_path, resumable=True)
uploaded_file = service.files().create(
body=file_metadata,
media_body=media,
fields="id"
).execute()
file_id = uploaded_file.get('id')
print(f"Plik został wysłany. ID pliku: {file_id}")
return file_id
def get_file_parents(file_id: str):
"""Pobiera i wyświetla metadane pliku, w tym jego rodziców."""
service = authenticate_google_drive()
try:
file_metadata = service.files().get(fileId=file_id, fields="id, name, parents").execute()
print("Oto metadane pliku:")
print(file_metadata)
except Exception as e:
print(f"Wystąpił błąd podczas pobierania metadanych pliku: {e}")
if __name__ == "__main__":
# Prosty interfejs CLI do diagnostyki
if len(sys.argv) > 2 and sys.argv[1] == "check_parents":
file_id_to_check = sys.argv[2]
get_file_parents(file_id_to_check)
else:
print("Użycie: python google_service.py check_parents <ID_PLIKU>")