-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdependency.py
More file actions
74 lines (54 loc) · 2.12 KB
/
dependency.py
File metadata and controls
74 lines (54 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from fastapi import Depends, HTTPException, Request, Security, security
from sqlalchemy.orm import Session
from database import get_db
from exception import TokenExpire, TokenNotCorrect
from repository import TaskRepository, TaskCache
from cache import get_redis_connection
from repository import UserRepository
from service import TaskService
from service import UserService
from service import AuthService
from settings import Settings
def get_task_repository(db_session: Session = Depends(get_db)) -> TaskRepository:
    """Provide a ``TaskRepository`` bound to the injected database session.

    Args:
        db_session: SQLAlchemy session resolved through the ``get_db``
            dependency.

    Returns:
        A new ``TaskRepository`` constructed with ``db_session``.
    """
    repository = TaskRepository(db_session)
    return repository
def get_task_cache_repository() -> TaskCache:
    """Provide a ``TaskCache`` backed by ``get_redis_connection()``.

    Returns:
        A new ``TaskCache`` wrapping the connection object returned by
        ``get_redis_connection``.
    """
    return TaskCache(get_redis_connection())
def get_task_service(
    task_repository: TaskRepository = Depends(get_task_repository),
    task_cache: TaskCache = Depends(get_task_cache_repository),
) -> TaskService:
    """Assemble a ``TaskService`` from its repository and cache dependencies.

    Args:
        task_repository: Repository resolved via ``get_task_repository``.
        task_cache: Cache resolved via ``get_task_cache_repository``.

    Returns:
        A new ``TaskService`` wired with both collaborators.
    """
    service = TaskService(task_repository=task_repository, task_cache=task_cache)
    return service
def get_user_repository(db_session: Session = Depends(get_db)) -> UserRepository:
    """Provide a ``UserRepository`` bound to the injected database session.

    Args:
        db_session: SQLAlchemy session resolved through the ``get_db``
            dependency.

    Returns:
        A new ``UserRepository`` constructed with ``db_session``.
    """
    repository = UserRepository(db_session=db_session)
    return repository
def get_auth_service(
    user_repository: UserRepository = Depends(get_user_repository),
) -> AuthService:
    """Assemble an ``AuthService`` from the user repository and settings.

    A fresh ``Settings()`` instance is created on every call.

    Args:
        user_repository: Repository resolved via ``get_user_repository``.

    Returns:
        A new ``AuthService`` wired with ``user_repository`` and the
        freshly built settings object.
    """
    settings = Settings()
    return AuthService(user_repository=user_repository, settings=settings)
def get_user_service(
    user_repository: UserRepository = Depends(get_user_repository),
    auth_service: AuthService = Depends(get_auth_service),
) -> UserService:
    """Assemble a ``UserService`` from its repository and auth dependencies.

    Args:
        user_repository: Repository resolved via ``get_user_repository``.
        auth_service: Service resolved via ``get_auth_service``.

    Returns:
        A new ``UserService`` wired with both collaborators.
    """
    service = UserService(
        user_repository=user_repository,
        auth_service=auth_service,
    )
    return service
# HTTP bearer security scheme instance; passed to ``Security(...)`` by
# ``get_request_user_id`` to obtain the request's authorization credentials.
reusable_oauth2 = security.HTTPBearer()
def get_request_user_id(
    auth_service: AuthService = Depends(get_auth_service),
    token: security.http.HTTPAuthorizationCredentials = Security(
        reusable_oauth2),
) -> int:
    """Resolve the requesting user's id from the bearer access token.

    Args:
        auth_service: Service resolved via ``get_auth_service``; used to
            decode the access token.
        token: Authorization credentials supplied by the ``reusable_oauth2``
            security scheme.

    Returns:
        The value returned by
        ``auth_service.get_user_id_from_access_token`` for the token.

    Raises:
        HTTPException: With status 401 and the domain error's ``details``
            when the auth service raises ``TokenExpire`` or
            ``TokenNotCorrect``.
    """
    try:
        user_id = auth_service.get_user_id_from_access_token(token.credentials)
    except (TokenExpire, TokenNotCorrect) as e:
        # Both token failures map to the same 401 response. The
        # WWW-Authenticate challenge is required on 401 responses, and the
        # original exception is chained so the root cause stays in tracebacks.
        raise HTTPException(
            status_code=401,
            detail=e.details,
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
    return user_id