From e9b69c913a36c97782d298d71ca1a88784551d48 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Tue, 9 Dec 2025 16:59:00 +0100 Subject: [PATCH 1/7] feat: introduced a first version of an error handler --- app/auth.py | 8 ++- app/database/db.py | 2 +- app/error.py | 62 +++++++++++++++++ app/main.py | 2 + app/middleware/error_handling.py | 51 ++++++++++++++ app/routers/unit_jobs.py | 116 +++++++++++++++++++++++-------- 6 files changed, 207 insertions(+), 34 deletions(-) create mode 100644 app/error.py create mode 100644 app/middleware/error_handling.py diff --git a/app/auth.py b/app/auth.py index b09db15..1ae154d 100644 --- a/app/auth.py +++ b/app/auth.py @@ -6,6 +6,8 @@ from jwt import PyJWKClient from loguru import logger +from app.error import AuthException + from .config.settings import settings # Keycloak OIDC info @@ -37,9 +39,9 @@ def _decode_token(token: str): ) return payload except Exception: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", + raise AuthException( + http_status=status.HTTP_401_UNAUTHORIZED, + message="Could not validate credentials!", ) diff --git a/app/database/db.py b/app/database/db.py index e7cfd0a..1ebf87f 100644 --- a/app/database/db.py +++ b/app/database/db.py @@ -35,7 +35,7 @@ def get_db(): yield db db.commit() except Exception: - logger.exception("An error occurred during database retrieval") + logger.error("An error occurred during database retrieval") db.rollback() raise finally: diff --git a/app/error.py b/app/error.py new file mode 100644 index 0000000..40186e9 --- /dev/null +++ b/app/error.py @@ -0,0 +1,62 @@ +from typing import Any, Dict, Optional +from fastapi import status +from pydantic import BaseModel + + +class ErrorResponse(BaseModel): + status: str = "error" + error_code: str + message: str + details: Optional[Dict[str, Any]] = None + request_id: Optional[str] = None + + +class DispatcherException(Exception): + """ + Base domain exception for the APEx Dispatch API. + """ + + http_status: int = status.HTTP_400_BAD_REQUEST + error_code: str = "APEX_ERROR" + message: str = "An error occurred." + details: Optional[Dict[str, Any]] = None + + def __init__( + self, + message: Optional[str] = None, + error_code: Optional[str] = None, + http_status: Optional[int] = None, + details: Optional[Dict[str, Any]] = None, + ): + if message: + self.message = message + if error_code: + self.error_code = error_code + if http_status: + self.http_status = http_status + if details: + self.details = details + + def __str__(self): + return f"{self.error_code}: {self.message}" + + +class AuthException(DispatcherException): + def __init__( + self, + http_status: Optional[int] = status.HTTP_401_UNAUTHORIZED, + message: Optional[Dict[str, Any]] = "Authentication failed.", + ): + super().__init__(message, "AUTHENTICATION_FAILED", http_status) + + +class JobNotFoundException(DispatcherException): + http_status: int = status.HTTP_404_NOT_FOUND + error_code: str = "JOB_NOT_FOUND" + message: str = "The requested job was not found." + + +class InternalException(DispatcherException): + http_status: int = status.HTTP_500_INTERNAL_SERVER_ERROR + error_code: str = "INTERNAL_ERROR" + message: str = "An internal server error occurred." diff --git a/app/main.py b/app/main.py index ca2ca98..4b5f9a0 100644 --- a/app/main.py +++ b/app/main.py @@ -2,6 +2,7 @@ from fastapi.middleware.cors import CORSMiddleware from app.middleware.correlation_id import add_correlation_id +from app.middleware.error_handling import register_exception_handlers from app.platforms.dispatcher import load_processing_platforms from app.services.tiles.base import load_grids from app.config.logger import setup_logging @@ -28,6 +29,7 @@ ) app.middleware("http")(add_correlation_id) +register_exception_handlers(app) # include routers app.include_router(tiles.router) diff --git a/app/middleware/error_handling.py b/app/middleware/error_handling.py new file mode 100644 index 0000000..11f0674 --- /dev/null +++ b/app/middleware/error_handling.py @@ -0,0 +1,51 @@ +from fastapi import Request, status +from fastapi.responses import JSONResponse +from app.error import DispatcherException, ErrorResponse +from app.middleware.correlation_id import correlation_id_ctx +from loguru import logger + + +def get_dispatcher_error_response( + exc: DispatcherException, request_id: str +) -> ErrorResponse: + return ErrorResponse( + error_code=exc.error_code, + message=exc.message, + details=exc.details, + request_id=request_id, + ) + + +async def dispatch_exception_handler(request: Request, exc: DispatcherException): + + content = get_dispatcher_error_response(exc, correlation_id_ctx.get()) + logger.exception(f"DispatcherException raised: {exc.message}") + return JSONResponse(status_code=exc.http_status, content=content.dict()) + + +async def generic_exception_handler(request: Request, exc: Exception): + + # DO NOT expose internal exceptions to the client + content = ErrorResponse( + error_code="INTERNAL_SERVER_ERROR", + message="An unexpected error occurred.", + details=None, + request_id=correlation_id_ctx.get(), + ) + + # Log exception to server logs for debugging + print(f"[ERROR] Request ID: {exc}") + + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=content.dict() + ) + + +def register_exception_handlers(app): + """ + Call this in main.py after creating the FastAPI() instance. + """ + + app.add_exception_handler(DispatcherException, dispatch_exception_handler) + # app.add_exception_handler(RequestValidationError, validation_exception_handler) + app.add_exception_handler(Exception, generic_exception_handler) diff --git a/app/routers/unit_jobs.py b/app/routers/unit_jobs.py index a20019c..8048b13 100644 --- a/app/routers/unit_jobs.py +++ b/app/routers/unit_jobs.py @@ -5,6 +5,14 @@ from app.auth import oauth2_scheme from app.database.db import get_db +from app.error import ( + AuthException, + DispatcherException, + ErrorResponse, + InternalException, + JobNotFoundException, +) +from app.middleware.error_handling import get_dispatcher_error_response from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum from app.schemas.unit_job import ( BaseJobRequest, @@ -30,6 +38,19 @@ status_code=status.HTTP_201_CREATED, tags=["Unit Jobs"], summary="Create a new processing job", + responses={ + InternalException.http_status: { + "description": "Internal server error", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + InternalException(), "request-id" + ) + } + }, + }, + }, ) async def create_unit_job( payload: Annotated[ @@ -105,20 +126,42 @@ async def create_unit_job( """Create a new processing job with the provided data.""" try: return await create_processing_job(token, db, payload) - except HTTPException as e: - raise e + except DispatcherException as de: + raise de except Exception as e: - logger.exception(f"Error creating processing job: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An error occurred while creating the processing job: {e}", + logger.error(f"Error creating processing job: {e}") + raise InternalException( + message="An error occurred while creating processing job." ) @router.get( "/unit_jobs/{job_id}", tags=["Unit Jobs"], - responses={404: {"description": "Processing job not found"}}, + responses={ + JobNotFoundException.http_status: { + "description": "Job not found", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + JobNotFoundException(), "request-id" + ) + } + }, + }, + InternalException.http_status: { + "description": "Internal server error", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + InternalException(), "request-id" + ) + } + }, + }, + }, ) async def get_job( job_id: int, db: Session = Depends(get_db), token: str = Depends(oauth2_scheme) @@ -126,26 +169,44 @@ async def get_job( try: job = await get_processing_job_by_user_id(token, db, job_id) if not job: - logger.error(f"Processing job {job_id} not found") - raise HTTPException( - status_code=404, - detail=f"Processing job {job_id} not found", - ) + raise JobNotFoundException() return job - except HTTPException as e: - raise e + except DispatcherException as de: + raise de except Exception as e: - logger.exception(f"Error retrieving processing job {job_id}: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An error occurred while retrieving processing job {job_id}: {e}", + logger.error(f"Error retrieving processing job {job_id}: {e}") + raise InternalException( + message="An error occurred while retrieving the processing job." ) @router.get( "/unit_jobs/{job_id}/results", tags=["Unit Jobs"], - responses={404: {"description": "Processing job not found"}}, + responses={ + JobNotFoundException.http_status: { + "description": "Job not found", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + JobNotFoundException(), "request-id" + ) + } + }, + }, + InternalException.http_status: { + "description": "Internal server error", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + InternalException(), "request-id" + ) + } + }, + }, + }, ) async def get_job_results( job_id: int, db: Session = Depends(get_db), token: str = Depends(oauth2_scheme) @@ -153,17 +214,12 @@ async def get_job_results( try: result = await get_processing_job_results(token, db, job_id) if not result: - logger.error(f"Result for processing job {job_id} not found") - raise HTTPException( - status_code=404, - detail=f"Result for processing job {job_id} not found", - ) + raise JobNotFoundException() return result - except HTTPException as e: - raise e + except DispatcherException as de: + raise de except Exception as e: - logger.exception(f"Error getting results for processing job {job_id}: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An error occurred while retrieving results for processing job {job_id}: {e}", + logger.error(f"Error getting results for processing job {job_id}: {e}") + raise InternalException( + message="An error occurred while retrieving processing job results." ) From 5cf6f884dcd01e058fd5b11f3871f48f55c4d677 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 10 Dec 2025 08:53:55 +0100 Subject: [PATCH 2/7] feat: updated error handling in job status retrieval --- app/auth.py | 46 +++++++++++++++++----- app/middleware/error_handling.py | 21 ++++++++-- app/routers/jobs_status.py | 67 ++++++++++++++++++++++++-------- 3 files changed, 103 insertions(+), 31 deletions(-) diff --git a/app/auth.py b/app/auth.py index 1ae154d..a8ee657 100644 --- a/app/auth.py +++ b/app/auth.py @@ -6,7 +6,8 @@ from jwt import PyJWKClient from loguru import logger -from app.error import AuthException +from app.error import AuthException, DispatcherException +from app.schemas.websockets import WSStatusMessage from .config.settings import settings @@ -57,6 +58,7 @@ async def websocket_authenticate(websocket: WebSocket) -> str | None: """ logger.debug("Authenticating websocket") token = websocket.query_params.get("token") + if not token: logger.error("Token is missing from websocket authentication") await websocket.close(code=1008, reason="Missing token") @@ -65,9 +67,20 @@ async def websocket_authenticate(websocket: WebSocket) -> str | None: try: await websocket.accept() return token + except DispatcherException as ae: + logger.error(f"Dispatcher exception detected: {ae.message}") + await websocket.send_json( + WSStatusMessage(type="error", message=ae.message).model_dump() + ) + await websocket.close(code=1008, reason=ae.error_code) + return None except Exception as e: - logger.error(f"Invalid token in websocket authentication: {e}") - await websocket.close(code=1008, reason="Invalid token") + logger.error(f"Unexpected error occurred during websocket authentication: {e}") + await WSStatusMessage( + type="error", + message="Something went wrong during authentication. Please try again.", + ).model_dump() + await websocket.close(code=1008, reason="INTERNAL_ERROR") return None @@ -107,9 +120,12 @@ async def exchange_token_for_provider( resp = await client.post(token_url, data=payload) except httpx.RequestError as exc: logger.error(f"Token exchange network error for provider={provider}: {exc}") - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail="Failed to contact the identity provider for token exchange.", + raise AuthException( + http_status=status.HTTP_502_BAD_GATEWAY, + message=( + f"Could not authenticate with {provider}. Please contact APEx support or reach out " + "through the APEx User Forum." + ), ) # Parse response @@ -119,9 +135,12 @@ async def exchange_token_for_provider( logger.error( f"Token exchange invalid JSON response (status={resp.status_code})" ) - raise HTTPException( - status_code=status.HTTP_502_BAD_GATEWAY, - detail="Invalid response from identity provider during token exchange.", + raise AuthException( + http_status=status.HTTP_502_BAD_GATEWAY, + message=( + f"Could not authenticate with {provider}. Please contact APEx support or reach out " + "through the APEx User Forum." + ), ) if resp.status_code != 200: @@ -138,7 +157,14 @@ async def exchange_token_for_provider( else status.HTTP_502_BAD_GATEWAY ) - raise HTTPException(client_status, detail=body) + raise AuthException( + http_status=client_status, + message=( + f"Please link your account with {provider} in your Account Dashboard" + if body.get("error", "") == "not_linked" + else f"Could not authenticate with {provider}: {err}" + ), + ) # Successful exchange, return token response (access_token, expires_in, etc.) return body diff --git a/app/middleware/error_handling.py b/app/middleware/error_handling.py index 11f0674..4cc6182 100644 --- a/app/middleware/error_handling.py +++ b/app/middleware/error_handling.py @@ -1,4 +1,5 @@ from fastapi import Request, status +from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse from app.error import DispatcherException, ErrorResponse from app.middleware.correlation_id import correlation_id_ctx @@ -33,19 +34,31 @@ async def generic_exception_handler(request: Request, exc: Exception): request_id=correlation_id_ctx.get(), ) - # Log exception to server logs for debugging - print(f"[ERROR] Request ID: {exc}") - + logger.exception(f"GenericException raised: {exc}") return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=content.dict() ) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + + content = ErrorResponse( + error_code="VALIDATION_ERROR", + message="Request validation failed.", + details={"errors": exc.errors()}, + request_id=correlation_id_ctx.get(), + ) + + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=content.dict() + ) + + def register_exception_handlers(app): """ Call this in main.py after creating the FastAPI() instance. """ app.add_exception_handler(DispatcherException, dispatch_exception_handler) - # app.add_exception_handler(RequestValidationError, validation_exception_handler) + app.add_exception_handler(RequestValidationError, validation_exception_handler) app.add_exception_handler(Exception, generic_exception_handler) diff --git a/app/routers/jobs_status.py b/app/routers/jobs_status.py index 16fcd3f..90dfcdf 100644 --- a/app/routers/jobs_status.py +++ b/app/routers/jobs_status.py @@ -7,6 +7,8 @@ from loguru import logger from app.database.db import SessionLocal, get_db +from app.error import DispatcherException, ErrorResponse, InternalException +from app.middleware.error_handling import get_dispatcher_error_response from app.schemas.jobs_status import JobsFilter, JobsStatusResponse from app.schemas.websockets import WSStatusMessage from app.services.processing import get_processing_jobs_by_user_id @@ -22,6 +24,19 @@ "/jobs_status", tags=["Upscale Tasks", "Unit Jobs"], summary="Get a list of all upscaling tasks & processing jobs for the authenticated user", + responses={ + InternalException.http_status: { + "description": "Internal server error", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + InternalException(), "request-id" + ) + } + }, + }, + }, ) async def get_jobs_status( db: Session = Depends(get_db), @@ -34,21 +49,29 @@ async def get_jobs_status( """ Return combined list of upscaling tasks and processing jobs for the authenticated user. """ - logger.debug("Fetching jobs list") - upscaling_tasks = ( - await get_upscaling_tasks_by_user_id(token, db) - if JobsFilter.upscaling in filter - else [] - ) - processing_jobs = ( - await get_processing_jobs_by_user_id(token, db) - if JobsFilter.processing in filter - else [] - ) - return JobsStatusResponse( - upscaling_tasks=upscaling_tasks, - processing_jobs=processing_jobs, - ) + try: + logger.debug("Fetching jobs list") + upscaling_tasks = ( + await get_upscaling_tasks_by_user_id(token, db) + if JobsFilter.upscaling in filter + else [] + ) + processing_jobs = ( + await get_processing_jobs_by_user_id(token, db) + if JobsFilter.processing in filter + else [] + ) + return JobsStatusResponse( + upscaling_tasks=upscaling_tasks, + processing_jobs=processing_jobs, + ) + except DispatcherException as de: + raise de + except Exception as e: + logger.error(f"Error retrieving job status: {e}") + raise InternalException( + message="An error occurred while retrieving the job status." + ) @router.websocket( @@ -91,8 +114,18 @@ async def ws_jobs_status( except WebSocketDisconnect: logger.info("WebSocket disconnected") + except DispatcherException as ae: + logger.error(f"Dispatcher exception detected: {ae.message}") + await websocket.send_json( + WSStatusMessage(type="error", message=ae.message).model_dump() + ) + await websocket.close(code=1008, reason=ae.error_code) except Exception as e: - logger.exception(f"Error in jobs_status_ws: {e}") - await websocket.close(code=1011, reason="Error in job status websocket: {e}") + logger.error(f"Unexpected error occurred during websocket authentication: {e}") + await WSStatusMessage( + type="error", + message="Something went wrong during authentication. Please try again.", + ).model_dump() + await websocket.close(code=1008, reason="INTERNAL_ERROR") finally: db.close() From d147735249a46cc4ddd15c0f7e625d3069bde3f4 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 10 Dec 2025 09:31:06 +0100 Subject: [PATCH 3/7] feat: updated error handling for tiles endpoint --- app/middleware/error_handling.py | 12 +++++++++++- app/routers/tiles.py | 26 ++++++++++++++++++++------ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/app/middleware/error_handling.py b/app/middleware/error_handling.py index 4cc6182..0798e67 100644 --- a/app/middleware/error_handling.py +++ b/app/middleware/error_handling.py @@ -1,3 +1,4 @@ +from typing import Any from fastapi import Request, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse @@ -40,15 +41,24 @@ async def generic_exception_handler(request: Request, exc: Exception): ) +def _parse_validation_error(err: Any): + if "ctx" in err: + del err["ctx"] + return err + + async def validation_exception_handler(request: Request, exc: RequestValidationError): + logger.error(f"Request validation error: {exc.__class__.__name__}: {exc}") content = ErrorResponse( error_code="VALIDATION_ERROR", message="Request validation failed.", - details={"errors": exc.errors()}, + details={"errors": [_parse_validation_error(error) for error in exc.errors()]}, request_id=correlation_id_ctx.get(), ) + logger.error(content.dict()) + return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=content.dict() ) diff --git a/app/routers/tiles.py b/app/routers/tiles.py index 262d836..3ffa25c 100644 --- a/app/routers/tiles.py +++ b/app/routers/tiles.py @@ -3,6 +3,8 @@ from geojson_pydantic import GeometryCollection, Polygon from loguru import logger +from app.error import DispatcherException, ErrorResponse, InternalException +from app.middleware.error_handling import get_dispatcher_error_response from app.schemas.tiles import GridTypeEnum, TileRequest from app.services.tiles.base import split_polygon_by_grid @@ -54,7 +56,18 @@ } } }, - } + }, + InternalException.http_status: { + "description": "Internal server error", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + InternalException(), "request-id" + ) + } + }, + }, }, ) def split_in_tiles( @@ -90,11 +103,12 @@ def split_in_tiles( try: logger.debug(f"Splitting tiles in a {payload.grid} formation") return split_polygon_by_grid(payload.aoi, payload.grid) + except DispatcherException as de: + raise de except Exception as e: - logger.exception( - f"An error occurred while calculating tiles for {payload.grid}" + logger.error( + f"An error occurred while calculating tiles for {payload.grid}: {e}" ) - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An error occurred while calculating tiles for {payload.grid}: {e}", + raise InternalException( + message=f"An error occurred while calculating tiles for {payload.grid}" ) From 19ab9f48cfc4acc417301d07f5c0c33a5b3ceff8 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 10 Dec 2025 11:25:08 +0100 Subject: [PATCH 4/7] feat: aligned error handling for upscale_tasks --- app/auth.py | 2 +- app/error.py | 6 +++ app/routers/jobs_status.py | 4 +- app/routers/upscale_tasks.py | 97 ++++++++++++++++++++++++++++-------- 4 files changed, 84 insertions(+), 25 deletions(-) diff --git a/app/auth.py b/app/auth.py index a8ee657..8952115 100644 --- a/app/auth.py +++ b/app/auth.py @@ -42,7 +42,7 @@ def _decode_token(token: str): except Exception: raise AuthException( http_status=status.HTTP_401_UNAUTHORIZED, - message="Could not validate credentials!", + message="Could not validate credentials. Please retry signing in.", ) diff --git a/app/error.py b/app/error.py index 40186e9..7c9f427 100644 --- a/app/error.py +++ b/app/error.py @@ -56,6 +56,12 @@ class JobNotFoundException(DispatcherException): message: str = "The requested job was not found." +class TaskNotFoundException(DispatcherException): + http_status: int = status.HTTP_404_NOT_FOUND + error_code: str = "TASK_NOT_FOUND" + message: str = "The requested task was not found." + + class InternalException(DispatcherException): http_status: int = status.HTTP_500_INTERNAL_SERVER_ERROR error_code: str = "INTERNAL_ERROR" diff --git a/app/routers/jobs_status.py b/app/routers/jobs_status.py index 90dfcdf..ffb9d1e 100644 --- a/app/routers/jobs_status.py +++ b/app/routers/jobs_status.py @@ -121,10 +121,10 @@ async def ws_jobs_status( ) await websocket.close(code=1008, reason=ae.error_code) except Exception as e: - logger.error(f"Unexpected error occurred during websocket authentication: {e}") + logger.error(f"Unexpected error occurred during websocket : {e}") await WSStatusMessage( type="error", - message="Something went wrong during authentication. Please try again.", + message="An error occurred while monitoring the job status.", ).model_dump() await websocket.close(code=1008, reason="INTERNAL_ERROR") finally: diff --git a/app/routers/upscale_tasks.py b/app/routers/upscale_tasks.py index 4c2d61a..5f513de 100644 --- a/app/routers/upscale_tasks.py +++ b/app/routers/upscale_tasks.py @@ -16,6 +16,13 @@ from app.auth import oauth2_scheme, websocket_authenticate from app.database.db import SessionLocal, get_db +from app.error import ( + DispatcherException, + ErrorResponse, + InternalException, + TaskNotFoundException, +) +from app.middleware.error_handling import get_dispatcher_error_response from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum from app.schemas.unit_job import ( ServiceDetails, @@ -43,6 +50,19 @@ status_code=status.HTTP_201_CREATED, tags=["Upscale Tasks"], summary="Create a new upscaling task", + responses={ + InternalException.http_status: { + "description": "Internal server error", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + InternalException(), "request-id" + ) + } + }, + }, + }, ) async def create_upscale_task( payload: Annotated[ @@ -120,20 +140,42 @@ async def create_upscale_task( upscaling_task_id=task.id, ) return task - except HTTPException as e: - raise e + except DispatcherException as de: + raise de except Exception as e: - logger.exception(f"Error creating upscale task: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An error occurred while creating the upscale task: {e}", + logger.error(f"Error getting creating upscaling task: {e}") + raise InternalException( + message="An error occurred while retrieving processing job results." ) @router.get( "/upscale_tasks/{task_id}", tags=["Upscale Tasks"], - responses={404: {"description": "Upscale task not found"}}, + responses={ + TaskNotFoundException.http_status: { + "description": "Upscaling not found", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + TaskNotFoundException(), "request-id" + ) + } + }, + }, + InternalException.http_status: { + "description": "Internal server error", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + InternalException(), "request-id" + ) + } + }, + }, + }, ) async def get_upscale_task( task_id: int, @@ -144,24 +186,18 @@ async def get_upscale_task( job = await get_upscaling_task_by_user_id(token, db, task_id) if not job: logger.error(f"Upscale task {task_id} not found") - raise HTTPException( - status_code=404, - detail=f"Upscale task {task_id} not found", - ) + raise TaskNotFoundException() return job - except HTTPException as e: - raise e + except DispatcherException as de: + raise de except Exception as e: - logger.exception(f"Error retrieving upscale task {task_id}: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An error occurred while retrieving the upscale task {task_id}: {e}", + logger.error(f"Error retrieving upscale task {task_id}: {e}") + raise InternalException( + message="An error occurred while retrieving the upscale task." ) -@router.websocket( - "/ws/upscale_tasks/{task_id}", -) +@router.websocket("/ws/upscale_tasks/{task_id}") async def ws_task_status( websocket: WebSocket, task_id: int, @@ -212,6 +248,23 @@ async def ws_task_status( except WebSocketDisconnect: logger.info("WebSocket disconnected") + except DispatcherException as ae: + logger.error(f"Dispatcher exception detected: {ae.message}") + await websocket.send_json( + WSTaskStatusMessage( + type="error", task_id=task_id, message=ae.message + ).model_dump() + ) + await websocket.close(code=1008, reason=ae.error_code) except Exception as e: - logger.exception(f"Error in upscaling task status websocket: {e}") - await websocket.close(code=1011, reason=f"Error in job status websocket: {e}") + logger.error( + f"An error occurred while monitoring upscaling task {task_id}: {e}" + ) + await WSTaskStatusMessage( + type="error", + task_id=task_id, + message="An error occurred while monitoring upscaling task.", + ).model_dump() + await websocket.close(code=1008, reason="INTERNAL_ERROR") + finally: + db.close() From 323d95dbcd8e1a3b6227f511de8eedc42a5422b4 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 10 Dec 2025 11:28:36 +0100 Subject: [PATCH 5/7] feat: updated error handling on auth component --- app/auth.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/app/auth.py b/app/auth.py index 8952115..074e68f 100644 --- a/app/auth.py +++ b/app/auth.py @@ -1,7 +1,7 @@ from typing import Any, Dict import httpx import jwt -from fastapi import Depends, HTTPException, WebSocket, status +from fastapi import Depends, WebSocket, status from fastapi.security import OAuth2AuthorizationCodeBearer from jwt import PyJWKClient from loguru import logger @@ -96,15 +96,15 @@ async def exchange_token_for_provider( :return: The token response (dict) on success. - :raise: Raises HTTPException with an appropriate status and message on error. + :raise: Raises AuthException with an appropriate status and message on error. """ token_url = f"{KEYCLOAK_BASE_URL}/protocol/openid-connect/token" # Check if the necessary settings are in place if not settings.keycloak_client_id or not settings.keycloak_client_secret: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Token exchange not configured on the server (missing client credentials).", + raise AuthException( + http_status=status.HTTP_500_INTERNAL_SERVER_ERROR, + message="Token exchange not configured on the server (missing client credentials).", ) payload = { From 0ea1c07a1e30b4f40e73807b3e32aa67e6c9f6d7 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 10 Dec 2025 11:29:57 +0100 Subject: [PATCH 6/7] feat: updated error handling for sync jobs --- app/routers/sync_jobs.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/app/routers/sync_jobs.py b/app/routers/sync_jobs.py index 7a83b9c..f739c02 100644 --- a/app/routers/sync_jobs.py +++ b/app/routers/sync_jobs.py @@ -2,6 +2,8 @@ from fastapi import Body, APIRouter, Depends, HTTPException, Response, status from loguru import logger +from app.error import DispatcherException, ErrorResponse, InternalException +from app.middleware.error_handling import get_dispatcher_error_response from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum from app.schemas.unit_job import ( BaseJobRequest, @@ -23,6 +25,19 @@ status_code=status.HTTP_201_CREATED, tags=["Unit Jobs"], summary="Create a new processing job", + responses={ + InternalException.http_status: { + "description": "Internal server error", + "model": ErrorResponse, + "content": { + "application/json": { + "example": get_dispatcher_error_response( + InternalException(), "request-id" + ) + } + }, + }, + }, ) async def create_sync_job( payload: Annotated[ @@ -97,11 +112,10 @@ async def create_sync_job( """Initiate a synchronous processing job with the provided data and return the result.""" try: return await create_synchronous_job(token, payload) - except HTTPException as e: - raise e + except DispatcherException as de: + raise de except Exception as e: - logger.exception(f"Error creating synchronous job: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"An error occurred while creating the synchronous job: {e}", + logger.error(f"Error creating synchronous job: {e}") + raise InternalException( + message="An error occurred while creating the synchronous job." ) From 929b9236febcb275dc7d3f2be66afb71b1accb11 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 10 Dec 2025 12:02:55 +0100 Subject: [PATCH 7/7] fix: tests --- app/auth.py | 14 +++++--- app/error.py | 2 +- app/routers/jobs_status.py | 14 ++++---- app/routers/sync_jobs.py | 2 +- app/routers/tiles.py | 2 +- app/routers/unit_jobs.py | 3 +- app/routers/upscale_tasks.py | 17 +++++----- tests/routers/test_job_status.py | 1 + tests/routers/test_sync_jobs.py | 18 +++++------ tests/routers/test_tiles.py | 5 +-- tests/routers/test_unit_jobs.py | 50 ++++++++++++++++------------- tests/routers/test_upscale_tasks.py | 50 +++++++++++++++-------------- 12 files changed, 96 insertions(+), 82 deletions(-) diff --git a/app/auth.py b/app/auth.py index 074e68f..d27d605 100644 --- a/app/auth.py +++ b/app/auth.py @@ -76,10 +76,12 @@ async def websocket_authenticate(websocket: WebSocket) -> str | None: return None except Exception as e: logger.error(f"Unexpected error occurred during websocket authentication: {e}") - await WSStatusMessage( - type="error", - message="Something went wrong during authentication. Please try again.", - ).model_dump() + await websocket.send_json( + WSStatusMessage( + type="error", + message="Something went wrong during authentication. Please try again.", + ).model_dump() + ) await websocket.close(code=1008, reason="INTERNAL_ERROR") return None @@ -160,7 +162,9 @@ async def exchange_token_for_provider( raise AuthException( http_status=client_status, message=( - f"Please link your account with {provider} in your Account Dashboard" + f"Please link your account with {provider} in your " + "Account Dashboard" if body.get("error", "") == "not_linked" else f"Could not authenticate with {provider}: {err}" ), diff --git a/app/error.py b/app/error.py index 7c9f427..896b1af 100644 --- a/app/error.py +++ b/app/error.py @@ -45,7 +45,7 @@ class AuthException(DispatcherException): def __init__( self, http_status: Optional[int] = status.HTTP_401_UNAUTHORIZED, - message: Optional[Dict[str, Any]] = "Authentication failed.", + message: Optional[str] = "Authentication failed.", ): super().__init__(message, "AUTHENTICATION_FAILED", http_status) diff --git a/app/routers/jobs_status.py b/app/routers/jobs_status.py index ffb9d1e..5f24de1 100644 --- a/app/routers/jobs_status.py +++ b/app/routers/jobs_status.py @@ -119,13 +119,15 @@ async def ws_jobs_status( await websocket.send_json( WSStatusMessage(type="error", message=ae.message).model_dump() ) - await websocket.close(code=1008, reason=ae.error_code) + await websocket.close(code=1011, reason=ae.error_code) except Exception as e: logger.error(f"Unexpected error occurred during websocket : {e}") - await WSStatusMessage( - type="error", - message="An error occurred while monitoring the job status.", - ).model_dump() - await websocket.close(code=1008, reason="INTERNAL_ERROR") + await websocket.send_json( + WSStatusMessage( + type="error", + message="An error occurred while monitoring the job status.", + ).model_dump() + ) + await websocket.close(code=1011, reason="INTERNAL_ERROR") finally: db.close() diff --git a/app/routers/sync_jobs.py b/app/routers/sync_jobs.py index f739c02..aba1850 100644 --- a/app/routers/sync_jobs.py +++ b/app/routers/sync_jobs.py @@ -1,5 +1,5 @@ from typing import Annotated -from fastapi import Body, APIRouter, Depends, HTTPException, Response, status +from fastapi import Body, APIRouter, Depends, Response, status from loguru import logger from app.error import DispatcherException, ErrorResponse, InternalException diff --git a/app/routers/tiles.py b/app/routers/tiles.py index 3ffa25c..57fa3b3 100644 --- a/app/routers/tiles.py +++ b/app/routers/tiles.py @@ -1,5 +1,5 @@ from typing import Annotated -from fastapi import APIRouter, HTTPException, status, Body +from fastapi import APIRouter, status, Body from geojson_pydantic import GeometryCollection, Polygon from loguru import logger diff --git a/app/routers/unit_jobs.py b/app/routers/unit_jobs.py index 8048b13..d70102b 100644 --- a/app/routers/unit_jobs.py +++ b/app/routers/unit_jobs.py @@ -1,12 +1,11 @@ from typing import Annotated -from fastapi import Body, APIRouter, Depends, HTTPException, status +from fastapi import Body, APIRouter, Depends, status from loguru import logger from sqlalchemy.orm import Session from app.auth import oauth2_scheme from app.database.db import get_db from app.error import ( - AuthException, DispatcherException, ErrorResponse, InternalException, diff --git a/app/routers/upscale_tasks.py b/app/routers/upscale_tasks.py index 5f513de..b88028d 100644 --- a/app/routers/upscale_tasks.py +++ b/app/routers/upscale_tasks.py @@ -6,7 +6,6 @@ Body, APIRouter, Depends, - HTTPException, WebSocket, WebSocketDisconnect, status, @@ -255,16 +254,18 @@ async def ws_task_status( type="error", task_id=task_id, message=ae.message ).model_dump() ) - await websocket.close(code=1008, reason=ae.error_code) + await websocket.close(code=1011, reason=ae.error_code) except Exception as e: logger.error( f"An error occurred while monitoring upscaling task {task_id}: {e}" ) - await WSTaskStatusMessage( - type="error", - task_id=task_id, - message="An error occurred while monitoring upscaling task.", - ).model_dump() - await websocket.close(code=1008, reason="INTERNAL_ERROR") + await websocket.send_json( + WSTaskStatusMessage( + type="error", + task_id=task_id, + message="An error occurred while monitoring upscaling task.", + ).model_dump() + ) + await websocket.close(code=1011, reason="INTERNAL_ERROR") finally: db.close() diff --git a/tests/routers/test_job_status.py b/tests/routers/test_job_status.py index d0f22c7..22f7b8c 100644 --- a/tests/routers/test_job_status.py +++ b/tests/routers/test_job_status.py @@ -103,5 +103,6 @@ async def test_ws_jobs_status_closes_on_error(mock_get_jobs_status, client): websocket.receive_json() websocket.receive_json() websocket.receive_json() + websocket.receive_json() assert exc_info.value.code == 1011 diff --git a/tests/routers/test_sync_jobs.py b/tests/routers/test_sync_jobs.py index 8575ede..e2ee119 100644 --- a/tests/routers/test_sync_jobs.py +++ b/tests/routers/test_sync_jobs.py @@ -1,7 +1,9 @@ import json from unittest.mock import patch -from fastapi import HTTPException +from fastapi import status + +from app.error import InternalException @patch("app.routers.sync_jobs.create_synchronous_job") @@ -27,21 +29,19 @@ def test_sync_jobs_create_500( mock_create_sync_job.side_effect = SystemError("Could not launch the job") r = client.post("/sync_jobs", json=fake_processing_job_request.model_dump()) - assert r.status_code == 500 - assert "could not launch the job" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An error occurred while creating the synchronous job." in r.json().get("message", "") @patch("app.routers.sync_jobs.create_synchronous_job") -def test_sync_jobs_create_http_error( +def test_sync_jobs_create_internal_error( mock_create_sync_job, client, fake_processing_job_request, ): - mock_create_sync_job.side_effect = HTTPException( - status_code=503, detail="Oops, service unavailable" - ) + mock_create_sync_job.side_effect = InternalException() r = client.post("/sync_jobs", json=fake_processing_job_request.model_dump()) - assert r.status_code == 503 - assert "service unavailable" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An internal server error occurred." in r.json().get("message", "") diff --git a/tests/routers/test_tiles.py b/tests/routers/test_tiles.py index d7af1c2..037ffdf 100644 --- a/tests/routers/test_tiles.py +++ b/tests/routers/test_tiles.py @@ -1,6 +1,7 @@ from unittest.mock import patch import pytest from app.schemas.tiles import GridTypeEnum +from fastapi import status @pytest.fixture @@ -28,5 +29,5 @@ def test_split_in_tiles_success(client, dummy_payload): def test_split_in_tiles_unknown_grid(mock_split, client, dummy_payload): mock_split.side_effect = ValueError("Unknown grid: INVALID_GRID") response = client.post("/tiles", json=dummy_payload) - assert response.status_code == 500 - assert "Unknown grid" in response.json()["detail"] + assert response.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An error occurred while calculating tiles for 20x20km" in response.json()["message"] diff --git a/tests/routers/test_unit_jobs.py b/tests/routers/test_unit_jobs.py index e001d49..de4a585 100644 --- a/tests/routers/test_unit_jobs.py +++ b/tests/routers/test_unit_jobs.py @@ -1,7 +1,9 @@ import json from unittest.mock import patch -from fastapi import HTTPException +from fastapi import status + +from app.error import InternalException @patch("app.routers.unit_jobs.create_processing_job") @@ -29,24 +31,24 @@ def test_unit_jobs_create_500( mock_create_processing_job.side_effect = SystemError("Could not launch the job") r = client.post("/unit_jobs", json=fake_processing_job_request.model_dump()) - assert r.status_code == 500 - assert "could not launch the job" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An error occurred while creating processing job." in r.json().get( + "message", "" + ) @patch("app.routers.unit_jobs.create_processing_job") -def test_unit_jobs_create_http_error( +def test_unit_jobs_create_internal_error( mock_create_processing_job, client, fake_processing_job_request, ): - mock_create_processing_job.side_effect = HTTPException( - status_code=503, detail="Oops, service unavailable" - ) + mock_create_processing_job.side_effect = InternalException r = client.post("/unit_jobs", json=fake_processing_job_request.model_dump()) - assert r.status_code == 503 - assert "service unavailable" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An internal server error occurred." in r.json().get("message", "") @patch("app.routers.unit_jobs.get_processing_job_by_user_id") @@ -71,8 +73,8 @@ def test_unit_jobs_get_job_404(mock_get_processing_job, client): mock_get_processing_job.return_value = None r = client.get("/unit_jobs/1") - assert r.status_code == 404 - assert "processing job 1 not found" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_404_NOT_FOUND + assert "The requested job was not found." in r.json().get("message", "") @patch("app.routers.unit_jobs.get_processing_job_by_user_id") @@ -81,20 +83,20 @@ def test_unit_jobs_get_job_500(mock_get_processing_job, client): mock_get_processing_job.side_effect = RuntimeError("Database connection lost") r = client.get("/unit_jobs/1") - assert r.status_code == 500 - assert "database connection lost" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An error occurred while retrieving the processing job." in r.json().get( + "message", "" + ) @patch("app.routers.unit_jobs.get_processing_job_by_user_id") -def test_unit_jobs_get_job_http_error(mock_get_processing_job, client): +def test_unit_jobs_get_job_internal_error(mock_get_processing_job, client): - mock_get_processing_job.side_effect = HTTPException( - status_code=503, detail="Oops, service unavailable" - ) + mock_get_processing_job.side_effect = InternalException() r = client.get("/unit_jobs/1") - assert r.status_code == 503 - assert "service unavailable" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An internal server error occurred." in r.json().get("message", "") @patch("app.routers.unit_jobs.get_processing_job_results") @@ -119,8 +121,8 @@ def test_unit_jobs_get_job_results_404(mock_get_processing_job_results, client): mock_get_processing_job_results.return_value = None r = client.get("/unit_jobs/1/results") - assert r.status_code == 404 - assert "result for processing job 1 not found" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_404_NOT_FOUND + assert "The requested job was not found." in r.json().get("message", "") @patch("app.routers.unit_jobs.get_processing_job_results") @@ -131,5 +133,7 @@ def test_unit_jobs_get_job_results_500(mock_get_processing_job_results, client): ) r = client.get("/unit_jobs/1/results") - assert r.status_code == 500 - assert "database connection lost" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An error occurred while retrieving processing job results." in r.json().get( + "message", "" + ) diff --git a/tests/routers/test_upscale_tasks.py b/tests/routers/test_upscale_tasks.py index 86e3d53..7c5d01a 100644 --- a/tests/routers/test_upscale_tasks.py +++ b/tests/routers/test_upscale_tasks.py @@ -1,9 +1,11 @@ import json from unittest.mock import AsyncMock, patch -from fastapi import HTTPException, WebSocketDisconnect +from fastapi import WebSocketDisconnect, status import pytest +from app.error import InternalException + @patch("app.routers.upscale_tasks.create_upscaling_processing_jobs") @patch("app.routers.upscale_tasks.create_upscaling_task") @@ -18,7 +20,7 @@ def test_upscaling_task_create_201( mock_create_upscaling_task.return_value = fake_upscaling_task_summary r = client.post("/upscale_tasks", json=fake_upscaling_task_request.model_dump()) - assert r.status_code == 201 + assert r.status_code == status.HTTP_201_CREATED assert r.json() == fake_upscaling_task_summary.model_dump() assert mock_create_processing_jobs.called_once() @@ -30,29 +32,27 @@ def test_upscaling_task_create_500( fake_upscaling_task_request, ): - mock_create_upscaling_task.side_effect = SystemError( - "Could not launch the upscale task" - ) + mock_create_upscaling_task.side_effect = SystemError("Database connection lost") r = client.post("/upscale_tasks", json=fake_upscaling_task_request.model_dump()) - assert r.status_code == 500 - assert "could not launch the upscale task" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An error occurred while retrieving processing job results." in r.json().get( + "message", "" + ) @patch("app.routers.upscale_tasks.create_upscaling_task") -def test_upscaling_task_create_http_error( +def test_upscaling_task_create_internal_error( mock_create_upscaling_task, client, fake_upscaling_task_request, ): - mock_create_upscaling_task.side_effect = HTTPException( - status_code=503, detail="Oops, service unavailable" - ) + mock_create_upscaling_task.side_effect = InternalException() r = client.post("/upscale_tasks", json=fake_upscaling_task_request.model_dump()) - assert r.status_code == 503 - assert "service unavailable" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An internal server error occurred." in r.json().get("message", "") @patch("app.routers.upscale_tasks.get_upscaling_task_by_user_id") @@ -65,7 +65,7 @@ def test_upscaling_task_get_task_200( mock_get_upscale_task.return_value = fake_upscaling_task r = client.get("/upscale_tasks/1") - assert r.status_code == 200 + assert r.status_code == status.HTTP_200_OK assert json.dumps(r.json(), indent=1) == fake_upscaling_task.model_dump_json( indent=1 ) @@ -77,8 +77,8 @@ def test_upscaling_task_get_task_404(mock_get_upscale_task, client): mock_get_upscale_task.return_value = None r = client.get("/upscale_tasks/1") - assert r.status_code == 404 - assert "upscale task 1 not found" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_404_NOT_FOUND + assert "The requested task was not found." in r.json().get("message", "") @patch("app.routers.upscale_tasks.get_upscaling_task_by_user_id") @@ -87,20 +87,21 @@ def test_upscaling_task_get_task_500(mock_get_upscale_task, client): mock_get_upscale_task.side_effect = SystemError("Database connection lost") r = client.get("/upscale_tasks/1") - assert r.status_code == 500 - assert "database connection lost" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert "An error occurred while retrieving the upscale task" in r.json().get( + "message", "" + ) @patch("app.routers.upscale_tasks.get_upscaling_task_by_user_id") -def test_upscaling_task_get_task_http_error(mock_get_upscale_task, client): +def test_upscaling_task_get_task_internal_error(mock_get_upscale_task, client): - mock_get_upscale_task.side_effect = HTTPException( - status_code=503, detail="Oops, service unavailable" - ) + error = InternalException() + mock_get_upscale_task.side_effect = error r = client.get("/upscale_tasks/1") - assert r.status_code == 503 - assert "service unavailable" in r.json().get("detail", "").lower() + assert r.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR + assert error.message in r.json().get("message", "") @pytest.mark.asyncio @@ -135,6 +136,7 @@ async def test_ws_jobs_status_closes_on_error( websocket.receive_json() websocket.receive_json() websocket.receive_json() + websocket.receive_json() assert exc_info.value.code == 1011