Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ export async function removeContractSubscription(fastify: FastifyInstance) {
handler: async (request, reply) => {
const { contractSubscriptionId } = request.body;

const contractSubscription = await deleteContractSubscription(
contractSubscriptionId,
);
await deleteContractSubscription(contractSubscriptionId);

reply.status(StatusCodes.OK).send({
result: {
Expand Down
4 changes: 4 additions & 0 deletions src/shared/utils/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ export const env = createEnv({
QUEUE_FAIL_HISTORY_COUNT: z.coerce.number().default(10_000),
// Sets the number of recent nonces to map to queue IDs.
NONCE_MAP_COUNT: z.coerce.number().default(10_000),
// Sets the estimated number of blocks to query per contract subscription job. Defaults to 1 block (real-time).
CONTRACT_SUBSCRIPTION_BLOCK_RANGE: z.coerce.number().default(1),

ENABLE_KEYPAIR_AUTH: boolEnvSchema(false),
ENABLE_CUSTOM_HMAC_AUTH: boolEnvSchema(false),
Expand Down Expand Up @@ -136,6 +138,8 @@ export const env = createEnv({
QUEUE_COMPLETE_HISTORY_COUNT: process.env.QUEUE_COMPLETE_HISTORY_COUNT,
QUEUE_FAIL_HISTORY_COUNT: process.env.QUEUE_FAIL_HISTORY_COUNT,
NONCE_MAP_COUNT: process.env.NONCE_MAP_COUNT,
CONTRACT_SUBSCRIPTION_BLOCK_RANGE:
process.env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE,
EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS:
process.env.EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS,
EXPERIMENTAL__MAX_GAS_PRICE_WEI:
Expand Down
24 changes: 19 additions & 5 deletions src/worker/indexers/chain-indexer-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import cron from "node-cron";
import { getBlockTimeSeconds } from "../../shared/utils/indexer/get-block-time";
import { logger } from "../../shared/utils/logger";
import { handleContractSubscriptions } from "../tasks/chain-indexer";
import { env } from "../../shared/utils/env";

// @TODO: Move all worker logic to Bullmq to better handle multiple hosts.
export const INDEXER_REGISTRY = {} as Record<number, cron.ScheduledTask>;
Expand All @@ -24,9 +25,10 @@ export const addChainIndexer = async (chainId: number) => {
});
blockTimeSeconds = 2;
}
const cronSchedule = createScheduleSeconds(
Math.max(Math.round(blockTimeSeconds), 1),
);
const cronSchedule = createScheduleSeconds({
blockTimeSeconds,
numBlocks: env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE,
});
logger({
service: "worker",
level: "info",
Expand Down Expand Up @@ -74,5 +76,17 @@ export const removeChainIndexer = async (chainId: number) => {
delete INDEXER_REGISTRY[chainId];
};

export const createScheduleSeconds = (seconds: number) =>
`*/${seconds} * * * * *`;
/**
* Returns the cron schedule given the chain's block time and the number of blocks to batch per job.
* Minimum is every 2 seconds.
*/
function createScheduleSeconds({
blockTimeSeconds,
numBlocks,
}: { blockTimeSeconds: number; numBlocks: number }) {
const pollFrequencySeconds = Math.max(
Math.round(blockTimeSeconds * numBlocks),
2,
);
return `*/${pollFrequencySeconds} * * * * *`;
}
Loading