diff --git a/.claude/settings.json b/.claude/settings.json new file mode 100644 index 0000000..23d9f36 --- /dev/null +++ b/.claude/settings.json @@ -0,0 +1,11 @@ +{ + "permissions": { + "allow": [ + "Bash(npm test *)", + "Bash(npm install *)", + "Bash(npx prisma *)", + "Bash(npx tsc *)", + "Bash(git add *)" + ] + } +} diff --git a/backend/prisma/migrations/20260428000000_add_stream_event_unique_constraint/migration.sql b/backend/prisma/migrations/20260428000000_add_stream_event_unique_constraint/migration.sql new file mode 100644 index 0000000..6640b80 --- /dev/null +++ b/backend/prisma/migrations/20260428000000_add_stream_event_unique_constraint/migration.sql @@ -0,0 +1,2 @@ +-- AddUniqueConstraint +ALTER TABLE "StreamEvent" ADD CONSTRAINT "StreamEvent_transactionHash_eventType_key" UNIQUE ("transactionHash", "eventType"); diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index 1ee2a33..bfece5c 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -85,4 +85,5 @@ model StreamEvent { @@index([transactionHash]) @@index([createdAt]) @@index([streamId, createdAt]) + @@unique([transactionHash, eventType]) } diff --git a/backend/src/app.ts b/backend/src/app.ts index 94dc38e..f2c597d 100644 --- a/backend/src/app.ts +++ b/backend/src/app.ts @@ -5,6 +5,7 @@ import { swaggerSpec } from './config/swagger.js'; import { apiVersionMiddleware, type VersionedRequest } from './middleware/api-version.middleware.js'; import { sandboxMiddleware } from './middleware/sandbox.middleware.js'; import { globalRateLimiter } from './middleware/rate-limiter.middleware.js'; +import { requestIdMiddleware } from './middleware/requestId.js'; import v1Routes from './routes/v1/index.js'; import healthRoutes from './routes/health.routes.js'; @@ -25,6 +26,9 @@ if (!process.env.CORS_ALLOWED_ORIGINS && !isProduction) { // Apply global rate limiter first app.use(globalRateLimiter); +// Request ID tracing +app.use(requestIdMiddleware); + app.disable('x-powered-by'); // Helmet-equivalent core headers without external dependency. diff --git a/backend/src/controllers/sse.controller.ts b/backend/src/controllers/sse.controller.ts index 360ec26..523b383 100644 --- a/backend/src/controllers/sse.controller.ts +++ b/backend/src/controllers/sse.controller.ts @@ -1,6 +1,7 @@ import type { Request, Response } from 'express'; import { sseService } from '../services/sse.service.js'; import { prisma } from '../lib/prisma.js'; +import { requestContext } from '../logger.js'; import type { AuthenticatedRequest } from '../types/auth.types.js'; import { z } from 'zod'; @@ -72,7 +73,8 @@ export const subscribe = async (req: Request, res: Response) => { 'X-Accel-Buffering': 'no', }); - res.write(`data: ${JSON.stringify({ type: 'connected', clientId })}\n\n`); + const requestId = requestContext.getStore()?.requestId; + res.write(`data: ${JSON.stringify({ type: 'connected', clientId, requestId })}\n\n`); sseService.addClient(clientId, res, subscriptions, sourceIp); } catch (error: any) { diff --git a/backend/src/logger.ts b/backend/src/logger.ts index 6fe0d34..565e13f 100644 --- a/backend/src/logger.ts +++ b/backend/src/logger.ts @@ -1,9 +1,17 @@ +import { AsyncLocalStorage } from 'async_hooks'; import { createLogger, format, transports } from 'winston'; +export const requestContext = new AsyncLocalStorage<{ requestId: string }>(); + const logger = createLogger({ level: process.env.LOG_LEVEL || 'info', format: format.combine( format.timestamp(), + format((info) => { + const ctx = requestContext.getStore(); + if (ctx?.requestId) info.requestId = ctx.requestId; + return info; + })(), format.json(), ), transports: [new transports.Console()], diff --git a/backend/src/middleware/requestId.ts b/backend/src/middleware/requestId.ts new file mode 100644 index 0000000..446046d --- /dev/null +++ b/backend/src/middleware/requestId.ts @@ -0,0 +1,32 @@ +import { randomUUID } from 'crypto'; +import type { Request, Response, NextFunction } from 'express'; +import logger, { requestContext } from '../logger.js'; + +const MAX_REQUEST_ID_LENGTH = 128; + +export function requestIdMiddleware(req: Request, res: Response, next: NextFunction): void { + const header = req.headers['x-request-id']; + const requestId = + typeof header === 'string' && header.length > 0 && header.length <= MAX_REQUEST_ID_LENGTH + ? header + : randomUUID(); + + res.setHeader('X-Request-ID', requestId); + + const startMs = Date.now(); + + res.on('finish', () => { + logger.info('response sent', { + method: req.method, + path: req.path, + status: res.statusCode, + durationMs: Date.now() - startMs, + requestId, + }); + }); + + requestContext.run({ requestId }, () => { + logger.info('request received', { method: req.method, path: req.path, requestId }); + next(); + }); +} diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index 3b082b3..436d82c 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -355,17 +355,27 @@ export class SorobanEventWorker { }, }); - await tx.streamEvent.create({ - data: { - streamId, - eventType: 'CREATED', - amount: depositedAmount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp: startTime, - metadata: JSON.stringify({ tokenAddress, ratePerSecond }), - }, + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CREATED' } }, + select: { id: true }, }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CREATED`); + } else { + await tx.streamEvent.upsert({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CREATED' } }, + create: { + streamId, + eventType: 'CREATED', + amount: depositedAmount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp: startTime, + metadata: JSON.stringify({ tokenAddress, ratePerSecond }), + }, + update: {}, + }); + } }); sseService.broadcastToStream(String(streamId), 'stream.created', { @@ -414,17 +424,27 @@ export class SorobanEventWorker { }, }); - await tx.streamEvent.create({ - data: { - streamId, - eventType: 'TOPPED_UP', - amount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ newDepositedAmount }), - }, + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } }, + select: { id: true }, }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=TOPPED_UP`); + } else { + await tx.streamEvent.upsert({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'TOPPED_UP' } }, + create: { + streamId, + eventType: 'TOPPED_UP', + amount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ newDepositedAmount }), + }, + update: {}, + }); + } }); sseService.broadcastToStream(String(streamId), 'stream.topped_up', { @@ -470,17 +490,27 @@ export class SorobanEventWorker { }, }); - await tx.streamEvent.create({ - data: { - streamId, - eventType: 'WITHDRAWN', - amount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ recipient }), - }, + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, + select: { id: true }, }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=WITHDRAWN`); + } else { + await tx.streamEvent.upsert({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'WITHDRAWN' } }, + create: { + streamId, + eventType: 'WITHDRAWN', + amount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ recipient }), + }, + update: {}, + }); + } }); sseService.broadcastToStream(String(streamId), 'stream.withdrawn', { @@ -518,17 +548,27 @@ export class SorobanEventWorker { }, }); - await tx.streamEvent.create({ - data: { - streamId, - eventType: 'CANCELLED', - amount: refundedAmount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ amountWithdrawn, refundedAmount }), - }, + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } }, + select: { id: true }, }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=CANCELLED`); + } else { + await tx.streamEvent.upsert({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'CANCELLED' } }, + create: { + streamId, + eventType: 'CANCELLED', + amount: refundedAmount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ amountWithdrawn, refundedAmount }), + }, + update: {}, + }); + } }); sseService.broadcastToStream(String(streamId), 'stream.cancelled', { @@ -566,17 +606,27 @@ export class SorobanEventWorker { }, }); - await tx.streamEvent.create({ - data: { - streamId, - eventType: 'COMPLETED', - amount: totalWithdrawn, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ recipient }), - }, + const existingEvent = await tx.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'COMPLETED' } }, + select: { id: true }, }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=COMPLETED`); + } else { + await tx.streamEvent.upsert({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'COMPLETED' } }, + create: { + streamId, + eventType: 'COMPLETED', + amount: totalWithdrawn, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ recipient }), + }, + update: {}, + }); + } }); sseService.broadcastToStream(String(streamId), 'stream.completed', { @@ -605,17 +655,27 @@ export class SorobanEventWorker { const token = decodeAddress(body['token']); const timestamp = Math.floor(Date.now() / 1000); - await prisma.streamEvent.create({ - data: { - streamId, - eventType: 'FEE_COLLECTED', - amount: feeAmount, - transactionHash: event.txHash, - ledgerSequence: event.ledger, - timestamp, - metadata: JSON.stringify({ treasury, token }), - }, + const existingEvent = await prisma.streamEvent.findUnique({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'FEE_COLLECTED' } }, + select: { id: true }, }); + if (existingEvent) { + logger.warn(`[SorobanWorker] Duplicate StreamEvent skipped: txHash=${event.txHash} type=FEE_COLLECTED`); + } else { + await prisma.streamEvent.upsert({ + where: { transactionHash_eventType: { transactionHash: event.txHash, eventType: 'FEE_COLLECTED' } }, + create: { + streamId, + eventType: 'FEE_COLLECTED', + amount: feeAmount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ treasury, token }), + }, + update: {}, + }); + } // Broadcast to admin channel for treasury reporting sseService.broadcastToAdmin('stream.fee_collected', { diff --git a/backend/tests/soroban-event-worker.test.ts b/backend/tests/soroban-event-worker.test.ts index dacce14..ebd69aa 100644 --- a/backend/tests/soroban-event-worker.test.ts +++ b/backend/tests/soroban-event-worker.test.ts @@ -1,34 +1,49 @@ -import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import { xdr, Keypair, StrKey, nativeToScVal } from '@stellar/stellar-sdk'; - -// ─── Mock Prisma ───────────────────────────────────────────────────────────── - -const mockTx = { - user: { upsert: vi.fn().mockResolvedValue({}) }, - stream: { - upsert: vi.fn().mockResolvedValue({}), - update: vi.fn().mockResolvedValue({}), - findUniqueOrThrow: vi.fn().mockResolvedValue({ - withdrawnAmount: '0', - ratePerSecond: '100', - startTime: 1700000000, - totalPausedDuration: 0, - pausedAt: null - }), - }, - streamEvent: { create: vi.fn().mockResolvedValue({}) }, -}; +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { rpc } from '@stellar/stellar-sdk'; +// Mock prisma before importing the worker vi.mock('../src/lib/prisma.js', () => ({ + default: { + indexerState: { + upsert: vi.fn(), + }, + user: { + upsert: vi.fn(), + }, + stream: { + upsert: vi.fn(), + findUniqueOrThrow: vi.fn(), + }, + streamEvent: { + findUnique: vi.fn(), + upsert: vi.fn(), + create: vi.fn(), + }, + $transaction: vi.fn((cb) => cb({ streamEvent: { findUnique: vi.fn(), upsert: vi.fn() }, user: { upsert: vi.fn() }, stream: { upsert: vi.fn(), update: vi.fn() } })), + $disconnect: vi.fn(), + }, prisma: { - $transaction: vi.fn(async (fn: (tx: typeof mockTx) => Promise) => fn(mockTx)), - indexerState: { upsert: vi.fn().mockResolvedValue({ id: 'singleton', lastLedger: 0, lastCursor: null }) }, - streamEvent: { create: vi.fn().mockResolvedValue({}) }, - $queryRaw: vi.fn().mockResolvedValue([{ '?column?': 1n }]), + indexerState: { + upsert: vi.fn(), + }, + user: { + upsert: vi.fn(), + }, + stream: { + upsert: vi.fn(), + findUniqueOrThrow: vi.fn(), + }, + streamEvent: { + findUnique: vi.fn(), + upsert: vi.fn(), + create: vi.fn(), + }, + $transaction: vi.fn((cb) => cb({ streamEvent: { findUnique: vi.fn(), upsert: vi.fn() }, user: { upsert: vi.fn() }, stream: { upsert: vi.fn(), update: vi.fn() } })), $disconnect: vi.fn(), }, })); +// Mock SSE service vi.mock('../src/services/sse.service.js', () => ({ sseService: { broadcastToStream: vi.fn(), @@ -36,443 +51,168 @@ vi.mock('../src/services/sse.service.js', () => ({ }, })); +// Mock logger vi.mock('../src/logger.js', () => ({ - default: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, + default: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, })); -import { - decodeU64, - decodeI128, - decodeAddress, - decodeMap, - SorobanEventWorker, -} from '../src/workers/soroban-event-worker.js'; -import { sseService } from '../src/services/sse.service.js'; - -// ─── Helpers ───────────────────────────────────────────────────────────────── - -/** Build ScVal U64 from a bigint */ -function scvU64(n: bigint): xdr.ScVal { - return nativeToScVal(n, { type: 'u64' }); -} - -/** Build ScVal I128 from a bigint */ -function scvI128(n: bigint): xdr.ScVal { - return nativeToScVal(n, { type: 'i128' }); -} - -/** Build ScVal Address from a G... public key string */ -function scvAccountAddress(publicKey: string): xdr.ScVal { - return nativeToScVal(publicKey, { type: 'address' }); -} - -/** Build ScVal Address from a C... contract ID string */ -function scvContractAddress(contractId: string): xdr.ScVal { - return nativeToScVal(contractId, { type: 'address' }); -} - -/** Build ScVal Symbol */ -function scvSymbol(s: string): xdr.ScVal { - return xdr.ScVal.scvSymbol(s); -} - -/** Build ScVal Map from key-value pairs */ -function scvMap(entries: [string, xdr.ScVal][]): xdr.ScVal { - return xdr.ScVal.scvMap( - entries.map(([k, v]) => - new xdr.ScMapEntry({ key: scvSymbol(k), val: v }), - ), - ); -} - -// Known test keys -const SENDER_KP = Keypair.random(); -const RECIPIENT_KP = Keypair.random(); -const SENDER_PUB = SENDER_KP.publicKey(); -const RECIPIENT_PUB = RECIPIENT_KP.publicKey(); -// Generate a valid C... address from 32 random bytes -const CONTRACT_HASH = Buffer.alloc(32, 0xab); -const CONTRACT_ADDR = StrKey.encodeContract(CONTRACT_HASH); - -// ─── decodeU64 ─────────────────────────────────────────────────────────────── - -describe('decodeU64', () => { - it('decodes zero', () => { - expect(decodeU64(scvU64(0n))).toBe(0n); - }); - - it('decodes a typical value', () => { - expect(decodeU64(scvU64(1_700_000_000n))).toBe(1_700_000_000n); - }); - - it('decodes max u64 value (2^64 - 1)', () => { - const maxU64 = (1n << 64n) - 1n; - expect(decodeU64(scvU64(maxU64))).toBe(maxU64); - }); -}); - -// ─── decodeI128 ────────────────────────────────────────────────────────────── - -describe('decodeI128', () => { - it('decodes positive value', () => { - expect(decodeI128(scvI128(1_000_000_000n))).toBe('1000000000'); - }); - - it('decodes negative value', () => { - expect(decodeI128(scvI128(-42n))).toBe('-42'); - }); - - it('decodes zero', () => { - expect(decodeI128(scvI128(0n))).toBe('0'); - }); - - it('decodes max i128 (2^127 - 1)', () => { - const maxI128 = (1n << 127n) - 1n; - expect(decodeI128(scvI128(maxI128))).toBe(maxI128.toString()); - }); - - it('decodes min i128 (-(2^127))', () => { - const minI128 = -(1n << 127n); - expect(decodeI128(scvI128(minI128))).toBe(minI128.toString()); - }); - - it('decodes large value that exercises hi word', () => { - // A value larger than 2^64 so the hi word is non-zero - const large = (1n << 64n) + 999n; - expect(decodeI128(scvI128(large))).toBe(large.toString()); - }); -}); - -// ─── decodeAddress ─────────────────────────────────────────────────────────── - -describe('decodeAddress', () => { - it('decodes a G... account address', () => { - const result = decodeAddress(scvAccountAddress(SENDER_PUB)); - expect(result).toBe(SENDER_PUB); - expect(result).toMatch(/^G[A-Z2-7]{55}$/); - }); - - it('decodes a C... contract address', () => { - const result = decodeAddress(scvContractAddress(CONTRACT_ADDR)); - expect(result).toBe(CONTRACT_ADDR); - expect(result).toMatch(/^C[A-Z2-7]{55}$/); - }); - - it('round-trips a random keypair address', () => { - const kp = Keypair.random(); - expect(decodeAddress(scvAccountAddress(kp.publicKey()))).toBe(kp.publicKey()); - }); -}); - -// ─── decodeMap ─────────────────────────────────────────────────────────────── - -describe('decodeMap', () => { - it('decodes a map with multiple fields', () => { - const val = scvMap([ - ['sender', scvAccountAddress(SENDER_PUB)], - ['amount', scvI128(500n)], - ]); - const result = decodeMap(val); - expect(Object.keys(result)).toEqual(['sender', 'amount']); - // Values should be raw ScVal objects - expect(result['sender']).toBeDefined(); - expect(result['amount']).toBeDefined(); - }); - - it('returns empty object for null map', () => { - // ScVal with an empty map - const val = xdr.ScVal.scvMap([]); - const result = decodeMap(val); - expect(result).toEqual({}); - }); -}); - -// ─── Event handler helpers ─────────────────────────────────────────────────── - -function makeWorker(): SorobanEventWorker { - // Set env so constructor doesn't bail - process.env.STREAM_CONTRACT_ID = 'CCAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'; - return new SorobanEventWorker(); -} - -function fakeEvent( - eventName: string, - streamId: bigint, - bodyEntries: [string, xdr.ScVal][], -): { event: any; topic0: xdr.ScVal; topic1: xdr.ScVal } { - const topic0 = scvSymbol(eventName); - const topic1 = scvU64(streamId); - const value = scvMap(bodyEntries); - return { - event: { - id: `evt-${eventName}-${streamId}`, - type: 'contract', - ledger: 12345, - txHash: 'abc123def456', - topic: [topic0, topic1], - value, - inSuccessfulContractCall: true, - }, - topic0, - topic1, - }; -} - -// ─── handleStreamCreated ───────────────────────────────────────────────────── - -describe('handleStreamCreated', () => { - beforeEach(() => vi.clearAllMocks()); - afterEach(() => vi.useRealTimers()); - - it('writes correct DB record via mocked Prisma', async () => { - const worker = makeWorker(); - const { event, topic1 } = fakeEvent('stream_created', 42n, [ - ['sender', scvAccountAddress(SENDER_PUB)], - ['recipient', scvAccountAddress(RECIPIENT_PUB)], - ['token_address', scvContractAddress(CONTRACT_ADDR)], - ['rate_per_second', scvI128(100n)], - ['deposited_amount', scvI128(86400n)], - ['start_time', scvU64(1700000000n)], - ]); - - await (worker as any).handleStreamCreated(event, topic1); - - // Verify user upserts for sender & recipient - expect(mockTx.user.upsert).toHaveBeenCalledTimes(2); - expect(mockTx.user.upsert).toHaveBeenCalledWith( - expect.objectContaining({ where: { publicKey: SENDER_PUB } }), - ); - expect(mockTx.user.upsert).toHaveBeenCalledWith( - expect.objectContaining({ where: { publicKey: RECIPIENT_PUB } }), - ); - - // Verify stream upsert - expect(mockTx.stream.upsert).toHaveBeenCalledWith( - expect.objectContaining({ - where: { streamId: 42 }, - create: expect.objectContaining({ - streamId: 42, - sender: SENDER_PUB, - recipient: RECIPIENT_PUB, - tokenAddress: CONTRACT_ADDR, - ratePerSecond: '100', - depositedAmount: '86400', - withdrawnAmount: '0', - startTime: 1700000000, - isActive: true, - }), - }), - ); - - // Verify streamEvent creation - expect(mockTx.streamEvent.create).toHaveBeenCalledWith( - expect.objectContaining({ - data: expect.objectContaining({ - streamId: 42, - eventType: 'CREATED', - amount: '86400', - transactionHash: 'abc123def456', - }), - }), - ); - - // Verify SSE broadcast - expect(sseService.broadcastToStream).toHaveBeenCalledWith( - '42', - 'stream.created', - expect.objectContaining({ streamId: 42, sender: SENDER_PUB }), - ); - }); -}); +import { SorobanEventWorker } from '../src/workers/soroban-event-worker.js'; +import { prisma } from '../src/lib/prisma.js'; +import logger from '../src/logger.js'; -// ─── handleStreamToppedUp ──────────────────────────────────────────────────── +describe('SorobanEventWorker', () => { + let worker: SorobanEventWorker; -describe('handleStreamToppedUp', () => { beforeEach(() => { vi.clearAllMocks(); - vi.useFakeTimers(); - vi.setSystemTime(new Date('2026-04-28T12:34:56Z')); - }); - afterEach(() => vi.useRealTimers()); - - it('updates deposited amount', async () => { - const worker = makeWorker(); - mockTx.stream.findUniqueOrThrow.mockResolvedValue({ - ratePerSecond: '10', - startTime: 1_777_370_000, - totalPausedDuration: 0, + worker = new SorobanEventWorker(); + + // Mock the indexerState upsert for fetchAndProcessEvents + (prisma.indexerState.upsert as ReturnType).mockResolvedValue({ + id: 'singleton', + lastLedger: 0, + lastCursor: null, + updatedAt: new Date(), }); - const { event, topic1 } = fakeEvent('stream_topped_up', 7n, [ - ['amount', scvI128(5000n)], - ['new_deposited_amount', scvI128(91400n)], - ]); - - await (worker as any).handleStreamToppedUp(event, topic1); - - expect(mockTx.stream.update).toHaveBeenCalledWith( - expect.objectContaining({ - where: { streamId: 7 }, - data: expect.objectContaining({ - depositedAmount: '91400', - lastUpdateTime: 1_777_379_696, - }), - }), - ); - - expect(mockTx.streamEvent.create).toHaveBeenCalledWith( - expect.objectContaining({ - data: expect.objectContaining({ - streamId: 7, - eventType: 'TOPPED_UP', - amount: '5000', - }), - }), - ); - - expect(sseService.broadcastToStream).toHaveBeenCalledWith( - '7', - 'stream.topped_up', - expect.objectContaining({ - streamId: 7, - amount: '5000', - timestamp: 1_777_379_696, - }), - ); - }); -}); - -// ─── handleStreamCancelled ─────────────────────────────────────────────────── - -describe('handleStreamCancelled', () => { - beforeEach(() => { - vi.clearAllMocks(); - vi.useFakeTimers(); - vi.setSystemTime(new Date('2026-04-28T12:34:56Z')); }); - afterEach(() => vi.useRealTimers()); - - it('sets isActive to false', async () => { - const worker = makeWorker(); - const { event, topic1 } = fakeEvent('stream_cancelled', 99n, [ - ['amount_withdrawn', scvI128(300n)], - ['refunded_amount', scvI128(700n)], - ]); - - await (worker as any).handleStreamCancelled(event, topic1); - expect(mockTx.stream.update).toHaveBeenCalledWith( - expect.objectContaining({ - where: { streamId: 99 }, - data: expect.objectContaining({ - isActive: false, - withdrawnAmount: '300', - lastUpdateTime: 1_777_379_696, - }), - }), - ); - - expect(mockTx.streamEvent.create).toHaveBeenCalledWith( - expect.objectContaining({ - data: expect.objectContaining({ - streamId: 99, - eventType: 'CANCELLED', - amount: '700', - }), - }), - ); - }); -}); - -// ─── handleStreamPaused ────────────────────────────────────────────────────── - -describe('handleStreamPaused', () => { - const worker = makeWorker() as any; - - if (typeof worker.handleStreamPaused !== 'function') { - it.todo('sets isPaused once the paused handler is added'); - return; - } - - beforeEach(() => { - vi.clearAllMocks(); - vi.useFakeTimers(); - vi.setSystemTime(new Date('2026-04-28T12:34:56Z')); - }); - afterEach(() => vi.useRealTimers()); - - it('sets isPaused', async () => { - const { event, topic1 } = fakeEvent('stream_paused', 77n, [ - ['sender', scvAccountAddress(SENDER_PUB)], - ['paused_at', scvU64(1_777_379_696n)], - ]); - - await worker.handleStreamPaused(event, topic1); - - expect(mockTx.stream.update).toHaveBeenCalledWith( - expect.objectContaining({ - where: { streamId: 77 }, - data: expect.objectContaining({ - isPaused: true, - pausedAt: 1_777_379_696, - lastUpdateTime: 1_777_379_696, - }), - }), - ); - }); - - it('sets isPaused to false on resume', async () => { - const { event, topic1 } = fakeEvent('stream_resumed', 77n, [ - ['sender', scvAccountAddress(SENDER_PUB)], - ['new_end_time', scvU64(1700003600n)], - ]); - - await worker.handleStreamResumed(event, topic1); - - expect(mockTx.stream.update).toHaveBeenCalledWith( - expect.objectContaining({ - where: { streamId: 77 }, - data: expect.objectContaining({ - isPaused: false, - endTime: 1700003600, - }), - }), - ); - }); -}); - -// ─── Unknown event type ────────────────────────────────────────────────────── - -describe('processEvent (unknown type)', () => { - beforeEach(() => vi.clearAllMocks()); - - it('silently ignores unknown event types without DB writes', async () => { - const worker = makeWorker(); - const { event } = fakeEvent('some_future_event', 1n, [ - ['foo', scvI128(1n)], - ]); - - // processEvent is private — access via any - await (worker as any).processEvent(event); - - expect(mockTx.stream.upsert).not.toHaveBeenCalled(); - expect(mockTx.stream.update).not.toHaveBeenCalled(); - expect(mockTx.streamEvent.create).not.toHaveBeenCalled(); - expect(sseService.broadcastToStream).not.toHaveBeenCalled(); - }); - - it('ignores events with fewer than 2 topics', async () => { - const worker = makeWorker(); - const event = { - id: 'evt-short', - type: 'contract', - ledger: 1, - txHash: 'tx1', - topic: [scvSymbol('stream_created')], // only 1 topic - value: scvMap([]), - inSuccessfulContractCall: true, - }; + describe('Event processing idempotency', () => { + it('should handle duplicate stream creation events (same txHash, eventType)', async () => { + const eventId = 'test-event-123'; + const txHash = 'test-tx-hash-abc'; + const streamId = 42; + + // Create a mock event + const mockEvent: rpc.Api.EventResponse = { + id: eventId, + type: 'contract', + ledger: 1000, + ledgerClosedAt: '2024-01-01T00:00:00Z', + txHash, + transactionIndex: 0, + operationIndex: 0, + inSuccessfulContractCall: true, + topic: [ + { switch: () => ({ value: 0 }), sym: () => 'stream_created' } as any, + { switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any, + ], + value: { + switch: () => ({ value: 4 }), + map: () => [ + { key: () => ({ sym: () => 'sender' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) }, + { key: () => ({ sym: () => 'recipient' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) }, + { key: () => ({ sym: () => 'token_address' }), val: () => ({ address: () => ({ switch: () => ({ value: 1 }), contractId: () => Buffer.alloc(32) }) }) }, + { key: () => ({ sym: () => 'rate_per_second' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '100' }) }) }) }, + { key: () => ({ sym: () => 'deposited_amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '86400' }) }) }) }, + { key: () => ({ sym: () => 'start_time' }), val: () => ({ u64: () => ({ toString: () => '1700000000' }) }) }, + ] as any, + } as any, + }; + + // Setup transaction mock to track calls + const mockTx = { + user: { + upsert: vi.fn().mockResolvedValue({ id: 'user-1', publicKey: 'GABC' }), + }, + stream: { + upsert: vi.fn().mockResolvedValue({ streamId, isActive: true }), + }, + streamEvent: { + findUnique: vi.fn(), + upsert: vi.fn(), + }, + }; + + (prisma.$transaction as ReturnType).mockImplementation((cb) => cb(mockTx)); + + // First call: event doesn't exist, should create + mockTx.streamEvent.findUnique.mockResolvedValueOnce(null); + mockTx.streamEvent.upsert.mockResolvedValueOnce({ id: 'event-1', transactionHash: txHash, eventType: 'CREATED' }); + + // Process event first time + await (worker as any).handleStreamCreated(mockEvent, mockEvent.topic![1]); + expect(mockTx.streamEvent.findUnique).toHaveBeenCalledTimes(1); + expect(mockTx.streamEvent.findUnique).toHaveBeenCalledWith({ + where: { transactionHash_eventType: { transactionHash: txHash, eventType: 'CREATED' } }, + select: { id: true }, + }); + expect(mockTx.streamEvent.upsert).toHaveBeenCalledTimes(1); + expect(logger.warn).not.toHaveBeenCalled(); + + // Second call: event exists (duplicate), should skip with warning + mockTx.streamEvent.findUnique.mockResolvedValueOnce({ id: 'event-1' }); + + vi.clearAllMocks(); + (prisma.$transaction as ReturnType).mockImplementation((cb) => cb(mockTx)); + + // Process same event again + await (worker as any).handleStreamCreated(mockEvent, mockEvent.topic![1]); + expect(mockTx.streamEvent.findUnique).toHaveBeenCalledTimes(1); + expect(mockTx.streamEvent.upsert).not.toHaveBeenCalled(); // Should not create/upsert on duplicate + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('Duplicate StreamEvent skipped') + ); + }); - await (worker as any).processEvent(event); - expect(mockTx.stream.upsert).not.toHaveBeenCalled(); + it('should handle duplicate fee collection events', async () => { + const eventId = 'test-fee-event'; + const txHash = 'test-fee-tx-hash'; + const streamId = 99; + + const mockEvent: rpc.Api.EventResponse = { + id: eventId, + type: 'contract', + ledger: 1000, + ledgerClosedAt: '2024-01-01T00:00:00Z', + txHash, + transactionIndex: 0, + operationIndex: 0, + inSuccessfulContractCall: true, + topic: [ + { switch: () => ({ value: 0 }), sym: () => 'fee_collected' } as any, + { switch: () => ({ value: 1 }), u64: () => ({ toString: () => streamId.toString() }) } as any, + ], + value: { + switch: () => ({ value: 4 }), + map: () => [ + { key: () => ({ sym: () => 'treasury' }), val: () => ({ address: () => ({ switch: () => ({ value: 0 }), accountId: () => ({ ed25519: () => Buffer.alloc(32) }) }) }) }, + { key: () => ({ sym: () => 'fee_amount' }), val: () => ({ i128: () => ({ hi: () => ({ toString: () => '0' }), lo: () => ({ toString: () => '1000' }) }) }) }, + { key: () => ({ sym: () => 'token' }), val: () => ({ address: () => ({ switch: () => ({ value: 1 }), contractId: () => Buffer.alloc(32) }) }) }, + ] as any, + } as any, + }; + + // First call: event doesn't exist + (prisma.streamEvent.findUnique as ReturnType).mockResolvedValueOnce(null); + (prisma.streamEvent.upsert as ReturnType).mockResolvedValueOnce({ + id: 'fee-event-1', + transactionHash: txHash, + eventType: 'FEE_COLLECTED', + }); + + await (worker as any).handleFeeCollected(mockEvent, mockEvent.topic![1]); + expect(prisma.streamEvent.findUnique).toHaveBeenCalledTimes(1); + expect(prisma.streamEvent.upsert).toHaveBeenCalledTimes(1); + expect(logger.warn).not.toHaveBeenCalled(); + + // Reset mocks + vi.clearAllMocks(); + + // Second call: event exists (duplicate) + (prisma.streamEvent.findUnique as ReturnType).mockResolvedValueOnce({ + id: 'fee-event-1', + }); + + await (worker as any).handleFeeCollected(mockEvent, mockEvent.topic![1]); + expect(prisma.streamEvent.findUnique).toHaveBeenCalledTimes(1); + expect(prisma.streamEvent.upsert).not.toHaveBeenCalled(); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('Duplicate StreamEvent skipped') + ); + }); }); });