diff --git a/.coverage b/.coverage index fd7fb8e..5f3da80 100644 Binary files a/.coverage and b/.coverage differ diff --git a/README.md b/README.md index 920666d..28bcbe5 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ⚙️ You can Generate Project Interactively Based on this template with the [FastAPI Gen8 CLI Tool](https://pypi.org/project/fastapi-gen8/) -![Coverage Badge](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/brianobot/b56b3d61a5e739fd26252cda094bace2/raw) +![Test Coverage Badge](https://img.shields.io/endpoint?url=https://gist.githubusercontent.com/brianobot/b56b3d61a5e739fd26252cda094bace2/raw) [📖 Read Article here](https://medium.com/@brianobot9/the-ultimate-fastapi-project-blueprint-build-scalable-secure-and-maintainable-systems-with-ease-acbc4e058012) diff --git a/app/services/auth.py b/app/services/auth.py index 3185e1f..d14638c 100644 --- a/app/services/auth.py +++ b/app/services/auth.py @@ -301,19 +301,22 @@ async def signin_user(data: auth_schema.UserSignInData, session: AsyncSession): async def refresh_token( token_data: auth_schema.RefreshTokenModel, session: AsyncSession ): - payload = jwt.decode( - token_data.refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM] - ) + try: + payload = jwt.decode( + token_data.refresh_token, JWT_SECRET, algorithms=[JWT_ALGORITHM] + ) + except Exception: + logger.error("JWT Decode Failed!") + raise HTTPException(detail="Invalid Refresh Token", status_code=400) + if not payload: - raise HTTPException(status_code=401, detail="Invalid refresh token") - email = payload.get("sub") - if not email: - raise HTTPException(status_code=400, detail="Missing user ID in token") + raise HTTPException(status_code=401, detail="Invalid Refresh Token") + email = payload.get("sub") user = await get_user(email, session) if not user: raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found" + status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Refresh Token" ) new_access_token = create_access_token(data={"sub": email}) diff --git a/app/services/tests/test_auth.py b/app/services/tests/test_auth.py index e1e643b..78931fd 100644 --- a/app/services/tests/test_auth.py +++ b/app/services/tests/test_auth.py @@ -1,5 +1,6 @@ from datetime import timedelta from typing import Any +from unittest.mock import patch import pytest from faker import Faker @@ -222,3 +223,87 @@ async def test_create_access_token(expires_delta: timedelta | None): ) async def test_refresh_access_token(expires_delta: timedelta | None): auth_services.create_refresh_token({"sub": faker.email()}, expires_delta) + + +async def test_authenticate_user(user: UserDB, session: AsyncSession): + result = await auth_services.authenticate_user(user.email, "password", session) + assert isinstance(result, UserDB) + assert result.id == user.id + + +@pytest.mark.parametrize( + "email,password", + [ + (faker.email(), "password"), + (faker.email(), "incorrectpassword"), + ], +) +async def test_authenticate_user_returns_none( + session: AsyncSession, email: str, password: str +): + result = await auth_services.authenticate_user(email, password, session) + assert result is False + + +async def test_signup_user(session: AsyncSession): + email = faker.email() + result = await auth_services.signup_user( + auth_schemas.UserSignUpData(email=email, password="password"), + session, + BackgroundTasks(tasks=[]), + ) + assert isinstance(result, UserDB) + assert result.email == email + + +async def test_signin_user(user: UserDB, session: AsyncSession): + result = await auth_services.signin_user( + auth_schemas.UserSignInData(email=user.email, password="password"), session + ) + assert isinstance(result, auth_schemas.Token) + + +async def test_signin_user_for_none_user(session: AsyncSession): + with pytest.raises(HTTPException) as err: + await auth_services.signin_user( + auth_schemas.UserSignInData(email=faker.email(), password="password"), + session, + ) + + assert err.value.detail == "Incorrect email or password" + + +async def test_refresh_token(user: UserDB, session: AsyncSession): + initial_refresh_token = auth_services.create_refresh_token({"sub": user.email}) + updated_refreshed_token = await auth_services.refresh_token( + auth_schemas.RefreshTokenModel(refresh_token=initial_refresh_token), session + ) + assert updated_refreshed_token + + +@pytest.mark.parametrize( + "email,error_message", + [(faker.email(), "Invalid Refresh Token"), (None, "Invalid Refresh Token")], +) +async def test_refresh_token_fails( + email: str | None, error_message: str, session: AsyncSession +): + invalid_refresh_token = auth_services.create_refresh_token({"sub": email}) # type: ignore + with pytest.raises(HTTPException) as err: + await auth_services.refresh_token( + auth_schemas.RefreshTokenModel(refresh_token=invalid_refresh_token), session + ) + assert err.value.detail == error_message + + +async def test_refresh_token_payload_return_none(session: AsyncSession): + invalid_refresh_token = auth_services.create_refresh_token({"sub": faker.email()}) # type: ignore + with patch("app.services.auth.jwt.decode") as mock_decode: + mock_decode.return_value = None + + with pytest.raises(HTTPException) as err: + await auth_services.refresh_token( + auth_schemas.RefreshTokenModel(refresh_token=invalid_refresh_token), + session, + ) + assert err.value.detail == "Invalid Refresh Token" diff --git a/app/tests/test_main.py b/app/tests/test_main.py index 289623a..ded6b2b 100644 --- a/app/tests/test_main.py +++ b/app/tests/test_main.py @@ -65,7 +65,7 @@ async def test_trusted_host_middleware_denied(): # Uncomment this to activate test after setting up CORS -async def _test_cors_middleware_headers(): +async def _test_cors_middleware_headers(): # pragma: no cover # Change the value of the CORS url here to match your config headers = {"Origin": "http://localhost:3000"} async with AsyncClient(