Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .coverage
Binary file not shown.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
⚙️ You can Generate Project Interactively Based on this template with the [FastAPI Gen8 CLI Tool](https://pypi.org/project/fastapi-gen8/)


![Coverage Badge](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/brianobot/b56b3d61a5e739fd26252cda094bace2/raw)
![Test Coverage Badge](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/brianobot/b56b3d61a5e739fd26252cda094bace2/raw)


[📖 Read Article here](https://medium.com/@brianobot9/the-ultimate-fastapi-project-blueprint-build-scalable-secure-and-maintainable-systems-with-ease-acbc4e058012)
Expand Down
19 changes: 11 additions & 8 deletions app/services/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,19 +301,22 @@ async def signin_user(data: auth_schema.UserSignInData, session: AsyncSession):
async def refresh_token(
token_data: auth_schema.RefreshTokenModel, session: AsyncSession
):
payload = jwt.decode(
token_data.refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM]
)
try:
payload = jwt.decode(
token_data.refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM]
)
except Exception:
logger.error("JWT Decode Failed!")
raise HTTPException(detail="Invalid Refresh Token", status_code=400)

if not payload:
raise HTTPException(status_code=401, detail="Invalid refresh token")
email = payload.get("sub")
if not email:
raise HTTPException(status_code=400, detail="Missing user ID in token")
raise HTTPException(status_code=401, detail="Invalid Refresh Token")

email = payload.get("sub")
user = await get_user(email, session)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found"
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Refresh Token"
)

new_access_token = create_access_token(data={"sub": email})
Expand Down
85 changes: 85 additions & 0 deletions app/services/tests/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import timedelta
from typing import Any
from unittest.mock import patch

import pytest
from faker import Faker
Expand Down Expand Up @@ -222,3 +223,87 @@ async def test_create_access_token(expires_delta: timedelta | None):
)
async def test_refresh_access_token(expires_delta: timedelta | None):
auth_services.create_refresh_token({"sub": faker.email()}, expires_delta)


async def test_authenticate_user(user: UserDB, session: AsyncSession):
result = await auth_services.authenticate_user(user.email, "password", session)
assert isinstance(result, UserDB)
assert result.id == user.id


@pytest.mark.parametrize(
"email,password",
[
(faker.email(), "password"),
(faker.email(), "incorrectpassword"),
],
)
async def test_authenticate_user_returns_none(
session: AsyncSession, email: str, password: str
):
result = await auth_services.authenticate_user(email, password, session)
assert result is False


async def test_signup_user(session: AsyncSession):
email = faker.email()
result = await auth_services.signup_user(
auth_schemas.UserSignUpData(email=email, password="password"),
session,
BackgroundTasks(tasks=[]),
)
assert isinstance(result, UserDB)
assert result.email == email


async def test_signin_user(user: UserDB, session: AsyncSession):
result = await auth_services.signin_user(
auth_schemas.UserSignInData(email=user.email, password="password"), session
)
assert isinstance(result, auth_schemas.Token)


async def test_signin_user_for_none_user(session: AsyncSession):
with pytest.raises(HTTPException) as err:
await auth_services.signin_user(
auth_schemas.UserSignInData(email=faker.email(), password="password"),
session,
)

assert err.value.detail == "Incorrect email or password"


async def test_refresh_token(user: UserDB, session: AsyncSession):
initial_refresh_token = auth_services.create_refresh_token({"sub": user.email})
updated_refreshed_token = await auth_services.refresh_token(
auth_schemas.RefreshTokenModel(refresh_token=initial_refresh_token), session
)
assert updated_refreshed_token


@pytest.mark.parametrize(
"email,error_message",
[(faker.email(), "Invalid Refresh Token"), (None, "Invalid Refresh Token")],
)
async def test_refresh_token_fails(
email: str | None, error_message: str, session: AsyncSession
):
invalid_refresh_token = auth_services.create_refresh_token({"sub": email}) # type: ignore
with pytest.raises(HTTPException) as err:
await auth_services.refresh_token(
auth_schemas.RefreshTokenModel(refresh_token=invalid_refresh_token), session
)
assert err.value.detail == error_message


async def test_refresh_token_payload_return_none(session: AsyncSession):
invalid_refresh_token = auth_services.create_refresh_token({"sub": faker.email()}) # type: ignore
with patch("app.services.auth.jwt.decode") as mock_decode:
mock_decode.return_value = None

with pytest.raises(HTTPException) as err:
await auth_services.refresh_token(
auth_schemas.RefreshTokenModel(refresh_token=invalid_refresh_token),
session,
)
assert err.value.detail == "Invalid Refresh Token"
2 changes: 1 addition & 1 deletion app/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def test_trusted_host_middleware_denied():


# Uncomment this to activate test after setting up CORS
async def _test_cors_middleware_headers():
async def _test_cors_middleware_headers(): # pragma: no cover
# Change the value of the CORS url here to match your config
headers = {"Origin": "http://localhost:3000"}
async with AsyncClient(
Expand Down