Skip to content

Commit 087636c

Browse files
committed
refactoring MinaTransactionSender.
- polling the chain and updating persistant storage is now handled by L1TransactionDispatcher - TxStatusWaiter is used for waiting on a txn to get sent, included.
1 parent 73f6a85 commit 087636c

10 files changed

Lines changed: 692 additions & 515 deletions

packages/sequencer/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ export * from "./settlement/permissions/SignedSettlementPermissions";
110110
export * from "./settlement/tasks/SettlementProvingTask";
111111
export * from "./settlement/transactions/L1TransactionRetryStrategy";
112112
export * from "./settlement/transactions/DefaultL1TransactionRetryStrategy";
113+
export * from "./settlement/transactions/L1TransactionDispatcher";
114+
export * from "./settlement/transactions/TxStatusWaiter";
113115
export * from "./settlement/transactions/MinaTransactionSender";
114116
export * from "./settlement/transactions/MinaTransactionSimulator";
115117
export * from "./settlement/transactions/MinaSimulationService";

packages/sequencer/src/settlement/transactions/DefaultL1TransactionRetryStrategy.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { noop, sleep } from "@proto-kit/common";
1+
import { noop } from "@proto-kit/common";
22
import { Transaction, UInt64 } from "o1js";
33
import { inject } from "tsyringe";
44

@@ -57,21 +57,17 @@ export class DefaultL1TransactionRetryStrategy
5757
return record.attempts < this.retryConfig.maxAttempts;
5858
}
5959

