From 21f14546fc88370d8e0c1d5959af0281ec14e31f Mon Sep 17 00:00:00 2001 From: Phillip Ho Date: Fri, 2 May 2025 23:11:03 +0800 Subject: [PATCH 1/2] chore: allow querying status by query param --- src/server/routes/index.ts | 8 +++- src/server/routes/transaction/status.ts | 49 ++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index 4a74394e1..1150458dc 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -104,7 +104,10 @@ import { getAllTransactions } from "./transaction/get-all"; import { getAllDeployedContracts } from "./transaction/get-all-deployed-contracts"; import { retryTransaction } from "./transaction/retry"; import { retryFailedTransactionRoute } from "./transaction/retry-failed"; -import { checkTxStatus } from "./transaction/status"; +import { + getTransactionStatusQueryParamRoute, + getTransactionStatusRoute, +} from "./transaction/status"; import { syncRetryTransactionRoute } from "./transaction/sync-retry"; import { createWebhookRoute } from "./webhooks/create"; import { getWebhooksEventTypes } from "./webhooks/events"; @@ -239,7 +242,8 @@ export async function withRoutes(fastify: FastifyInstance) { // Transactions await fastify.register(getAllTransactions); - await fastify.register(checkTxStatus); + await fastify.register(getTransactionStatusRoute); + await fastify.register(getTransactionStatusQueryParamRoute); await fastify.register(getAllDeployedContracts); await fastify.register(retryTransaction); await fastify.register(syncRetryTransactionRoute); diff --git a/src/server/routes/transaction/status.ts b/src/server/routes/transaction/status.ts index 60dcb18ba..f02b85b4e 100644 --- a/src/server/routes/transaction/status.ts +++ b/src/server/routes/transaction/status.ts @@ -62,7 +62,7 @@ responseBodySchema.example = { }, }; -export async function checkTxStatus(fastify: FastifyInstance) { +export async function getTransactionStatusRoute(fastify: FastifyInstance) { fastify.route<{ Params: Static; Reply: Static; @@ -135,3 +135,50 @@ export async function checkTxStatus(fastify: FastifyInstance) { }, }); } + +// An alterate route that accepts the queueId as a query param. +export async function getTransactionStatusQueryParamRoute( + fastify: FastifyInstance, +) { + fastify.route<{ + Querystring: Static; + Reply: Static; + }>({ + method: "GET", + url: "/transaction/status", + schema: { + summary: "Get transaction status", + description: "Get the status for a transaction request.", + tags: ["Transaction"], + operationId: "status", + querystring: requestSchema, + response: { + ...standardResponseSchema, + [StatusCodes.OK]: responseBodySchema, + }, + }, + handler: async (request, reply) => { + const { queueId } = request.query; + if (!queueId) { + throw createCustomError( + "Queue ID is required.", + StatusCodes.BAD_REQUEST, + "QUEUE_ID_REQUIRED", + ); + } + + const transaction = await TransactionDB.get(queueId); + if (!transaction) { + throw createCustomError( + "Transaction not found.", + StatusCodes.BAD_REQUEST, + "TRANSACTION_NOT_FOUND", + ); + } + + reply.status(StatusCodes.OK).send({ + result: toTransactionSchema(transaction), + }); + }, + }); +} From a0832d08a160a26d3e3ffc8d812336379489042b Mon Sep 17 00:00:00 2001 From: Phillip Ho Date: Fri, 2 May 2025 23:24:09 +0800 Subject: [PATCH 2/2] feat: allow querying transaction status via query param --- src/server/index.ts | 2 - src/server/routes/transaction/status.ts | 47 ------- src/server/utils/websocket.ts | 169 ------------------------ 3 files changed, 218 deletions(-) delete mode 100644 src/server/utils/websocket.ts diff --git a/src/server/index.ts b/src/server/index.ts index 80ec10994..74509ddaa 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -18,7 +18,6 @@ import { withOpenApi } from "./middleware/open-api"; import { withPrometheus } from "./middleware/prometheus"; import { withRateLimit } from "./middleware/rate-limit"; import { withSecurityHeaders } from "./middleware/security-headers"; -import { withWebSocket } from "./middleware/websocket"; import { withRoutes } from "./routes"; import { writeOpenApiToFile } from "./utils/openapi"; @@ -81,7 +80,6 @@ export const initServer = async () => { withPrometheus(server); // Register routes - await withWebSocket(server); await withAuth(server); await withOpenApi(server); await withRoutes(server); diff --git a/src/server/routes/transaction/status.ts b/src/server/routes/transaction/status.ts index f02b85b4e..d18fa0333 100644 --- a/src/server/routes/transaction/status.ts +++ b/src/server/routes/transaction/status.ts @@ -1,23 +1,13 @@ -import type { SocketStream } from "@fastify/websocket"; import { type Static, Type } from "@sinclair/typebox"; import type { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; import { TransactionDB } from "../../../shared/db/transactions/db"; -import { logger } from "../../../shared/utils/logger"; import { createCustomError } from "../../middleware/error"; import { standardResponseSchema } from "../../schemas/shared-api-schemas"; import { TransactionSchema, toTransactionSchema, } from "../../schemas/transaction"; -import { - findOrAddWSConnectionInSharedState, - formatSocketMessage, - getStatusMessageAndConnectionStatus, - onClose, - onError, - onMessage, -} from "../../utils/websocket"; // INPUT const requestSchema = Type.Object({ @@ -96,43 +86,6 @@ export async function getTransactionStatusRoute(fastify: FastifyInstance) { result: toTransactionSchema(transaction), }); }, - wsHandler: async (connection: SocketStream, request) => { - const { queueId } = request.params; - - findOrAddWSConnectionInSharedState(connection, queueId, request); - - const transaction = await TransactionDB.get(queueId); - const returnData = transaction ? toTransactionSchema(transaction) : null; - - const { message, closeConnection } = - await getStatusMessageAndConnectionStatus(returnData); - - connection.socket.send(await formatSocketMessage(returnData, message)); - - if (closeConnection) { - connection.socket.close(); - return; - } - - connection.socket.on("error", (error) => { - logger({ - service: "websocket", - level: "error", - message: "Websocket error", - error, - }); - - onError(error, connection, request); - }); - - connection.socket.on("message", async (_message, _isBinary) => { - onMessage(connection, request); - }); - - connection.socket.on("close", () => { - onClose(connection, request); - }); - }, }); } diff --git a/src/server/utils/websocket.ts b/src/server/utils/websocket.ts deleted file mode 100644 index e01f00715..000000000 --- a/src/server/utils/websocket.ts +++ /dev/null @@ -1,169 +0,0 @@ -import type { SocketStream } from "@fastify/websocket"; -import type { Static } from "@sinclair/typebox"; -import type { FastifyRequest } from "fastify"; -import { logger } from "../../shared/utils/logger"; -import type { TransactionSchema } from "../schemas/transaction"; -import { type UserSubscription, subscriptionsData } from "../schemas/websocket"; - -// websocket timeout, i.e., ws connection closed after 10 seconds -const timeoutDuration = 10 * 60 * 1000; - -export const findWSConnectionInSharedState = async ( - connection: SocketStream, - _request: FastifyRequest, -): Promise => { - const index = subscriptionsData.findIndex( - (sub) => sub.socket === connection.socket, - ); - return index; -}; - -export const removeWSFromSharedState = async ( - connection: SocketStream, - request: FastifyRequest, -): Promise => { - const index = await findWSConnectionInSharedState(connection, request); - if (index === -1) { - return -1; - } - subscriptionsData.splice(index, 1); - return index; -}; - -export const onError = async ( - error: Error, - connection: SocketStream, - request: FastifyRequest, -): Promise => { - logger({ - service: "server", - level: "error", - message: "Websocket error", - error, - }); - - const index = await findWSConnectionInSharedState(connection, request); - if (index === -1) { - return; - } - - const userSubscription = subscriptionsData[index]; - subscriptionsData.splice(index, 1); - userSubscription.socket.send( - JSON.stringify({ - result: null, - requestId: userSubscription.requestId, - status: "error", - message: error.message, - }), - ); - - connection.socket.close(); -}; - -export const onMessage = async ( - connection: SocketStream, - request: FastifyRequest, -): Promise => { - const index = await findWSConnectionInSharedState(connection, request); - const userSubscription = subscriptionsData[index]; - subscriptionsData.splice(index, 1); - userSubscription.socket.send( - JSON.stringify({ - result: null, - requestId: userSubscription.requestId, - status: "error", - message: "Do not send any message. Closing Socket... Reconnect again.", - }), - ); - userSubscription.socket.close(); -}; - -export const onClose = async ( - connection: SocketStream, - request: FastifyRequest, -): Promise => { - const index = await findWSConnectionInSharedState(connection, request); - if (index === -1) { - return; - } - subscriptionsData.splice(index, 1); -}; - -export const wsTimeout = async ( - connection: SocketStream, - queueId: string, - request: FastifyRequest, -): Promise => { - return setTimeout(() => { - connection.socket.send("Timeout exceeded. Closing connection..."); - removeWSFromSharedState(connection, request); - connection.socket.close(1000, "Session timeout"); // 1000 is a normal closure status code - - logger({ - service: "server", - level: "info", - message: `Websocket connection for ${queueId} closed due to timeout.`, - }); - }, timeoutDuration); -}; - -export const findOrAddWSConnectionInSharedState = async ( - connection: SocketStream, - queueId: string, - request: FastifyRequest, -) => { - let userSubscription: UserSubscription | undefined = undefined; - const index = await findWSConnectionInSharedState(connection, request); - if (index > -1) { - userSubscription = subscriptionsData[index]; - } else { - userSubscription = { - socket: connection.socket, - requestId: queueId, - }; - - subscriptionsData.push(userSubscription); - } -}; - -type CustomStatusAndConnectionType = { - message: string; - closeConnection: boolean; -}; - -export const getStatusMessageAndConnectionStatus = async ( - data: Static | null, -): Promise => { - let message = - "Request is queued. Waiting for transaction to be picked up by worker."; - let closeConnection = false; - - if (!data) { - message = "Transaction not found. Make sure the provided ID is correct."; - closeConnection = true; - } else if (data.status === "mined") { - message = "Transaction mined. Closing connection."; - closeConnection = true; - } else if (data.status === "errored") { - message = data.errorMessage || "Transaction errored. Closing connection."; - closeConnection = true; - } else if (data.status === "sent") { - message = - "Transaction submitted to blockchain. Waiting for transaction to be mined..."; - } - - return { message, closeConnection }; -}; - -export const formatSocketMessage = async ( - data: Static | null, - message: string, -): Promise => { - const returnData = JSON.stringify({ - result: data ? JSON.stringify(data) : undefined, - queueId: data?.queueId, - message, - }); - return returnData; -};