From 64a5b881ea585ff0f24003bd05e5bfeed079f338 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 13 Nov 2025 15:52:23 -0800 Subject: [PATCH 1/9] enable reading events from system collection --- state/process.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/state/process.go b/state/process.go index 90c799a..ca9289e 100644 --- a/state/process.go +++ b/state/process.go @@ -378,11 +378,6 @@ outer: newCounter := 0 transfers := 0 for _, col := range cols { - // TODO(tav): We may want to index events from the system collection - // at some point in the future. - if col.system { - continue - } for idx, txnResult := range col.txnResults { select { case <-ctx.Done(): From 4218c2c7413e924e2c031080f9da3f44e7f219bc Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 14 Nov 2025 09:46:39 -0800 Subject: [PATCH 2/9] add to slow path (when GetTransactionResultsByBlockID has failed) --- state/process.go | 38 ++++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/state/process.go b/state/process.go index ca9289e..21b04b5 100644 --- a/state/process.go +++ b/state/process.go @@ -201,10 +201,7 @@ outer: // the self-sealed root block of a spork (where a ZeroID is used for // the event collection hash). if !(spork.Prev != nil && height == spork.RootBlock) { - // TODO(tav): We check for just one transaction in the system - // collection. If this changes in the future, we will need to - // update the logic here to speculatively fetch more transaction - // results. + // We always retrieve the first transaction of the system collection. txnIndex++ for { select { @@ -225,6 +222,39 @@ outer: col.txnResults = append(col.txnResults, txnResult) break } + if spork.Version >= 8 { + // check if the first tx in the system collection contains events indicating scheduled transactions were run + // We are already on the slow path where GetTransactionsByBlockID/GetTransactionResultsByBlockID has failed + systemTxEvents := col.txnResults[len(col.txnResults)-1].Events + scheduledTxs := 0 + for _, event := range systemTxEvents { + if strings.HasSuffix(event.Type, ".FlowTransactionScheduler.PendingExecution") { + scheduledTxs++ + } + } + for range scheduledTxs { + txnIndex++ + for { + select { + case <-ctx.Done(): + return + default: + } + client := spork.AccessNodes.Client() + txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex)) + if err != nil { + log.Errorf( + "Failed to fetch transaction result at index %d in block %x at height %d: %s", + txnIndex, hash, height, err, + ) + time.Sleep(time.Second) + continue + } + col.txnResults = append(col.txnResults, txnResult) + break + } + } + } } } else { col := &collectionData{} From 24b67c47cb47ddce3a4d976e055e29761521c36d Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 14 Nov 2025 11:04:59 -0800 Subject: [PATCH 3/9] fix for final system transaction --- state/process.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/state/process.go b/state/process.go index 21b04b5..ecac6eb 100644 --- a/state/process.go +++ b/state/process.go @@ -232,7 +232,8 @@ outer: scheduledTxs++ } } - for range scheduledTxs { + // Request scheduled transactions and the final system transaction + for range scheduledTxs + 1 { txnIndex++ for { select { @@ -247,6 +248,10 @@ outer: "Failed to fetch transaction result at index %d in block %x at height %d: %s", txnIndex, hash, height, err, ) + if status.Code(err) == codes.NotFound { + // ensure we don't continuously request a nonexistent transaction + break + } time.Sleep(time.Second) continue } From 4e492f065dd7be356c3a2030049b28b89076ef7f Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:20:54 -0800 Subject: [PATCH 4/9] Update state/process.go Co-authored-by: Jordan Schalm --- state/process.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state/process.go b/state/process.go index ecac6eb..6c30193 100644 --- a/state/process.go +++ b/state/process.go @@ -225,7 +225,7 @@ outer: if spork.Version >= 8 { // check if the first tx in the system collection contains events indicating scheduled transactions were run // We are already on the slow path where GetTransactionsByBlockID/GetTransactionResultsByBlockID has failed - systemTxEvents := col.txnResults[len(col.txnResults)-1].Events + systemTxEvents := col.txnResults[0].Events scheduledTxs := 0 for _, event := range systemTxEvents { if strings.HasSuffix(event.Type, ".FlowTransactionScheduler.PendingExecution") { From 3e83242d04a819baf34697ff2f8f07448be0c3e3 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 14 Nov 2025 11:56:58 -0800 Subject: [PATCH 5/9] update context cancellation handling on slow path --- state/process.go | 58 ++++++++++++++++++++---------------------------- 1 file changed, 24 insertions(+), 34 deletions(-) diff --git a/state/process.go b/state/process.go index 6c30193..e8b1b21 100644 --- a/state/process.go +++ b/state/process.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "errors" "strings" "time" @@ -127,19 +128,7 @@ outer: for _, col := range block.CollectionGuarantees { colData := &collectionData{} cols = append(cols, colData) - initialCol := true for { - select { - case <-ctx.Done(): - span.End() - return - default: - } - if initialCol { - initialCol = false - } else { - time.Sleep(time.Second) - } client := spork.AccessNodes.Client() info, err := client.CollectionByID(ctx, col.CollectionId) if err != nil { @@ -147,25 +136,18 @@ outer: "Failed to fetch collection %x in block %x at height %d: %s", col.CollectionId, hash, height, err, ) + if errors.Is(err, context.Canceled) { + span.End() + return + } + time.Sleep(time.Second) continue } tctx := ctx for _, txnHash := range info.TransactionIds { ctx = tctx - initialTxn := true txnIndex++ for { - select { - case <-ctx.Done(): - span.End() - return - default: - } - if initialTxn { - initialTxn = false - } else { - time.Sleep(time.Second) - } client := spork.AccessNodes.Client() info, err := client.Transaction(ctx, txnHash) if err != nil { @@ -173,6 +155,11 @@ outer: "Failed to fetch transaction %x in block %x at height %d: %s", txnHash, hash, height, err, ) + if errors.Is(err, context.Canceled) { + span.End() + return + } + time.Sleep(time.Second) continue } client = spork.AccessNodes.Client() @@ -182,6 +169,11 @@ outer: "Failed to fetch transaction result for %x in block %x at height %d: %s", txnHash, hash, height, err, ) + if errors.Is(err, context.Canceled) { + span.End() + return + } + time.Sleep(time.Second) continue } colData.txns = append(colData.txns, info) @@ -204,11 +196,6 @@ outer: // We always retrieve the first transaction of the system collection. txnIndex++ for { - select { - case <-ctx.Done(): - return - default: - } client := spork.AccessNodes.Client() txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex)) if err != nil { @@ -216,6 +203,10 @@ outer: "Failed to fetch transaction result at index %d in block %x at height %d: %s", txnIndex, hash, height, err, ) + if errors.Is(err, context.Canceled) { + span.End() + return + } time.Sleep(time.Second) continue } @@ -236,11 +227,6 @@ outer: for range scheduledTxs + 1 { txnIndex++ for { - select { - case <-ctx.Done(): - return - default: - } client := spork.AccessNodes.Client() txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex)) if err != nil { @@ -248,6 +234,10 @@ outer: "Failed to fetch transaction result at index %d in block %x at height %d: %s", txnIndex, hash, height, err, ) + if errors.Is(err, context.Canceled) { + span.End() + return + } if status.Code(err) == codes.NotFound { // ensure we don't continuously request a nonexistent transaction break From 28901760f58cac8255bfd16912c6a5b69dc4a398 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 14 Nov 2025 12:26:31 -0800 Subject: [PATCH 6/9] minor fix stop entirely when system collection is shorter than the expected length --- state/process.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/state/process.go b/state/process.go index e8b1b21..2d2e4f6 100644 --- a/state/process.go +++ b/state/process.go @@ -224,6 +224,7 @@ outer: } } // Request scheduled transactions and the final system transaction + fetchSystemTxs: for range scheduledTxs + 1 { txnIndex++ for { @@ -239,8 +240,8 @@ outer: return } if status.Code(err) == codes.NotFound { - // ensure we don't continuously request a nonexistent transaction - break + // no more transactions available + break fetchSystemTxs } time.Sleep(time.Second) continue From 5ee0f16b3a73e9a007da1d35a795b49438121bc0 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 14 Nov 2025 12:46:29 -0800 Subject: [PATCH 7/9] retrieve transaction infos for system collection on slow path --- state/process.go | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/state/process.go b/state/process.go index 2d2e4f6..5155547 100644 --- a/state/process.go +++ b/state/process.go @@ -162,7 +162,11 @@ outer: time.Sleep(time.Second) continue } - client = spork.AccessNodes.Client() + colData.txns = append(colData.txns, info) + break + } + for { + client := spork.AccessNodes.Client() txnResult, err := client.TransactionResult(ctx, hash, uint32(txnIndex)) if err != nil { log.Errorf( @@ -176,7 +180,6 @@ outer: time.Sleep(time.Second) continue } - colData.txns = append(colData.txns, info) colData.txnResults = append(colData.txnResults, txnResult) break } @@ -251,6 +254,27 @@ outer: } } } + // we have the transaction results for the system collection; now fetch transaction infos + for _, result := range col.txnResults { + for { + client := spork.AccessNodes.Client() + info, err := client.Transaction(ctx, result.TransactionId) + if err != nil { + log.Errorf( + "Failed to fetch transaction %x in block %x at height %d: %s", + result.TransactionId, hash, height, err, + ) + if errors.Is(err, context.Canceled) { + span.End() + return + } + time.Sleep(time.Second) + continue + } + col.txns = append(col.txns, info) + break + } + } } } else { col := &collectionData{} From d0701d0fde4fea120b0f8d4675a131fa7bddf64d Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 14 Nov 2025 16:53:17 -0800 Subject: [PATCH 8/9] use defer to end the tracing span --- state/process.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/state/process.go b/state/process.go index 5155547..cf5bd86 100644 --- a/state/process.go +++ b/state/process.go @@ -42,6 +42,7 @@ func (i *Indexer) processBlock(rctx context.Context, spork *config.Spork, height slowPath := false unsealed := false var span trace.Span + defer func() { span.End() }() outer: for { if span != nil { @@ -137,7 +138,6 @@ outer: col.CollectionId, hash, height, err, ) if errors.Is(err, context.Canceled) { - span.End() return } time.Sleep(time.Second) @@ -156,7 +156,6 @@ outer: txnHash, hash, height, err, ) if errors.Is(err, context.Canceled) { - span.End() return } time.Sleep(time.Second) @@ -174,7 +173,6 @@ outer: txnHash, hash, height, err, ) if errors.Is(err, context.Canceled) { - span.End() return } time.Sleep(time.Second) @@ -207,7 +205,6 @@ outer: txnIndex, hash, height, err, ) if errors.Is(err, context.Canceled) { - span.End() return } time.Sleep(time.Second) @@ -239,7 +236,6 @@ outer: txnIndex, hash, height, err, ) if errors.Is(err, context.Canceled) { - span.End() return } if status.Code(err) == codes.NotFound { @@ -265,7 +261,6 @@ outer: result.TransactionId, hash, height, err, ) if errors.Is(err, context.Canceled) { - span.End() return } time.Sleep(time.Second) @@ -431,7 +426,6 @@ outer: for idx, txnResult := range col.txnResults { select { case <-ctx.Done(): - span.End() return default: } From 730a34ea6873632144c02b330a523bb276b57827 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 14 Nov 2025 17:17:23 -0800 Subject: [PATCH 9/9] check grpc status code for cancellation --- state/process.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/state/process.go b/state/process.go index cf5bd86..da8111c 100644 --- a/state/process.go +++ b/state/process.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/hex" - "errors" "strings" "time" @@ -137,7 +136,7 @@ outer: "Failed to fetch collection %x in block %x at height %d: %s", col.CollectionId, hash, height, err, ) - if errors.Is(err, context.Canceled) { + if status.Code(err) == codes.Canceled { return } time.Sleep(time.Second) @@ -155,7 +154,7 @@ outer: "Failed to fetch transaction %x in block %x at height %d: %s", txnHash, hash, height, err, ) - if errors.Is(err, context.Canceled) { + if status.Code(err) == codes.Canceled { return } time.Sleep(time.Second) @@ -172,7 +171,7 @@ outer: "Failed to fetch transaction result for %x in block %x at height %d: %s", txnHash, hash, height, err, ) - if errors.Is(err, context.Canceled) { + if status.Code(err) == codes.Canceled { return } time.Sleep(time.Second) @@ -204,7 +203,7 @@ outer: "Failed to fetch transaction result at index %d in block %x at height %d: %s", txnIndex, hash, height, err, ) - if errors.Is(err, context.Canceled) { + if status.Code(err) == codes.Canceled { return } time.Sleep(time.Second) @@ -235,7 +234,7 @@ outer: "Failed to fetch transaction result at index %d in block %x at height %d: %s", txnIndex, hash, height, err, ) - if errors.Is(err, context.Canceled) { + if status.Code(err) == codes.Canceled { return } if status.Code(err) == codes.NotFound { @@ -260,7 +259,7 @@ outer: "Failed to fetch transaction %x in block %x at height %d: %s", result.TransactionId, hash, height, err, ) - if errors.Is(err, context.Canceled) { + if status.Code(err) == codes.Canceled { return } time.Sleep(time.Second)