From 560127b497c4a4655faf97482d762d812c46a1b9 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 30 Oct 2025 09:23:41 -0400 Subject: [PATCH] add refactor experiment --- app/app.go | 131 ++++------------------ app/workflow.go | 289 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 310 insertions(+), 110 deletions(-) create mode 100644 app/workflow.go diff --git a/app/app.go b/app/app.go index 152bcd4386..9bdb9dfb7b 100644 --- a/app/app.go +++ b/app/app.go @@ -1245,10 +1245,13 @@ func (app *App) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) if app.EvmKeeper.EthReplayConfig.Enabled || app.EvmKeeper.EthBlockTestConfig.Enabled { return &abci.ResponseFinalizeBlock{}, nil } - cms := app.WriteState() - app.LightInvarianceChecks(cms, app.lightInvarianceConfig) - appHash := app.GetWorkingHash() - resp := app.getFinalizeBlockResponse(appHash, events, txRes, endBlockResp) + // Flush state using the workflow stage + wf := &BlockWorkflow{Ctx: ctx} + if err := app.FlushStateStage(wf); err != nil { + ctx.Logger().Error("FlushStateStage failed in optimistic path", "error", err) + return nil, err + } + resp := app.getFinalizeBlockResponse(wf.AppHash, events, txRes, endBlockResp) return &resp, nil } } @@ -1260,14 +1263,19 @@ func (app *App) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) return nil, processErr } - app.SetDeliverStateToCommit() + // Flush state using the workflow stage (state flush is separate from block processing) + // This prepares state for commit but doesn't perform the actual Commit call + // The actual Commit happens separately via ABCI Commit + wf := &BlockWorkflow{Ctx: ctx} + if err := app.FlushStateStage(wf); err != nil { + ctx.Logger().Error("FlushStateStage failed in FinalizeBlocker", "error", err) + return nil, err + } + if app.EvmKeeper.EthReplayConfig.Enabled || app.EvmKeeper.EthBlockTestConfig.Enabled { return &abci.ResponseFinalizeBlock{}, nil } - cms := app.WriteState() - app.LightInvarianceChecks(cms, app.lightInvarianceConfig) - appHash := app.GetWorkingHash() - resp := app.getFinalizeBlockResponse(appHash, events, txResults, endBlockResp) + resp := app.getFinalizeBlockResponse(wf.AppHash, events, txResults, endBlockResp) return &resp, nil } @@ -1619,108 +1627,11 @@ func (app *App) BuildDependenciesAndRunTxs(ctx sdk.Context, txs [][]byte, typedT return app.ProcessBlockSynchronous(ctx, txs, typedTxs, absoluteTxIndices), ctx } +// ProcessBlock is the main entrypoint for block processing. +// It delegates to ProcessBlockWorkflow which organizes the work into clear stages. +// This method is maintained for backward compatibility. func (app *App) ProcessBlock(ctx sdk.Context, txs [][]byte, req BlockProcessRequest, lastCommit abci.CommitInfo, simulate bool) (events []abci.Event, txResults []*abci.ExecTxResult, endBlockResp abci.ResponseEndBlock, err error) { - defer func() { - if r := recover(); r != nil { - panicMsg := fmt.Sprintf("%v", r) - // Re-panic for upgrade-related panics to allow proper upgrade mechanism - if upgradePanicRe.MatchString(panicMsg) { - ctx.Logger().Error("upgrade panic detected, panicking to trigger upgrade", "panic", r) - panic(r) // Re-panic to trigger upgrade mechanism - } - ctx.Logger().Error("panic recovered in ProcessBlock", "panic", r) - err = fmt.Errorf("ProcessBlock panic: %v", r) - events = nil - txResults = nil - endBlockResp = abci.ResponseEndBlock{} - } - }() - - defer func() { - if !app.httpServerStartSignalSent { - app.httpServerStartSignalSent = true - app.httpServerStartSignal <- struct{}{} - } - if !app.wsServerStartSignalSent { - app.wsServerStartSignalSent = true - app.wsServerStartSignal <- struct{}{} - } - }() - ctx = ctx.WithIsOCCEnabled(app.OccEnabled()) - - events = []abci.Event{} - beginBlockReq := abci.RequestBeginBlock{ - Hash: req.GetHash(), - ByzantineValidators: utils.Map(req.GetByzantineValidators(), func(mis abci.Misbehavior) abci.Evidence { - return abci.Evidence(mis) - }), - LastCommitInfo: abci.LastCommitInfo{ - Round: lastCommit.Round, - Votes: utils.Map(lastCommit.Votes, func(vote abci.VoteInfo) abci.VoteInfo { - return abci.VoteInfo{ - Validator: vote.Validator, - SignedLastBlock: vote.SignedLastBlock, - } - }), - }, - Header: tmproto.Header{ - ChainID: app.ChainID, - Height: req.GetHeight(), - Time: req.GetTime(), - ProposerAddress: ctx.BlockHeader().ProposerAddress, - }, - Simulate: simulate, - } - beginBlockResp := app.BeginBlock(ctx, beginBlockReq) - events = append(events, beginBlockResp.Events...) - - evmTxs := make([]*evmtypes.MsgEVMTransaction, len(txs)) // nil for non-EVM txs - txResults = make([]*abci.ExecTxResult, len(txs)) - typedTxs := app.DecodeTransactionsConcurrently(ctx, txs) - - prioritizedTxs, otherTxs, prioritizedTypedTxs, otherTypedTxs, prioritizedIndices, otherIndices := app.PartitionPrioritizedTxs(ctx, txs, typedTxs) - - // run the prioritized txs - prioritizedResults, ctx := app.ExecuteTxsConcurrently(ctx, prioritizedTxs, prioritizedTypedTxs, prioritizedIndices) - for relativePrioritizedIndex, originalIndex := range prioritizedIndices { - txResults[originalIndex] = prioritizedResults[relativePrioritizedIndex] - evmTxs[originalIndex] = app.GetEVMMsg(prioritizedTypedTxs[relativePrioritizedIndex]) - } - - // Finalize all Bank Module Transfers here so that events are included for prioritiezd txs - deferredWriteEvents := app.BankKeeper.WriteDeferredBalances(ctx) - events = append(events, deferredWriteEvents...) - - midBlockEvents := app.MidBlock(ctx, req.GetHeight()) - events = append(events, midBlockEvents...) - - otherResults, ctx := app.ExecuteTxsConcurrently(ctx, otherTxs, otherTypedTxs, otherIndices) - for relativeOtherIndex, originalIndex := range otherIndices { - txResults[originalIndex] = otherResults[relativeOtherIndex] - evmTxs[originalIndex] = app.GetEVMMsg(otherTypedTxs[relativeOtherIndex]) - } - app.EvmKeeper.SetTxResults(txResults) - app.EvmKeeper.SetMsgs(evmTxs) - - // Finalize all Bank Module Transfers here so that events are included - lazyWriteEvents := app.BankKeeper.WriteDeferredBalances(ctx) - events = append(events, lazyWriteEvents...) - - // Sum up total used per block only for evm transactions - evmTotalGasUsed := int64(0) - for _, txResult := range txResults { - if txResult.EvmTxInfo != nil { - evmTotalGasUsed += txResult.GasUsed - } - } - - endBlockResp = app.EndBlock(ctx, abci.RequestEndBlock{ - Height: req.GetHeight(), - BlockGasUsed: evmTotalGasUsed, - }) - - events = append(events, endBlockResp.Events...) - return events, txResults, endBlockResp, nil + return app.ProcessBlockWorkflow(ctx, txs, req, lastCommit, simulate) } func (app *App) GetEVMMsg(tx sdk.Tx) (res *evmtypes.MsgEVMTransaction) { diff --git a/app/workflow.go b/app/workflow.go new file mode 100644 index 0000000000..d262b13faf --- /dev/null +++ b/app/workflow.go @@ -0,0 +1,289 @@ +package app + +// Block Processing Workflow Architecture +// +// This file defines a workflow-based architecture for block processing that separates +// concerns into clear, testable stages: +// +// 1. IngestStage - Block reception and transaction decoding +// 2. PreProcessStage - Stateless validation and EVM sender recovery +// 3. ExecuteStage - BeginBlock, DeliverTx (prioritized + regular), MidBlock, EndBlock +// 4. PostProcessStage - Event aggregation, gas calculation, state updates +// 5. FlushStateStage - WriteState, invariance checks, prepare for commit +// +// The workflow is orchestrated by ProcessBlockWorkflow, which maintains backward +// compatibility with the existing ProcessBlock method. +// +// Usage: +// - ProcessBlock() automatically uses the workflow (backward compatible) +// - ProcessBlockWorkflow() can be called directly for more control +// - Individual stages can be tested and replaced independently +// - FlushStateStage is typically called after ProcessBlockWorkflow in FinalizeBlocker +// +// Example of replacing a stage: +// - Override IngestStage to add custom decoding logic +// - Override PreProcessStage to add custom validation +// - Each stage returns an error, making error handling clear + +import ( + "fmt" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/sei-protocol/sei-chain/utils" + evmtypes "github.com/sei-protocol/sei-chain/x/evm/types" + abci "github.com/tendermint/tendermint/abci/types" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" +) + +// BlockWorkflow holds the state that flows through the block processing stages +type BlockWorkflow struct { + // Inputs (set at start) + Ctx sdk.Context + Txs [][]byte + Req BlockProcessRequest + LastCommit abci.CommitInfo + Simulate bool + + // Stage outputs + TypedTxs []sdk.Tx // Output from Ingest + EvmTxs []*evmtypes.MsgEVMTransaction // Output from PreProcess + PrioritizedTxs [][]byte // Output from PreProcess + OtherTxs [][]byte // Output from PreProcess + PrioritizedTypedTxs []sdk.Tx // Output from PreProcess + OtherTypedTxs []sdk.Tx // Output from PreProcess + PrioritizedIndices []int // Output from PreProcess + OtherIndices []int // Output from PreProcess + + // Execution outputs + BeginBlockResp abci.ResponseBeginBlock + PrioritizedResults []*abci.ExecTxResult + OtherResults []*abci.ExecTxResult + TxResults []*abci.ExecTxResult + MidBlockEvents []abci.Event + EndBlockResp abci.ResponseEndBlock + + // Final outputs + Events []abci.Event + AppHash []byte +} + +// IngestStage handles block reception and transaction decoding +// This stage decodes raw transaction bytes into typed transactions +func (app *App) IngestStage(wf *BlockWorkflow) error { + wf.Ctx.Logger().Debug("Starting Ingest stage") + + // Decode all transactions concurrently + wf.TypedTxs = app.DecodeTransactionsConcurrently(wf.Ctx, wf.Txs) + wf.EvmTxs = make([]*evmtypes.MsgEVMTransaction, len(wf.Txs)) + + wf.Ctx.Logger().Debug("Completed Ingest stage", "txCount", len(wf.TypedTxs)) + return nil +} + +// PreProcessStage handles stateless validation and EVM sender recovery +// This stage performs validation that doesn't require state access +func (app *App) PreProcessStage(wf *BlockWorkflow) error { + wf.Ctx.Logger().Debug("Starting PreProcess stage") + + // Partition transactions into prioritized and regular + wf.PrioritizedTxs, wf.OtherTxs, wf.PrioritizedTypedTxs, wf.OtherTypedTxs, + wf.PrioritizedIndices, wf.OtherIndices = app.PartitionPrioritizedTxs( + wf.Ctx, wf.Txs, wf.TypedTxs) + + // Prepare EVM transaction tracking arrays + wf.TxResults = make([]*abci.ExecTxResult, len(wf.Txs)) + + wf.Ctx.Logger().Debug("Completed PreProcess stage", + "prioritizedCount", len(wf.PrioritizedTxs), + "otherCount", len(wf.OtherTxs)) + return nil +} + +// ExecuteStage handles transaction execution +// This stage runs BeginBlock, DeliverTx for each transaction, MidBlock, and EndBlock +func (app *App) ExecuteStage(wf *BlockWorkflow) error { + wf.Ctx.Logger().Debug("Starting Execute stage") + + // Prepare BeginBlock request + beginBlockReq := abci.RequestBeginBlock{ + Hash: wf.Req.GetHash(), + ByzantineValidators: utils.Map(wf.Req.GetByzantineValidators(), func(mis abci.Misbehavior) abci.Evidence { + return abci.Evidence(mis) + }), + LastCommitInfo: abci.LastCommitInfo{ + Round: wf.LastCommit.Round, + Votes: utils.Map(wf.LastCommit.Votes, func(vote abci.VoteInfo) abci.VoteInfo { + return abci.VoteInfo{ + Validator: vote.Validator, + SignedLastBlock: vote.SignedLastBlock, + } + }), + }, + Header: tmproto.Header{ + ChainID: app.ChainID, + Height: wf.Req.GetHeight(), + Time: wf.Req.GetTime(), + ProposerAddress: wf.Ctx.BlockHeader().ProposerAddress, + }, + Simulate: wf.Simulate, + } + + // BeginBlock + wf.BeginBlockResp = app.BeginBlock(wf.Ctx, beginBlockReq) + wf.Events = append(wf.Events, wf.BeginBlockResp.Events...) + + // Execute prioritized transactions + if len(wf.PrioritizedTxs) > 0 { + wf.PrioritizedResults, wf.Ctx = app.ExecuteTxsConcurrently( + wf.Ctx, wf.PrioritizedTxs, wf.PrioritizedTypedTxs, wf.PrioritizedIndices) + for relativePrioritizedIndex, originalIndex := range wf.PrioritizedIndices { + wf.TxResults[originalIndex] = wf.PrioritizedResults[relativePrioritizedIndex] + wf.EvmTxs[originalIndex] = app.GetEVMMsg(wf.PrioritizedTypedTxs[relativePrioritizedIndex]) + } + + // Finalize Bank Module Transfers for prioritized txs + deferredWriteEvents := app.BankKeeper.WriteDeferredBalances(wf.Ctx) + wf.Events = append(wf.Events, deferredWriteEvents...) + } + + // MidBlock + wf.MidBlockEvents = app.MidBlock(wf.Ctx, wf.Req.GetHeight()) + wf.Events = append(wf.Events, wf.MidBlockEvents...) + + // Execute other transactions + if len(wf.OtherTxs) > 0 { + wf.OtherResults, wf.Ctx = app.ExecuteTxsConcurrently( + wf.Ctx, wf.OtherTxs, wf.OtherTypedTxs, wf.OtherIndices) + for relativeOtherIndex, originalIndex := range wf.OtherIndices { + wf.TxResults[originalIndex] = wf.OtherResults[relativeOtherIndex] + wf.EvmTxs[originalIndex] = app.GetEVMMsg(wf.OtherTypedTxs[relativeOtherIndex]) + } + + // Finalize Bank Module Transfers for other txs + lazyWriteEvents := app.BankKeeper.WriteDeferredBalances(wf.Ctx) + wf.Events = append(wf.Events, lazyWriteEvents...) + } + + wf.Ctx.Logger().Debug("Completed Execute stage") + return nil +} + +// PostProcessStage handles post-execution tasks like event aggregation and state updates +func (app *App) PostProcessStage(wf *BlockWorkflow) error { + wf.Ctx.Logger().Debug("Starting PostProcess stage") + + // Set EVM keeper state + app.EvmKeeper.SetTxResults(wf.TxResults) + app.EvmKeeper.SetMsgs(wf.EvmTxs) + + // Calculate total EVM gas used + evmTotalGasUsed := int64(0) + for _, txResult := range wf.TxResults { + if txResult.EvmTxInfo != nil { + evmTotalGasUsed += txResult.GasUsed + } + } + + // EndBlock + wf.EndBlockResp = app.EndBlock(wf.Ctx, abci.RequestEndBlock{ + Height: wf.Req.GetHeight(), + BlockGasUsed: evmTotalGasUsed, + }) + wf.Events = append(wf.Events, wf.EndBlockResp.Events...) + + wf.Ctx.Logger().Debug("Completed PostProcess stage") + return nil +} + +// FlushStateStage prepares state for commit (WriteState, but not the actual Commit call) +// The actual Commit call happens separately via ABCI Commit +func (app *App) FlushStateStage(wf *BlockWorkflow) error { + wf.Ctx.Logger().Debug("Starting FlushState stage") + + // Set deliver state to commit state + app.SetDeliverStateToCommit() + + // Write state (prepares state for commit) + cms := app.WriteState() + + // Light invariance checks + app.LightInvarianceChecks(cms, app.lightInvarianceConfig) + + // Get the app hash + wf.AppHash = app.GetWorkingHash() + + wf.Ctx.Logger().Debug("Completed FlushState stage") + return nil +} + +// ProcessBlockWorkflow orchestrates all the stages in order +// This replaces the previous ProcessBlock method with a clearer workflow structure +func (app *App) ProcessBlockWorkflow( + ctx sdk.Context, + txs [][]byte, + req BlockProcessRequest, + lastCommit abci.CommitInfo, + simulate bool, +) (events []abci.Event, txResults []*abci.ExecTxResult, endBlockResp abci.ResponseEndBlock, err error) { + defer func() { + if r := recover(); r != nil { + panicMsg := fmt.Sprintf("%v", r) + // Re-panic for upgrade-related panics to allow proper upgrade mechanism + if upgradePanicRe.MatchString(panicMsg) { + ctx.Logger().Error("upgrade panic detected, panicking to trigger upgrade", "panic", r) + panic(r) // Re-panic to trigger upgrade mechanism + } + ctx.Logger().Error("panic recovered in ProcessBlockWorkflow", "panic", r) + err = fmt.Errorf("ProcessBlockWorkflow panic: %v", r) + events = nil + txResults = nil + endBlockResp = abci.ResponseEndBlock{} + } + }() + + defer func() { + if !app.httpServerStartSignalSent { + app.httpServerStartSignalSent = true + app.httpServerStartSignal <- struct{}{} + } + if !app.wsServerStartSignalSent { + app.wsServerStartSignalSent = true + app.wsServerStartSignal <- struct{}{} + } + }() + + ctx = ctx.WithIsOCCEnabled(app.OccEnabled()) + + // Initialize workflow state + wf := &BlockWorkflow{ + Ctx: ctx, + Txs: txs, + Req: req, + LastCommit: lastCommit, + Simulate: simulate, + Events: []abci.Event{}, + } + + // Execute workflow stages in order + if err := app.IngestStage(wf); err != nil { + return nil, nil, abci.ResponseEndBlock{}, fmt.Errorf("ingest stage failed: %w", err) + } + + if err := app.PreProcessStage(wf); err != nil { + return nil, nil, abci.ResponseEndBlock{}, fmt.Errorf("preprocess stage failed: %w", err) + } + + if err := app.ExecuteStage(wf); err != nil { + return nil, nil, abci.ResponseEndBlock{}, fmt.Errorf("execute stage failed: %w", err) + } + + if err := app.PostProcessStage(wf); err != nil { + return nil, nil, abci.ResponseEndBlock{}, fmt.Errorf("postprocess stage failed: %w", err) + } + + // Note: FlushStateStage is typically called after ProcessBlockWorkflow, + // from FinalizeBlocker. We don't call it here to maintain the existing + // separation between ProcessBlock and state flushing. + + return wf.Events, wf.TxResults, wf.EndBlockResp, nil +}