diff --git a/.github/workflows/deploy-tag.yaml b/.github/workflows/deploy-tag.yaml index 52a40534..6e9e5702 100644 --- a/.github/workflows/deploy-tag.yaml +++ b/.github/workflows/deploy-tag.yaml @@ -74,8 +74,7 @@ jobs: platforms: linux/amd64 tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - cache-from: type=gha - cache-to: type=gha,mode=max + dispatch: runs-on: aelfscan-runner needs: build-and-push-image diff --git a/.github/workflows/deploy-testnet-pre.yaml b/.github/workflows/deploy-testnet-pre.yaml index dd807f8c..07a60391 100644 --- a/.github/workflows/deploy-testnet-pre.yaml +++ b/.github/workflows/deploy-testnet-pre.yaml @@ -73,8 +73,7 @@ jobs: platforms: linux/amd64 tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - cache-from: type=gha - cache-to: type=gha,mode=max + dispatch: runs-on: aelfscan-runner needs: build-and-push-image diff --git a/.github/workflows/deploy-testnet.yaml b/.github/workflows/deploy-testnet.yaml index a47a4d71..bcce1fde 100644 --- a/.github/workflows/deploy-testnet.yaml +++ b/.github/workflows/deploy-testnet.yaml @@ -73,8 +73,6 @@ jobs: platforms: linux/amd64 tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} - cache-from: type=gha - cache-to: type=gha,mode=max dispatch: runs-on: aelfscan-runner diff --git a/doc/token-transfer-monitoring-design.md b/doc/token-transfer-monitoring-design.md new file mode 100644 index 00000000..3b08b51a --- /dev/null +++ b/doc/token-transfer-monitoring-design.md @@ -0,0 +1,279 @@ +# Token Transfer Monitoring System Design + +## Overview +This document outlines the design for a comprehensive token transfer monitoring system for AElfScan. The system monitors blockchain transfer events using time-based incremental scanning and sends metrics to Prometheus for alerting and analysis. + +## Architecture + +### Core Components +1. **TokenTransferMonitoringWorker** - Scheduled background worker with startup delay +2. **TokenTransferMonitoringService** - Business logic and time-based data processing +3. **OpenTelemetry Integration** - Metrics collection and transmission +4. **Prometheus** - Metrics storage and alerting + +### Data Flow +``` +Blockchain → AElfScan Indexer → TokenTransferMonitoringWorker → TokenTransferMonitoringService → OpenTelemetry → Prometheus → Alerting +``` + +### Key Features +- **Time-based incremental scanning** (not block height based) +- **System contract filtering** using existing GlobalOptions.ContractNames +- **Simplified address classification** (Normal, Blacklist only) +- **30-second startup delay** to avoid system startup overload +- **UTC time handling** with Redis-based scan time tracking +- **Single simplified metric** with essential dimensions + +## Prometheus Metrics Design + +### Single Unified Metric +We use one simplified histogram metric that captures essential transfer event dimensions: + +```prometheus +# HELP aelf_transfer_events Token transfer events with amount and metadata +# TYPE aelf_transfer_events histogram +aelf_transfer_events{ + chain_id="AELF", + symbol="ELF", + transfer_type="Transfer", + from_address="2N6dJpBcS5TLm2Pj4GkMdj4MnLhbKu8FGDX3Mz...", + to_address="2N6dJpBcS5TLm2Pj4GkMdj4MnLhbKu8FGDX3Mz...", + from_address_type="Normal", + to_address_type="Blacklist", + transaction_id="abc123..." +} +``` + +### Metric Dimensions + +| Label | Values | Description | +|-------|--------|-------------| +| `chain_id` | AELF, tDVV, tDVW | Blockchain identifier | +| `symbol` | ELF, USDT, BTC, ETH, etc. | Token symbol | +| `transfer_type` | Transfer, Burn, CrossChainTransfer, CrossChainReceive | Transfer operation type | +| `from_address` | Address string | Source address of the transfer | +| `to_address` | Address string | Destination address of the transfer | +| `from_address_type` | Normal, Blacklist | Source address classification | +| `to_address_type` | Normal, Blacklist | Destination address classification | +| `transaction_id` | Transaction hash | Unique transaction identifier for tracking | + +### Histogram Buckets +Amount distribution tracking with 4 buckets for clear categorization: +- **10**: Micro transfers (≤10) +- **1000**: Small transfers (10-1000) +- **100000**: Large transfers (1000-100000) +- **Infinity**: Massive transfers (>100000) + +## PromQL Query Examples + +### 1. Large Amount Transfers +```promql +# Transfers over 100,000 ELF in the last hour +increase(aelf_transfer_events_sum{symbol="ELF"}[1h]) +/ increase(aelf_transfer_events_count{symbol="ELF"}[1h]) > 100000 + +# Total large transfers by from address +sum by (from_address) ( + increase(aelf_transfer_events_sum{symbol="ELF"}[1h]) +) > 500000 +``` + +### 2. High-Frequency Trading +```promql +# Addresses with more than 100 transfers in the last hour +sum by (from_address) ( + increase(aelf_transfer_events_count[1h]) +) > 100 + +# High-frequency between specific addresses +sum by (from_address, to_address) ( + increase(aelf_transfer_events_count[1h]) +) > 50 +``` + +### 3. Blacklist Address Monitoring +```promql +# All transfers from blacklist addresses +increase(aelf_transfer_events_count{from_address_type="Blacklist"}[1h]) + +# All transfers to blacklist addresses +increase(aelf_transfer_events_count{to_address_type="Blacklist"}[1h]) + +# Large amounts involving blacklist addresses +increase(aelf_transfer_events_sum{ + from_address_type="Blacklist" OR to_address_type="Blacklist" +}[1h]) +``` + +### 4. Cross-Chain Activity +```promql +# Cross-chain transfer volume +sum by (chain_id) ( + increase(aelf_transfer_events_sum{transfer_type="CrossChainTransfer"}[1h]) +) + +# Cross-chain transfer frequency +sum by (chain_id) ( + increase(aelf_transfer_events_count{transfer_type="CrossChainTransfer"}[1h]) +) +``` + +### 5. Transaction Tracking +```promql +# Specific transaction monitoring +aelf_transfer_events_count{transaction_id="abc123..."} + +# Transactions involving specific addresses +aelf_transfer_events_count{ + from_address="2N6dJpBcS5TLm2Pj4GkMdj4MnLhbKu8FGDX3Mz..." OR + to_address="2N6dJpBcS5TLm2Pj4GkMdj4MnLhbKu8FGDX3Mz..." +} +``` + +## Alert Rules Configuration + +### 1. Large Amount Alerts +```yaml +groups: +- name: large_transfers + rules: + - alert: LargeELFTransfer + expr: | + increase(aelf_transfer_events_sum{symbol="ELF"}[5m]) + / increase(aelf_transfer_events_count{symbol="ELF"}[5m]) > 100000 + for: 0m + labels: + severity: warning + annotations: + summary: "Large ELF transfer detected" + description: "Transfer of {{ $value }} ELF detected from {{ $labels.from_address }}" + + - alert: MassiveTransferVolume + expr: | + sum by (from_address) ( + increase(aelf_transfer_events_sum[1h]) + ) > 1000000 + for: 5m + labels: + severity: critical + annotations: + summary: "Massive transfer volume from {{ $labels.from_address }}" +``` + +### 2. High Frequency Alerts +```yaml +- name: high_frequency + rules: + - alert: HighFrequencyTrading + expr: | + sum by (from_address) ( + increase(aelf_transfer_events_count[1h]) + ) > 100 + for: 10m + labels: + severity: warning + annotations: + summary: "High frequency trading detected from {{ $labels.from_address }}" + + - alert: TransferBurst + expr: | + sum by (from_address) ( + increase(aelf_transfer_events_count[5m]) + ) > 20 + for: 0m + labels: + severity: critical + annotations: + summary: "Transfer burst detected from {{ $labels.from_address }}" +``` + +### 3. Blacklist Alerts +```yaml +- name: blacklist_monitoring + rules: + - alert: BlacklistActivity + expr: | + increase(aelf_transfer_events_count{ + from_address_type="Blacklist" OR to_address_type="Blacklist" + }[1m]) > 0 + for: 0m + labels: + severity: critical + annotations: + summary: "Blacklist address activity detected" + description: "Transfer involving blacklist address: from={{ $labels.from_address }}, to={{ $labels.to_address }}" +``` + +## Configuration Management + +### Application Configuration +```json +{ + "TokenTransferMonitoring": { + "EnableMonitoring": true, + "EnableSystemContractFilter": true, + "BlacklistAddresses": [ + "2N6dJpBcS5TLm2Pj4GkMdj4MnLhbKu8FGDX3Mz1", + "2N6dJpBcS5TLm2Pj4GkMdj4MnLhbKu8FGDX3Mz2" + ], + "MonitoredTokens": ["ELF", "USDT", "BTC", "ETH"], + "ScanConfig": { + "ChainIds": ["AELF", "tDVV", "tDVW"], + "IntervalSeconds": 30, + "BatchSize": 1000, + "RedisKeyPrefix": "token_transfer_monitoring" + }, + "HistogramBuckets": [10, 1000, 100000, "Infinity"] + } +} +``` + +### System Contract Filtering +The system uses existing `GlobalOptions.ContractNames` configuration for system contract filtering: +- No additional configuration needed +- Leverages existing contract address mappings +- Can be disabled via `EnableSystemContractFilter: false` + +## Implementation Details + +### Time-Based Scanning +- **Incremental scanning** based on block time, not block height +- **Default scan window**: 60 minutes backward from current time +- **Redis state management**: Stores last scan time per chain +- **UTC time handling**: Ensures consistent time processing across systems + +### Worker Startup Strategy +- **30-second startup delay** to avoid system startup overload +- **No immediate execution** (RunOnStart = false) +- **Graceful startup** with other system Workers + +### Error Handling +- **Chain-level isolation**: Failure in one chain doesn't affect others +- **Comprehensive logging**: Detailed error tracking and performance metrics +- **Graceful degradation**: Continues operation even with partial failures + +### Performance Optimizations +- **Batch processing**: Configurable batch sizes for efficient data processing +- **Safety limits**: 10,000 record limit to prevent memory issues +- **Incremental updates**: Only processes new data since last scan +- **Efficient Redis operations**: Minimal Redis calls with optimized key management + +## Monitoring and Observability + +### Logs +- Worker startup and configuration +- Scan progress and timing +- Transfer processing statistics +- Error conditions and recovery + +### Metrics +- Transfer volume and frequency +- Processing performance +- System contract filtering effectiveness +- Blacklist address activity + +### Health Checks +- Redis connectivity +- Indexer API availability +- Metric transmission success +- Configuration validation \ No newline at end of file diff --git a/src/AElfScanServer.Worker.Core/AElfScanServerWorkerCoreModule.cs b/src/AElfScanServer.Worker.Core/AElfScanServerWorkerCoreModule.cs index 00e5f77f..39bf9d2b 100644 --- a/src/AElfScanServer.Worker.Core/AElfScanServerWorkerCoreModule.cs +++ b/src/AElfScanServer.Worker.Core/AElfScanServerWorkerCoreModule.cs @@ -3,6 +3,7 @@ using AElfScanServer.HttpApi.DataStrategy; using AElfScanServer.Common.Options; using AElfScanServer.HttpApi; +using AElfScanServer.Worker.Core.Options; using AElfScanServer.Worker.Core.Provider; using Microsoft.Extensions.DependencyInjection; using Volo.Abp.AutoMapper; @@ -24,5 +25,6 @@ public override void ConfigureServices(ServiceConfigurationContext context) context.Services.AddSingleton(); var configuration = context.Services.GetConfiguration(); Configure(configuration.GetSection("PullTransactionChainIds")); + Configure(configuration.GetSection("TokenTransferMonitoring")); } } \ No newline at end of file diff --git a/src/AElfScanServer.Worker.Core/Dtos/TransferEventDto.cs b/src/AElfScanServer.Worker.Core/Dtos/TransferEventDto.cs new file mode 100644 index 00000000..8ad12942 --- /dev/null +++ b/src/AElfScanServer.Worker.Core/Dtos/TransferEventDto.cs @@ -0,0 +1,57 @@ +using System; + +namespace AElfScanServer.Worker.Core.Dtos; + +public class TransferEventDto +{ + public string ChainId { get; set; } + public string TransactionId { get; set; } + public long BlockHeight { get; set; } + public DateTime Timestamp { get; set; } + public string Symbol { get; set; } + public string FromAddress { get; set; } + public string ToAddress { get; set; } + public decimal Amount { get; set; } + public decimal UsdValue { get; set; } + public TransferType Type { get; set; } + public AddressClassification FromAddressType { get; set; } + public AddressClassification ToAddressType { get; set; } +} + +public enum TransferType +{ + Transfer, + Burn, + CrossChainTransfer, + CrossChainReceive +} + +public enum AddressClassification +{ + Normal, + Blacklist, + ToOnlyMonitored, + LargeAmountOnly +} + +/// +/// Bidirectional transfer perspective data +/// +public class TransferDirectionDto +{ + public string ChainId { get; set; } + public string Symbol { get; set; } + public TransferType TransferType { get; set; } + public string Direction { get; set; } // "out" or "in" + public string Address { get; set; } + public string CounterpartAddress { get; set; } + public AddressClassification AddressType { get; set; } + public AddressClassification CounterpartAddressType { get; set; } + public string TransactionId { get; set; } + public string BlockHeight { get; set; } + public decimal Amount { get; set; } +} + +/// +/// Transfer event data transfer object for monitoring purposes +/// \ No newline at end of file diff --git a/src/AElfScanServer.Worker.Core/Options/TokenTransferMonitoringOptions.cs b/src/AElfScanServer.Worker.Core/Options/TokenTransferMonitoringOptions.cs new file mode 100644 index 00000000..e314d74e --- /dev/null +++ b/src/AElfScanServer.Worker.Core/Options/TokenTransferMonitoringOptions.cs @@ -0,0 +1,69 @@ +using System.Collections.Generic; + +namespace AElfScanServer.Worker.Core.Options; + +public class TokenTransferMonitoringOptions +{ + /// + /// Enable or disable token transfer monitoring, default is true + /// + public bool EnableMonitoring { get; set; } = true; + + /// + /// Enable or disable system contract transfer filtering, default is true + /// + public bool EnableSystemContractFilter { get; set; } = true; + + /// + /// Blacklist addresses for monitoring + /// + public List BlacklistAddresses { get; set; } = new(); + + /// + /// Addresses that are only monitored when they are recipients (to addresses) + /// + public List ToOnlyMonitoredAddresses { get; set; } = new(); + + /// + /// Addresses that are only monitored for large amount transfers + /// + public List LargeAmountOnlyAddresses { get; set; } = new(); + + /// + /// Minimum USD value threshold for histogram recording, default is 0 + /// + public decimal MinUsdValueThreshold { get; set; } = 0m; + + /// + /// List of tokens to monitor + /// + public List MonitoredTokens { get; set; } = new() { "ELF", "USDT", "BTC", "ETH" }; + + /// + /// Scan configuration + /// + public ScanConfig ScanConfig { get; set; } = new(); +} + +public class ScanConfig +{ + /// + /// Chain IDs to monitor + /// + public List ChainIds { get; set; } = new() { "AELF", "tDVV" }; + + /// + /// Scan interval in seconds + /// + public int IntervalSeconds { get; set; } = 30; + + /// + /// Batch size for each scan + /// + public int BatchSize { get; set; } = 1000; + + /// + /// Redis key prefix for storing scan progress + /// + public string RedisKeyPrefix { get; set; } = "token_transfer_monitoring"; +} \ No newline at end of file diff --git a/src/AElfScanServer.Worker.Core/Service/ITokenTransferMonitoringService.cs b/src/AElfScanServer.Worker.Core/Service/ITokenTransferMonitoringService.cs new file mode 100644 index 00000000..7cae0d99 --- /dev/null +++ b/src/AElfScanServer.Worker.Core/Service/ITokenTransferMonitoringService.cs @@ -0,0 +1,28 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using AElfScanServer.Worker.Core.Dtos; + +namespace AElfScanServer.Worker.Core.Service; + +public interface ITokenTransferMonitoringService +{ + /// + /// Incrementally get transfer events based on time scanning + /// + Task> GetTransfersAsync(string chainId); + + /// + /// Process single transfer event and send metrics + /// + void ProcessTransfer(TransferEventDto transfer); + + /// + /// Process multiple transfer events and send metrics + /// + void ProcessTransfers(List transfers); + + /// + /// Send transfer metrics to monitoring system + /// + void SendTransferMetrics(TransferEventDto transfer); +} \ No newline at end of file diff --git a/src/AElfScanServer.Worker.Core/Service/TokenTransferMonitoringService.cs b/src/AElfScanServer.Worker.Core/Service/TokenTransferMonitoringService.cs new file mode 100644 index 00000000..ea86e49b --- /dev/null +++ b/src/AElfScanServer.Worker.Core/Service/TokenTransferMonitoringService.cs @@ -0,0 +1,428 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Diagnostics.Metrics; +using System.Globalization; +using AElf.OpenTelemetry; +using AElfScanServer.Common.Constant; +using AElfScanServer.Common.Dtos; +using AElfScanServer.Common.Dtos.Input; +using AElfScanServer.Common.IndexerPluginProvider; +using AElfScanServer.Common.Options; +using AElfScanServer.Common.Token; +using AElfScanServer.Worker.Core.Dtos; +using AElfScanServer.Worker.Core.Options; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Volo.Abp.Caching; +using Volo.Abp.DependencyInjection; + +namespace AElfScanServer.Worker.Core.Service; + +public class TokenTransferMonitoringService : ITokenTransferMonitoringService, ISingletonDependency +{ + private const int DefaultMaxResultCount = 1000; + private const int SafetyRecordLimit = 10000; + private const int DefaultScanTimeMinutes = -60; + private const string LastScanTimeKey = "last_scan_time"; + + private readonly ITokenIndexerProvider _tokenIndexerProvider; + private readonly IDistributedCache _distributedCache; + private readonly ILogger _logger; + private readonly TokenTransferMonitoringOptions _options; + private readonly IOptionsMonitor _globalOptions; + private readonly ITokenPriceService _tokenPriceService; + private readonly Histogram _transferUSDEventsHistogram; + private readonly Counter _transferCountsCounter; + private readonly HashSet _blacklistAddresses; + private readonly HashSet _toOnlyMonitoredAddresses; + private readonly HashSet _largeAmountOnlyAddresses; + private readonly IOptionsMonitor _optionsMonitor; + + public TokenTransferMonitoringService( + ITokenIndexerProvider tokenIndexerProvider, + IDistributedCache distributedCache, + ILogger logger, + IOptions options, + IOptionsMonitor globalOptions, + IOptionsMonitor optionsMonitor, + IInstrumentationProvider instrumentationProvider, + ITokenPriceService tokenPriceService) + { + _tokenIndexerProvider = tokenIndexerProvider; + _distributedCache = distributedCache; + _logger = logger; + _options = options.Value; + _globalOptions = globalOptions; + _optionsMonitor = optionsMonitor; + _tokenPriceService = tokenPriceService; + + // Initialize address sets for fast lookup + _blacklistAddresses = new HashSet(_options.BlacklistAddresses, StringComparer.OrdinalIgnoreCase); + _toOnlyMonitoredAddresses = new HashSet(_options.ToOnlyMonitoredAddresses, StringComparer.OrdinalIgnoreCase); + _largeAmountOnlyAddresses = new HashSet(_options.LargeAmountOnlyAddresses, StringComparer.OrdinalIgnoreCase); + // Initialize histogram with configured buckets + _transferUSDEventsHistogram = instrumentationProvider.Meter.CreateHistogram( + "aelf_transfer_usd_value", + "ms", + "Token transfer events with amount distribution"); + + // Initialize counter for transfer counts + _transferCountsCounter = instrumentationProvider.Meter.CreateCounter( + "aelf_transfer_counts", + "counts", + "Token transfer counts by various dimensions"); + } + + public async Task> GetTransfersAsync(string chainId) + { + var transfers = new List(); + DateTime? latestBlockTime = null; + + try + { + var beginBlockTime = await GetLastScanTimeAsync(chainId) ?? DateTime.UtcNow.AddMinutes(DefaultScanTimeMinutes); + var skip = 0; + + while (skip < SafetyRecordLimit) + { + var batchResult = await GetTransferBatchAsync(chainId, beginBlockTime, skip); + if (!batchResult.hasData) + break; + + transfers.AddRange(batchResult.transfers); + + // Track the latest block time from the data + if (batchResult.latestBlockTime.HasValue) + { + latestBlockTime = latestBlockTime.HasValue + ? (batchResult.latestBlockTime > latestBlockTime ? batchResult.latestBlockTime : latestBlockTime) + : batchResult.latestBlockTime; + } + + if (batchResult.transfers.Count < DefaultMaxResultCount) + break; + + skip += DefaultMaxResultCount; + } + + if (skip >= SafetyRecordLimit) + { + _logger.LogWarning("Reached safety limit of {Limit} records for chain {ChainId}", SafetyRecordLimit, chainId); + } + + // Only update last scan time when we actually processed data + if (latestBlockTime.HasValue) + { + await UpdateLastScanTimeAsync(chainId, latestBlockTime.Value); + _logger.LogInformation("Retrieved {Count} transfers for chain {ChainId} from time {BeginTime}, updated to latest block time: {LatestBlockTime}", + transfers.Count, chainId, beginBlockTime, latestBlockTime.Value); + } + else + { + _logger.LogInformation("No transfers found for chain {ChainId} from time {BeginTime}, scan time not updated", + chainId, beginBlockTime); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error retrieving transfers for chain {ChainId}", chainId); + } + + return transfers; + } + + private async Task<(List transfers, long maxHeight, bool hasData, DateTime? latestBlockTime)> GetTransferBatchAsync( + string chainId, DateTime beginBlockTime, int skip) + { + var input = new TokenTransferInput + { + ChainId = chainId, + BeginBlockTime = beginBlockTime, + SkipCount = skip, + MaxResultCount = DefaultMaxResultCount, + Types = new List { SymbolType.Token } + }; + + var result = await _tokenIndexerProvider.GetTokenTransfersAsync(input); + + if (result?.List == null || !result.List.Any()) + { + return (new List(), 0, false, null); + } + + var filteredList = result.List + .Where(item => _options.MonitoredTokens.Contains(item.Symbol)) + .ToList(); + + if (!filteredList.Any()) + { + return (new List(), 0, false, null); + } + + // Get unique symbols for price lookup + var uniqueSymbols = filteredList.Select(x => x.Symbol).Distinct().ToList(); + var priceDict = new Dictionary(); + + // Fetch prices for all unique symbols + foreach (var symbol in uniqueSymbols) + { + try + { + var priceDto = await _tokenPriceService.GetTokenPriceAsync(symbol, CurrencyConstant.UsdCurrency); + priceDict[symbol] = priceDto.Price; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to get price for symbol {Symbol}, using 0", symbol); + priceDict[symbol] = 0m; + } + } + + // Convert all transfers and calculate USD value + var transfers = new List(); + + foreach (var item in filteredList) + { + var transfer = ConvertToTransferEventDto(item); + + // Calculate USD value + if (priceDict.TryGetValue(item.Symbol, out var price)) + { + transfer.UsdValue = Math.Round(transfer.Amount * price, CommonConstant.UsdValueDecimals); + } + + // Reclassify addresses with USD value context + transfer.FromAddressType = ClassifyAddress(transfer.FromAddress); + transfer.ToAddressType = ClassifyAddress(transfer.ToAddress); + + // Add all transfers (filtering will be done in SendTransferMetrics) + transfers.Add(transfer); + } + + var maxHeight = result.List.Max(x => x.BlockHeight); + + // Track the latest block time from the data + var latestBlockTime = result.List.Max(x => x.DateTime); + + // Return all transfers for processing + return (transfers, maxHeight, true, latestBlockTime); + } + + private async Task GetLastScanTimeAsync(string chainId) + { + var timeString = await GetRedisValueAsync(LastScanTimeKey, chainId); + if (!string.IsNullOrEmpty(timeString) && DateTime.TryParse(timeString, null, DateTimeStyles.RoundtripKind, out var lastTime)) + { + // Ensure the parsed time is treated as UTC + return lastTime.Kind == DateTimeKind.Utc ? lastTime : DateTime.SpecifyKind(lastTime, DateTimeKind.Utc); + } + return null; + } + + private async Task UpdateLastScanTimeAsync(string chainId, DateTime scanTime) + { + // Ensure the time is UTC before saving + var utcTime = scanTime.Kind == DateTimeKind.Utc ? scanTime : scanTime.ToUniversalTime(); + await SetRedisValueAsync(LastScanTimeKey, chainId, utcTime.ToString("O")); + } + + private async Task GetRedisValueAsync(string keyType, string chainId) + { + try + { + var key = BuildRedisKey(keyType, chainId); + return await _distributedCache.GetAsync(key) ?? ""; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error getting Redis value for key type {KeyType}, chain {ChainId}", keyType, chainId); + return ""; + } + } + + private async Task SetRedisValueAsync(string keyType, string chainId, string value) + { + try + { + var key = BuildRedisKey(keyType, chainId); + await _distributedCache.SetAsync(key, value); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error setting Redis value for key type {KeyType}, chain {ChainId}", keyType, chainId); + } + } + + public void ProcessTransfer(TransferEventDto transfer) + { + try + { + // Filter system contract transfers if enabled + var options = _optionsMonitor.CurrentValue; + if (options.EnableSystemContractFilter && IsSystemContractTransfer(transfer.FromAddress)) + { + _logger.LogInformation("Skipping system contract transfer from {FromAddress}", transfer.FromAddress); + return; + } + + SendTransferMetrics(transfer); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing transfer: {TransferId}", transfer.TransactionId); + } + } + + /// + /// Check if the from address is a system contract address + /// + private bool IsSystemContractTransfer(string fromAddress) + { + if (string.IsNullOrEmpty(fromAddress)) + return false; + + var contractNames = _globalOptions.CurrentValue.ContractNames; + if (contractNames == null) + return false; + + // Check if the address exists in any chain's contract names + return contractNames.Values.Any(chainContracts => + chainContracts != null && chainContracts.ContainsKey(fromAddress)); + } + + public void ProcessTransfers(List transfers) + { + foreach (var transfer in transfers) + { + ProcessTransfer(transfer); + } + } + + public void SendTransferMetrics(TransferEventDto transfer) + { + try + { + // Get current options for dynamic configuration + var currentOptions = _optionsMonitor.CurrentValue; + + // Determine if this is a high-value transfer once + var isHighValue = transfer.UsdValue >= currentOptions.MinUsdValueThreshold; + + // Record outbound transaction (from perspective) + var outboundTags = new KeyValuePair[] + { + new("chain_id", transfer.ChainId), + new("symbol", transfer.Symbol), + new("transfer_type", transfer.Type.ToString()), + new("address", transfer.FromAddress), + new("direction", "outbound"), + new("address_type", transfer.FromAddressType.ToString()), + }; + + // Record inbound transaction (to perspective) + var inboundTags = new KeyValuePair[] + { + new("chain_id", transfer.ChainId), + new("symbol", transfer.Symbol), + new("transfer_type", transfer.Type.ToString()), + new("address", transfer.ToAddress), + new("direction", "inbound"), + new("address_type", transfer.ToAddressType.ToString()), + }; + + // Always record counter (for all transfers) + + if (!transfer.FromAddress.IsNullOrEmpty()) + { + _transferCountsCounter.Add(1, outboundTags); + } + + if (!transfer.ToAddress.IsNullOrEmpty() || !IsSystemContractTransfer(transfer.ToAddress)) + { + _transferCountsCounter.Add(1, inboundTags); + } + // Only record histogram for high-value transfers + if (isHighValue) + { + if (!transfer.FromAddress.IsNullOrEmpty()) + { + _transferUSDEventsHistogram.Record((double)transfer.UsdValue, outboundTags); + } + + if (!transfer.ToAddress.IsNullOrEmpty() || !IsSystemContractTransfer(transfer.ToAddress)) + { + _transferUSDEventsHistogram.Record((double)transfer.UsdValue, inboundTags); + } + _logger.LogInformation("Sent transfer metrics for transaction {TransactionId}, amount {Amount} {Symbol}, USD value {UsdValue}", + transfer.TransactionId, transfer.Amount, transfer.Symbol, transfer.UsdValue); + } + else + { + _logger.LogInformation("Sent counter metrics only for transaction {TransactionId}, amount {Amount} {Symbol}, USD value {UsdValue} below histogram threshold", + transfer.TransactionId, transfer.Amount, transfer.Symbol, transfer.UsdValue); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error sending transfer metrics for transaction {TransactionId}", + transfer.TransactionId); + } + } + + private TransferEventDto ConvertToTransferEventDto(TokenTransferInfoDto dto) + { + return new TransferEventDto + { + ChainId = dto.ChainId, + TransactionId = dto.TransactionId, + BlockHeight = dto.BlockHeight, + Timestamp = dto.DateTime, + Symbol = dto.Symbol, + FromAddress = dto.From?.Address ?? "", + ToAddress = dto.To?.Address ?? "", + Amount = dto.Quantity, + Type = ParseTransferType(dto.Method), + FromAddressType = ClassifyAddress(dto.From?.Address ?? ""), + ToAddressType = ClassifyAddress(dto.To?.Address ?? "") + }; + } + + private AddressClassification ClassifyAddress(string address) + { + if (string.IsNullOrEmpty(address)) + return AddressClassification.Normal; + + // Check blacklist first (highest priority) + if (_blacklistAddresses.Contains(address)) + return AddressClassification.Blacklist; + + // Check ToOnlyMonitored addresses (only when it's a recipient address) + if (_toOnlyMonitoredAddresses.Contains(address)) + return AddressClassification.ToOnlyMonitored; + + // Check LargeAmountOnly addresses (only for large transfers) + if (_largeAmountOnlyAddresses.Contains(address) ) + return AddressClassification.LargeAmountOnly; + + return AddressClassification.Normal; + } + + private static TransferType ParseTransferType(string method) + { + return method?.ToLower() switch + { + "transfer" => TransferType.Transfer, + "burn" => TransferType.Burn, + "crosschaintransfer" => TransferType.CrossChainTransfer, + "crosschainreceive" => TransferType.CrossChainReceive, + _ => TransferType.Transfer + }; + } + + private string BuildRedisKey(string keyType, string chainId) + { + return $"{_options.ScanConfig.RedisKeyPrefix}:{keyType}:{chainId}"; + } +} \ No newline at end of file diff --git a/src/AElfScanServer.Worker.Core/Worker/BlockSizeWorker.cs b/src/AElfScanServer.Worker.Core/Worker/BlockSizeWorker.cs index d8aa5855..ff39bdf6 100644 --- a/src/AElfScanServer.Worker.Core/Worker/BlockSizeWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/BlockSizeWorker.cs @@ -19,7 +19,6 @@ public BlockSizeWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFac serviceScopeFactory) { timer.Period = 1000 * 60 * 60; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/ContractFileWorker.cs b/src/AElfScanServer.Worker.Core/Worker/ContractFileWorker.cs index 4d5bfedb..46bbb73f 100644 --- a/src/AElfScanServer.Worker.Core/Worker/ContractFileWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/ContractFileWorker.cs @@ -26,7 +26,6 @@ public ContractFileWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScope serviceScopeFactory) { timer.Period = 1000 * 60 * 5; - timer.RunOnStart = true; _logger = logger; _workerOptions = workerOptions; _contractAppService = contractAppService; diff --git a/src/AElfScanServer.Worker.Core/Worker/DailyNetworkStatisticWorker.cs b/src/AElfScanServer.Worker.Core/Worker/DailyNetworkStatisticWorker.cs index 0c2dc3c6..6b5881c2 100644 --- a/src/AElfScanServer.Worker.Core/Worker/DailyNetworkStatisticWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/DailyNetworkStatisticWorker.cs @@ -19,7 +19,6 @@ public DailyNetworkStatisticWorker(AbpAsyncTimer timer, IServiceScopeFactory ser serviceScopeFactory) { timer.Period = 1000 * 60 * 60; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/FixDailyTransactionWorker.cs b/src/AElfScanServer.Worker.Core/Worker/FixDailyTransactionWorker.cs index db62b85a..d5caf5a7 100644 --- a/src/AElfScanServer.Worker.Core/Worker/FixDailyTransactionWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/FixDailyTransactionWorker.cs @@ -19,7 +19,6 @@ public FixDailyTransactionWorker(AbpAsyncTimer timer, IServiceScopeFactory servi serviceScopeFactory) { timer.Period = 1000 * 30; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/LogEventDelWorker.cs b/src/AElfScanServer.Worker.Core/Worker/LogEventDelWorker.cs index 77602533..2c018502 100644 --- a/src/AElfScanServer.Worker.Core/Worker/LogEventDelWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/LogEventDelWorker.cs @@ -19,7 +19,6 @@ public LogEventDelWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeF serviceScopeFactory) { timer.Period = 1000 * 60 * 15; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/LogEventWorker.cs b/src/AElfScanServer.Worker.Core/Worker/LogEventWorker.cs index e36aa768..00699d76 100644 --- a/src/AElfScanServer.Worker.Core/Worker/LogEventWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/LogEventWorker.cs @@ -19,7 +19,6 @@ public LogEventWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFact serviceScopeFactory) { timer.Period = 1000 * 2; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/MergeAddressWorker.cs b/src/AElfScanServer.Worker.Core/Worker/MergeAddressWorker.cs index 094eb935..1f9d7c73 100644 --- a/src/AElfScanServer.Worker.Core/Worker/MergeAddressWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/MergeAddressWorker.cs @@ -18,7 +18,6 @@ public MergeAddressWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScope serviceScopeFactory) { timer.Period = 1000 * 60 * 60 * 2; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/DeleteMergeBlocksWorker.cs b/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/DeleteMergeBlocksWorker.cs index 3ad9cebd..9fe05fe3 100644 --- a/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/DeleteMergeBlocksWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/DeleteMergeBlocksWorker.cs @@ -19,7 +19,6 @@ public DeleteMergeBlocksWorker(AbpAsyncTimer timer, IServiceScopeFactory service serviceScopeFactory) { timer.Period = 1000 * 60 * 60; - timer.RunOnStart = true; _logger = logger; _addressService = addressService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/FixTokenHolderInfoWorker.cs b/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/FixTokenHolderInfoWorker.cs index 0379bba0..ab563214 100644 --- a/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/FixTokenHolderInfoWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/FixTokenHolderInfoWorker.cs @@ -20,7 +20,6 @@ public FixTokenHolderInfoWorker(AbpAsyncTimer timer, IServiceScopeFactory servic serviceScopeFactory) { timer.Period = 1000 * 60 * 5; - timer.RunOnStart = true; _logger = logger; _addressService = addressService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/TokenInfoWorker.cs b/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/TokenInfoWorker.cs index 5666fa95..0d436aba 100644 --- a/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/TokenInfoWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/MergeDataWorker/TokenInfoWorker.cs @@ -20,7 +20,6 @@ public TokenInfoWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFac serviceScopeFactory) { timer.Period = 1000 * 60 * 5; - timer.RunOnStart = true; _logger = logger; _addressService = addressService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/MonthlyActiveAddressWorker.cs b/src/AElfScanServer.Worker.Core/Worker/MonthlyActiveAddressWorker.cs index 6aa0dc68..b6b3975b 100644 --- a/src/AElfScanServer.Worker.Core/Worker/MonthlyActiveAddressWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/MonthlyActiveAddressWorker.cs @@ -19,7 +19,6 @@ public MonthlyActiveAddressWorker(AbpAsyncTimer timer, IServiceScopeFactory serv serviceScopeFactory) { timer.Period = 1000 * 60; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/RoundWorker.cs b/src/AElfScanServer.Worker.Core/Worker/RoundWorker.cs index 959b7ab3..cd39271e 100644 --- a/src/AElfScanServer.Worker.Core/Worker/RoundWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/RoundWorker.cs @@ -19,7 +19,6 @@ public RoundWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory serviceScopeFactory) { timer.Period = 1000 * 60; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/TokenHolderPercentWorker.cs b/src/AElfScanServer.Worker.Core/Worker/TokenHolderPercentWorker.cs index 8097bc7e..47d7a479 100644 --- a/src/AElfScanServer.Worker.Core/Worker/TokenHolderPercentWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/TokenHolderPercentWorker.cs @@ -38,7 +38,6 @@ public TokenHolderPercentWorker(AbpAsyncTimer timer, IServiceScopeFactory servic _tokenHolderPercentProvider = tokenHolderPercentProvider; _tokenIndexerProvider = tokenIndexerProvider; _workerOptions = workerOptionsMonitor; - Timer.RunOnStart = true; timer.Period = _workerOptions.CurrentValue.GetWorkerPeriodMinutes(WorkerKey) * 60 * 1000; var uris = options.CurrentValue.Url.ConvertAll(x => new Uri(x)); var connectionPool = new StaticConnectionPool(uris); diff --git a/src/AElfScanServer.Worker.Core/Worker/TokenTransferMonitoringWorker.cs b/src/AElfScanServer.Worker.Core/Worker/TokenTransferMonitoringWorker.cs new file mode 100644 index 00000000..51c7cfed --- /dev/null +++ b/src/AElfScanServer.Worker.Core/Worker/TokenTransferMonitoringWorker.cs @@ -0,0 +1,121 @@ +using System; +using System.Threading.Tasks; +using AElfScanServer.Worker.Core.Options; +using AElfScanServer.Worker.Core.Service; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Volo.Abp.BackgroundWorkers; +using Volo.Abp.Threading; + +namespace AElfScanServer.Worker.Core.Worker; + +public class TokenTransferMonitoringWorker : AsyncPeriodicBackgroundWorkerBase +{ + private const string WorkerName = "TokenTransferMonitoringWorker"; + private readonly ILogger _logger; + private readonly IOptionsMonitor _optionsMonitor; + + public TokenTransferMonitoringWorker( + AbpAsyncTimer timer, + IServiceScopeFactory serviceScopeFactory, + ILogger logger, + IOptionsMonitor optionsMonitor, + IOptionsMonitor workerOptions) : base(timer, serviceScopeFactory) + { + _logger = logger; + _optionsMonitor = optionsMonitor; + + // Use WorkerOptions for timer period configuration, fallback to TokenTransferMonitoringOptions + var workerPeriodMinutes = workerOptions.CurrentValue.GetWorkerPeriodMinutes(WorkerName); + if (workerPeriodMinutes == Options.Worker.DefaultMinutes) // If not configured in WorkerOptions, use TokenTransferMonitoringOptions + { + var intervalSeconds = _optionsMonitor.CurrentValue.ScanConfig.IntervalSeconds; + timer.Period = intervalSeconds * 1000; + _logger.LogInformation("TokenTransferMonitoringWorker initialized with TokenTransferMonitoringOptions interval: {Interval}s", intervalSeconds); + } + else + { + timer.Period = workerPeriodMinutes * 60 * 1000; + _logger.LogInformation("TokenTransferMonitoringWorker initialized with WorkerOptions interval: {Interval} minutes", workerPeriodMinutes); + } + _logger.LogInformation("TokenTransferMonitoringWorker configured successfully"); + } + + protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext) + { + try + { + var options = _optionsMonitor.CurrentValue; + + if (!options.EnableMonitoring) + { + _logger.LogDebug("Token transfer monitoring is disabled"); + return; + } + + _logger.LogInformation("Starting Token transfer monitoring scan..."); + + using var scope = ServiceScopeFactory.CreateScope(); + var monitoringService = scope.ServiceProvider.GetRequiredService(); + + var chainIds = options.ScanConfig.ChainIds; + var batchSize = options.ScanConfig.BatchSize; + + foreach (var chainId in chainIds) + { + try + { + await ProcessChainTransfers(monitoringService, chainId, batchSize); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to process transfers for chain {ChainId}", chainId); + // Continue processing other chains even if one fails + } + } + + _logger.LogInformation("Token transfer monitoring scan completed successfully"); + } + catch (Exception ex) + { + _logger.LogError(ex, "Critical error in TokenTransferMonitoringWorker"); + } + } + + private async Task ProcessChainTransfers(ITokenTransferMonitoringService monitoringService, + string chainId, int batchSize) + { + _logger.LogInformation("Processing transfers for chain {ChainId}", chainId); + + try + { + var startTime = DateTime.UtcNow; + + // Get transfer events based on time scanning + var transfers = await monitoringService.GetTransfersAsync(chainId); + + // Process transfers and send metrics + if (transfers.Count > 0) + { + monitoringService.ProcessTransfers(transfers); + _logger.LogInformation("Processed {Count} transfers for chain {ChainId}", + transfers.Count, chainId); + } + else + { + _logger.LogInformation("No new transfers found for chain {ChainId}", chainId); + } + + var duration = DateTime.UtcNow - startTime; + + _logger.LogInformation("Completed processing chain {ChainId}: {Count} transfers in {Duration}ms", + chainId, transfers.Count, duration.TotalMilliseconds); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to process transfers for chain {ChainId}", chainId); + throw; // Re-throw to be caught by the caller + } + } +} \ No newline at end of file diff --git a/src/AElfScanServer.Worker.Core/Worker/TransactionIndexWorker.cs b/src/AElfScanServer.Worker.Core/Worker/TransactionIndexWorker.cs index dabd32b5..226c066c 100644 --- a/src/AElfScanServer.Worker.Core/Worker/TransactionIndexWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/TransactionIndexWorker.cs @@ -19,7 +19,6 @@ public TransactionIndexWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceS serviceScopeFactory) { timer.Period = 1000 * 60 * 60; - timer.RunOnStart = true; _logger = logger; _transactionService = transactionService; } diff --git a/src/AElfScanServer.Worker.Core/Worker/TwitterSyncWorker.cs b/src/AElfScanServer.Worker.Core/Worker/TwitterSyncWorker.cs index 4e9f6a41..73808e38 100644 --- a/src/AElfScanServer.Worker.Core/Worker/TwitterSyncWorker.cs +++ b/src/AElfScanServer.Worker.Core/Worker/TwitterSyncWorker.cs @@ -24,7 +24,6 @@ public TwitterSyncWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeF serviceScopeFactory) { timer.Period = workerOptions.CurrentValue.GetWorkerPeriodMinutes(WorkerName) * 60 * 1000; - timer.RunOnStart = true; _logger = logger; _adsService = adsService; } diff --git a/src/AElfScanServer.Worker/AElfScanServerWorkerModule.cs b/src/AElfScanServer.Worker/AElfScanServerWorkerModule.cs index 63a336eb..a900e582 100644 --- a/src/AElfScanServer.Worker/AElfScanServerWorkerModule.cs +++ b/src/AElfScanServer.Worker/AElfScanServerWorkerModule.cs @@ -208,6 +208,7 @@ public void ConfigureEsIndex(IConfiguration configuration) public override void OnApplicationInitialization(ApplicationInitializationContext context) { + context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); @@ -223,13 +224,14 @@ public override void OnApplicationInitialization(ApplicationInitializationContex context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); - context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); context.AddBackgroundWorkerAsync(); + + context.AddBackgroundWorkerAsync(); } } \ No newline at end of file diff --git a/src/AElfScanServer.Worker/appsettings.json b/src/AElfScanServer.Worker/appsettings.json index ec29e970..fcf9f34d 100644 --- a/src/AElfScanServer.Worker/appsettings.json +++ b/src/AElfScanServer.Worker/appsettings.json @@ -203,7 +203,7 @@ "Indexers": { "IndexerInfos": { "TokenIndexer": { - "BaseUrl": "https://gcptest-indexer-api.aefinder.io/api/app/graphql/dailyholderapp" + "BaseUrl": "https://test-indexer-api.aefinder.io/api/app/graphql/dailyholderapp" }, "GenesisIndexer": { "BaseUrl": "https://indexer-api.aefinder.io/api/app/graphql/genesisapp" @@ -261,5 +261,17 @@ "Decompiler": { "Url": "http://127.0.0.1:5566/getfiles" + }, + "TokenTransferMonitoring": { + "EnableMonitoring": true, + "EnableSystemContractFilter":false, + "BlacklistAddresses": [ + "2Ue8S2qAb8dGMrRgcNtjWgBx48Bx3XmNT6UBBKwNLYm5ZpbA"], + "MonitoredTokens": ["ELF", "USDT", "BTC", "ETH"], + "ScanConfig": { + "ChainIds": ["AELF", "tDVW"], + "IntervalSeconds": 30, + "BatchSize": 1000 + } } } \ No newline at end of file