60+
public getRetryDelayMs(): number {
61+
return this.retryConfig.retryDelayMs;
62+
}
63+
6064
public async prepareRetryTransaction(
6165
record: PendingL1TransactionRecord
6266
): Promise<Transaction<any, false>> {
6367
const tx = record.transaction;
6468
const currentFee = tx.transaction.feePayer.body.fee;
6569
const newFee = UInt64.from(this.bumpFee(Number(currentFee.toBigInt())));
6670
await tx.setFee(newFee);
67-
// Delay if needed
68-
await sleep(
69-
Math.max(
70-
0,
71-
this.retryConfig.retryDelayMs -
72-
(Date.now() - (record.sentAt?.getTime() ?? 0))
73-
)
74-
);
7571
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
7672
return tx as Transaction<any, false>;
7773
}
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
import { Transaction } from "o1js";
2+
import { inject, injectable } from "tsyringe";
3+
import { log } from "@proto-kit/common";
4+
5+
import {
6+
PendingL1TransactionRecord,
7+
PendingL1TransactionStorage,
8+
} from "../../storage/repositories/PendingL1TransactionStorage";
9+
import { MinaSigner } from "../MinaSigner";
10+
11+
import { L1TransactionRetryStrategy } from "./L1TransactionRetryStrategy";
12+
import { TxStatusWaiter } from "./TxStatusWaiter";
13+
import { checkZkappTransactionStatus } from "./ZkappTransactionStatus";
14+
15+
export interface DispatcherConfig {
16+
pollIntervalMs?: number;
17+
statusCheckIntervalMs?: number;
18+
inclusionTimeoutMs?: number;
19+
}
20+
21+
const DEFAULT_CONFIG: Required<DispatcherConfig> = {
22+
pollIntervalMs: 5000,
23+
statusCheckIntervalMs: 5000,
24+
inclusionTimeoutMs: 10 * 60 * 1000,
25+
};
26+
27+
@injectable()
28+
export class L1TransactionDispatcher {
29+
private pollingTimeout?: NodeJS.Timeout;
30+
31+
/**
32+
* Serialize all dispatcher work (polling ticks and sender-level wakeups) to avoid
33+
* concurrent send attempts of the same queued transaction.
34+
*/
35+
private workInFlight?: Promise<void>;
36+
37+
public constructor(
38+
@inject("PendingL1TransactionStorage")
39+
private readonly pendingStorage: PendingL1TransactionStorage,
40+
@inject("L1TransactionRetryStrategy")
41+
private readonly retryStrategy: L1TransactionRetryStrategy,
42+
@inject("SettlementSigner") private readonly signer: MinaSigner,
43+
private readonly waiter: TxStatusWaiter,
44+
private readonly dispatcherConfig: DispatcherConfig = {}
45+
) {}
46+
47+
private get config(): Required<DispatcherConfig> {
48+
return {
49+
pollIntervalMs:
50+
this.dispatcherConfig.pollIntervalMs ?? DEFAULT_CONFIG.pollIntervalMs,
51+
statusCheckIntervalMs:
52+
this.dispatcherConfig.statusCheckIntervalMs ??
53+
DEFAULT_CONFIG.statusCheckIntervalMs,
54+
inclusionTimeoutMs:
55+
this.dispatcherConfig.inclusionTimeoutMs ??
56+
DEFAULT_CONFIG.inclusionTimeoutMs,
57+
};
58+
}
59+
60+
public start() {
61+
this.startPolling();
62+
}
63+
64+
public async stop(): Promise<void> {
65+
if (this.pollingTimeout !== undefined) {
66+
clearTimeout(this.pollingTimeout);
67+
this.pollingTimeout = undefined;
68+
}
69+
}
70+
71+
public requestDispatch(sender: string) {
72+
void this.enqueueWork(async () => {
73+
const txs = await this.pendingStorage.findByStatuses(["queued", "sent"]);
74+
await this.processSender(sender, txs);
75+
});
76+
}
77+
78+
private startPolling() {
79+
const poll = async () => {
80+
try {
81+
await this.enqueueWork(async () => {
82+
await this.tick();
83+
});
84+
} catch (e) {
85+
log.error("Error in L1TransactionDispatcher polling loop", e);
86+
} finally {
87+
this.pollingTimeout = setTimeout(poll, this.config.pollIntervalMs);
88+
this.pollingTimeout.unref?.();
89+
}
90+
};
91+
this.pollingTimeout = setTimeout(poll, this.config.pollIntervalMs);
92+
this.pollingTimeout.unref?.();
93+
}
94+
95+
private async enqueueWork(work: () => Promise<void>): Promise<void> {
96+
const previous = this.workInFlight ?? Promise.resolve();
97+
const next = previous.then(work, work);
98+
this.workInFlight = next.finally(() => {
99+
if (this.workInFlight === next) {
100+
this.workInFlight = undefined;
101+
}
102+
});
103+
return await this.workInFlight;
104+
}
105+
106+
private async tick(): Promise<void> {
107+
const now = new Date();
108+
const pendingTransactions = (
109+
await this.pendingStorage.findByStatuses(["queued", "sent"])
110+
).filter((r) => (r.nextActionAt ?? now) <= now);
111+
112+
const bySender: Record<string, PendingL1TransactionRecord[]> = {};
113+
for (const tx of pendingTransactions) {
114+
(bySender[tx.sender] ??= []).push(tx);
115+
}
116+
117+
for (const sender of Object.keys(bySender)) {
118+
// eslint-disable-next-line no-await-in-loop
119+
await this.processSender(sender, bySender[sender]);
120+
}
121+
}
122+
123+
private async processSender(
124+
sender: string,
125+
txsInput: PendingL1TransactionRecord[]
126+
): Promise<void> {
127+
// Sort in ascending order of nonce
128+
const txsSorted = txsInput
129+
.sort((a, b) => a.nonce - b.nonce)
130+
.filter((r) => r.sender === sender);
131+
132+
// Send queued txs in ascending nonce order. Then check sent txs.
133+
for (const record of txsSorted) {
134+
if (record.status === "queued") {
135+
// eslint-disable-next-line no-await-in-loop
136+
await this.sendQueuedTransaction(record.id);
137+
} else if (record.status === "sent") {
138+
// eslint-disable-next-line no-await-in-loop
139+
await this.checkSentTransaction(record.id);
140+
}
141+
}
142+
}
143+
144+
private async sendQueuedTransaction(txId: string): Promise<void> {
145+
const record = await this.pendingStorage.findById(txId);
146+
if (!record) return;
147+
if (record.status !== "queued") return;
148+
149+
await this.sendTransaction(record);
150+
}
151+
152+
private async sendTransaction(
153+
record: Omit<
154+
PendingL1TransactionRecord,
155+
"status" | "sentAt" | "lastError" | "nextActionAt" | "hash"
156+
>
157+
): Promise<void> {
158+
const tx = record.transaction;
159+
try {
160+
const pendingTx = await tx.send();
161+
const now = new Date();
162+
163+
await this.pendingStorage.update(record.id, {
164+
status: "sent",
165+
attempts: record.attempts + 1,
166+
sentAt: now,
167+
transaction: tx,
168+
hash: pendingTx.hash,
169+
lastError: undefined,
170+
nextActionAt: new Date(
171+
now.getTime() + this.config.statusCheckIntervalMs
172+
),
173+
});
174+
log.info(
175+
`Sent L1 transaction ${pendingTx.hash} for nonce ${record.nonce} (Attempt ${record.attempts + 1})`
176+
);
177+
this.waiter.notifySent(record.id, pendingTx.hash);
178+
} catch (error) {
179+
log.error(
180+
`Failed to send transaction ${record.sender}:${record.nonce}`,
181+
error
182+
);
183+
await this.pendingStorage.update(record.id, {
184+
status: "failed",
185+
lastError: error instanceof Error ? error.message : String(error),
186+
});
187+
this.waiter.notifyFailed(record.id, error);
188+
}
189+
}
190+
191+
private async checkSentTransaction(txId: string): Promise<void> {
192+
const record = await this.pendingStorage.findById(txId);
193+
if (!record) return;
194+
if (record.status !== "sent") return;
195+
196+
if (record.hash === undefined || record.hash.length === 0) {
197+
await this.retryTransaction(record);
198+
return;
199+
}
200+
201+
// Single status check (no long blocking loops)
202+
const result = await checkZkappTransactionStatus(record.hash);
203+
if (result.success) {
204+
await this.pendingStorage.update(record.id, {
205+
status: "included",
206+
});
207+
this.waiter.notifyIncluded(record.id, record.hash);
208+
return;
209+
} else if (result.failureReason) {
210+
log.error(`Transaction ${record.hash} failed`, result.failureReason);
211+
await this.retryTransaction(record);
212+
return;
213+
} else if (!result.success) {
214+
// recheck status after a timeout
215+
const now = new Date();
216+
const sentAt = record.sentAt?.getTime() ?? 0;
217+
const elapsed = now.getTime() - sentAt;
218+
if (elapsed < this.config.inclusionTimeoutMs) {
219+
await this.pendingStorage.update(record.id, {
220+
nextActionAt: new Date(
221+
now.getTime() + this.config.statusCheckIntervalMs
222+
),
223+
});
224+
return;
225+
}
226+
}
227+
228+
// if the transaction is not included after the inclusionTimeoutMs, retry the transaction
229+
await this.retryTransaction(record);
230+
}
231+
232+
private async retryTransaction(
233+
record: PendingL1TransactionRecord
234+
): Promise<void> {
235+
const latest = await this.pendingStorage.findById(record.id);
236+
if (!latest) return;
237+
if (latest.status === "included" || latest.status === "failed") return;
238+
239+
const shouldRetry = await this.retryStrategy.shouldRetry(latest);
240+
if (!shouldRetry) {
241+
await this.pendingStorage.update(latest.id, {
242+
status: "failed",
243+
lastError: latest.lastError ?? "Max attempts reached",
244+
});
245+
this.waiter.notifyFailed(
246+
latest.id,
247+
new Error(`Max attempts reached for ${latest.sender}:${latest.nonce}`)
248+
);
249+
return;
250+
}
251+
252+
try {
253+
// If the strategy wants a delay, schedule using nextActionAt
254+
const delayMs = this.retryStrategy.getRetryDelayMs?.(latest) ?? 0;
255+
const now = new Date();
256+
const earliestRetryAt =
257+
latest.sentAt !== undefined
258+
? new Date(latest.sentAt.getTime() + delayMs)
259+
: now;
260+
if (earliestRetryAt > now) {
261+
await this.pendingStorage.update(latest.id, {
262+
nextActionAt: earliestRetryAt,
263+
});
264+
return;
265+
}
266+
267+
const retryTx = await this.retryStrategy.prepareRetryTransaction(latest);
268+
const signedRetryTx = this.signer.signTx(
269+
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
270+
retryTx as Transaction<false, false>
271+
);
272+
await this.sendTransaction({
273+
...latest,
274+
transaction: signedRetryTx,
275+
});
276+
} catch (error) {
277+
log.error(`Failed to retry ${latest.sender}:${latest.nonce}`, error);
278+
await this.pendingStorage.update(latest.id, {
279+
status: "failed",
280+
lastError: error instanceof Error ? error.message : String(error),
281+
});
282+
this.waiter.notifyFailed(latest.id, error);
283+
}
284+
}
285+
}

packages/sequencer/src/settlement/transactions/L1TransactionRetryStrategy.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ import { PendingL1TransactionRecord } from "../../storage/repositories/PendingL1
55
export interface L1TransactionRetryStrategy {
66
shouldRetry(record: PendingL1TransactionRecord): Promise<boolean>;
77

8+
/**
9+
* Optional retry delay. If provided, the dispatcher will schedule
10+
* the next retry after the delay from the last sentAt time.
11+
*/
12+
getRetryDelayMs?(record: PendingL1TransactionRecord): number;
13+
814
prepareRetryTransaction(
915
record: PendingL1TransactionRecord
1016
): Promise<Transaction<any, false>>;

0 commit comments

Comments
 (0)