-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathusers.py
More file actions
51 lines (43 loc) · 1.5 KB
/
users.py
File metadata and controls
51 lines (43 loc) · 1.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from db import SessionLocal
from models import User
from auth import hash_password, verify_password
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_user_by_username(username: str, db: Session = None) -> User | None:
if not db:
db = next(get_db())
return db.query(User).filter(User.username == username).first()
def get_user_by_email(email: str, db: Session = None) -> User | None:
if not db:
db = next(get_db())
return db.query(User).filter(User.email == email).first()
def get_user_by_id(user_id: int, db: Session = None) -> User | None:
if not db:
db = next(get_db())
return db.query(User).filter(User.id == user_id).first()
def create_user(username: str, email: str, password: str, db: Session = None) -> User:
if not db:
db = next(get_db())
hashed = hash_password(password)
user = User(username=username, email=email, hashed_password=hashed)
db.add(user)
try:
db.commit()
db.refresh(user)
return user
except IntegrityError:
db.rollback()
raise ValueError("Username or email already exists")
def validate_user_credentials(username: str, password: str, db: Session = None) -> User | None:
if not db:
db = next(get_db())
user = get_user_by_username(username, db)
if user and verify_password(password, user.hashed_password):
return user
return None