diff --git a/execution/evm/engine_rpc_client.go b/execution/evm/engine_rpc_client.go new file mode 100644 index 000000000..ec04564aa --- /dev/null +++ b/execution/evm/engine_rpc_client.go @@ -0,0 +1,47 @@ +package evm + +import ( + "context" + + "github.com/ethereum/go-ethereum/beacon/engine" + "github.com/ethereum/go-ethereum/rpc" +) + +var _ EngineRPCClient = (*engineRPCClient)(nil) + +// engineRPCClient is the concrete implementation wrapping *rpc.Client. +type engineRPCClient struct { + client *rpc.Client +} + +// NewEngineRPCClient creates a new Engine API client. +func NewEngineRPCClient(client *rpc.Client) EngineRPCClient { + return &engineRPCClient{client: client} +} + +func (e *engineRPCClient) ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error) { + var result engine.ForkChoiceResponse + err := e.client.CallContext(ctx, &result, "engine_forkchoiceUpdatedV3", state, args) + if err != nil { + return nil, err + } + return &result, nil +} + +func (e *engineRPCClient) GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) { + var result engine.ExecutionPayloadEnvelope + err := e.client.CallContext(ctx, &result, "engine_getPayloadV4", payloadID) + if err != nil { + return nil, err + } + return &result, nil +} + +func (e *engineRPCClient) NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) { + var result engine.PayloadStatusV1 + err := e.client.CallContext(ctx, &result, "engine_newPayloadV4", payload, blobHashes, parentBeaconBlockRoot, executionRequests) + if err != nil { + return nil, err + } + return &result, nil +} diff --git a/execution/evm/engine_rpc_tracing.go b/execution/evm/engine_rpc_tracing.go new file mode 100644 index 000000000..f5bf09e4b --- /dev/null +++ b/execution/evm/engine_rpc_tracing.go @@ -0,0 +1,122 @@ +package evm + +import ( + "context" + + "github.com/ethereum/go-ethereum/beacon/engine" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +var _ EngineRPCClient = (*tracedEngineRPCClient)(nil) + +// tracedEngineRPCClient wraps an EngineRPCClient and records spans. +type tracedEngineRPCClient struct { + inner EngineRPCClient + tracer trace.Tracer +} + +// withTracingEngineRPCClient decorates an EngineRPCClient with OpenTelemetry spans. +func withTracingEngineRPCClient(inner EngineRPCClient) EngineRPCClient { + return &tracedEngineRPCClient{ + inner: inner, + tracer: otel.Tracer("ev-node/execution/engine-rpc"), + } +} + +func (t *tracedEngineRPCClient) ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error) { + ctx, span := t.tracer.Start(ctx, "Engine.ForkchoiceUpdated", + trace.WithAttributes( + attribute.String("method", "engine_forkchoiceUpdatedV3"), + attribute.String("head_block_hash", state.HeadBlockHash.Hex()), + attribute.String("safe_block_hash", state.SafeBlockHash.Hex()), + attribute.String("finalized_block_hash", state.FinalizedBlockHash.Hex()), + ), + ) + defer span.End() + + result, err := t.inner.ForkchoiceUpdated(ctx, state, args) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + attributes := []attribute.KeyValue{ + attribute.String("payload_status", result.PayloadStatus.Status), + } + + if result.PayloadID != nil { + attributes = append(attributes, attribute.String("payload_id", result.PayloadID.String())) + } + + if result.PayloadStatus.LatestValidHash != nil { + attributes = append(attributes, attribute.String("latest_valid_hash", result.PayloadStatus.LatestValidHash.Hex())) + } + + span.SetAttributes( + attributes..., + ) + + return result, nil +} + +func (t *tracedEngineRPCClient) GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) { + ctx, span := t.tracer.Start(ctx, "Engine.GetPayload", + trace.WithAttributes( + attribute.String("method", "engine_getPayloadV4"), + attribute.String("payload_id", payloadID.String()), + ), + ) + defer span.End() + + result, err := t.inner.GetPayload(ctx, payloadID) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int64("block_number", int64(result.ExecutionPayload.Number)), + attribute.String("block_hash", result.ExecutionPayload.BlockHash.Hex()), + attribute.String("state_root", result.ExecutionPayload.StateRoot.Hex()), + attribute.Int("tx_count", len(result.ExecutionPayload.Transactions)), + attribute.Int64("gas_used", int64(result.ExecutionPayload.GasUsed)), + ) + + return result, nil +} + +func (t *tracedEngineRPCClient) NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) { + ctx, span := t.tracer.Start(ctx, "Engine.NewPayload", + trace.WithAttributes( + attribute.String("method", "engine_newPayloadV4"), + attribute.Int64("block_number", int64(payload.Number)), + attribute.String("block_hash", payload.BlockHash.Hex()), + attribute.String("parent_hash", payload.ParentHash.Hex()), + attribute.Int("tx_count", len(payload.Transactions)), + attribute.Int64("gas_used", int64(payload.GasUsed)), + ), + ) + defer span.End() + + result, err := t.inner.NewPayload(ctx, payload, blobHashes, parentBeaconBlockRoot, executionRequests) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + attributes := []attribute.KeyValue{attribute.String("payload_status", result.Status)} + + if result.LatestValidHash != nil { + attributes = append(attributes, attribute.String("latest_valid_hash", result.LatestValidHash.Hex())) + } + + span.SetAttributes(attributes...) + + return result, nil +} diff --git a/execution/evm/eth_rpc_client.go b/execution/evm/eth_rpc_client.go new file mode 100644 index 000000000..8799a8177 --- /dev/null +++ b/execution/evm/eth_rpc_client.go @@ -0,0 +1,30 @@ +package evm + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +type ethRPCClient struct { + client *ethclient.Client +} + +func NewEthRPCClient(client *ethclient.Client) EthRPCClient { + return ðRPCClient{client: client} +} + +func (e *ethRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return e.client.HeaderByNumber(ctx, number) +} + +func (e *ethRPCClient) GetTxs(ctx context.Context) ([]string, error) { + var result []string + err := e.client.Client().CallContext(ctx, &result, "txpoolExt_getTxs") + if err != nil { + return nil, err + } + return result, nil +} diff --git a/execution/evm/eth_rpc_tracing.go b/execution/evm/eth_rpc_tracing.go new file mode 100644 index 000000000..a2842d7f2 --- /dev/null +++ b/execution/evm/eth_rpc_tracing.go @@ -0,0 +1,82 @@ +package evm + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// tracedEthRPCClient wraps an EthRPCClient and records spans for observability. +type tracedEthRPCClient struct { + inner EthRPCClient + tracer trace.Tracer +} + +// withTracingEthRPCClient decorates an EthRPCClient with OpenTelemetry tracing. +func withTracingEthRPCClient(inner EthRPCClient) EthRPCClient { + return &tracedEthRPCClient{ + inner: inner, + tracer: otel.Tracer("ev-node/execution/eth-rpc"), + } +} + +func (t *tracedEthRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + var blockNumber string + if number == nil { + blockNumber = "latest" + } else { + blockNumber = number.String() + } + + ctx, span := t.tracer.Start(ctx, "Eth.GetBlockByNumber", + trace.WithAttributes( + attribute.String("method", "eth_getBlockByNumber"), + attribute.String("block_number", blockNumber), + ), + ) + defer span.End() + + result, err := t.inner.HeaderByNumber(ctx, number) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.String("block_hash", result.Hash().Hex()), + attribute.String("state_root", result.Root.Hex()), + attribute.Int64("gas_limit", int64(result.GasLimit)), + attribute.Int64("gas_used", int64(result.GasUsed)), + attribute.Int64("timestamp", int64(result.Time)), + ) + + return result, nil +} + +func (t *tracedEthRPCClient) GetTxs(ctx context.Context) ([]string, error) { + ctx, span := t.tracer.Start(ctx, "TxPool.GetTxs", + trace.WithAttributes( + attribute.String("method", "txpoolExt_getTxs"), + ), + ) + defer span.End() + + result, err := t.inner.GetTxs(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int("tx_count", len(result)), + ) + + return result, nil +} diff --git a/execution/evm/eth_rpc_tracing_test.go b/execution/evm/eth_rpc_tracing_test.go new file mode 100644 index 000000000..832a03fef --- /dev/null +++ b/execution/evm/eth_rpc_tracing_test.go @@ -0,0 +1,301 @@ +package evm + +import ( + "context" + "errors" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +// setupTestEthRPCTracing creates a traced eth RPC client with an in-memory span recorder +func setupTestEthRPCTracing(t *testing.T, mockClient EthRPCClient) (EthRPCClient, *tracetest.SpanRecorder) { + t.Helper() + + // create in-memory span recorder + sr := tracetest.NewSpanRecorder() + tp := trace.NewTracerProvider( + trace.WithSpanProcessor(sr), + ) + t.Cleanup(func() { + _ = tp.Shutdown(context.Background()) + }) + + // set as global provider for the test + otel.SetTracerProvider(tp) + + // create traced client + traced := withTracingEthRPCClient(mockClient) + + return traced, sr +} + +// mockEthRPCClient is a simple mock for testing +type mockEthRPCClient struct { + headerByNumberFn func(ctx context.Context, number *big.Int) (*types.Header, error) + getTxsFn func(ctx context.Context) ([]string, error) +} + +func (m *mockEthRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + if m.headerByNumberFn != nil { + return m.headerByNumberFn(ctx, number) + } + return nil, nil +} + +func (m *mockEthRPCClient) GetTxs(ctx context.Context) ([]string, error) { + if m.getTxsFn != nil { + return m.getTxsFn(ctx) + } + return nil, nil +} + +func TestTracedEthRPCClient_HeaderByNumber_Success(t *testing.T) { + expectedHeader := &types.Header{ + GasLimit: 30000000, + GasUsed: 15000000, + Time: 1234567890, + } + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + return expectedHeader, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + blockNumber := big.NewInt(100) + + header, err := traced.HeaderByNumber(ctx, blockNumber) + + require.NoError(t, err) + require.Equal(t, expectedHeader, header) + + // verify span was created + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + // verify attributes + attrs := span.Attributes() + requireAttribute(t, attrs, "method", "eth_getBlockByNumber") + requireAttribute(t, attrs, "block_number", "100") + requireAttribute(t, attrs, "block_hash", expectedHeader.Hash().Hex()) + requireAttribute(t, attrs, "state_root", expectedHeader.Root.Hex()) + requireAttribute(t, attrs, "gas_limit", int64(expectedHeader.GasLimit)) + requireAttribute(t, attrs, "gas_used", int64(expectedHeader.GasUsed)) + requireAttribute(t, attrs, "timestamp", int64(expectedHeader.Time)) +} + +func TestTracedEthRPCClient_HeaderByNumber_Latest(t *testing.T) { + expectedHeader := &types.Header{ + GasLimit: 30000000, + GasUsed: 15000000, + Time: 1234567890, + } + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + require.Nil(t, number, "number should be nil for latest block") + return expectedHeader, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + header, err := traced.HeaderByNumber(ctx, nil) + + require.NoError(t, err) + require.Equal(t, expectedHeader, header) + + // verify span + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + + // verify block_number is "latest" when nil + attrs := span.Attributes() + requireAttribute(t, attrs, "block_number", "latest") +} + +func TestTracedEthRPCClient_HeaderByNumber_Error(t *testing.T) { + expectedErr := errors.New("failed to get block header") + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + return nil, expectedErr + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + blockNumber := big.NewInt(100) + + _, err := traced.HeaderByNumber(ctx, blockNumber) + + require.Error(t, err) + require.Equal(t, expectedErr, err) + + // verify span recorded the error + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) + + // verify error event was recorded + events := span.Events() + require.Len(t, events, 1) + require.Equal(t, "exception", events[0].Name) + + // verify block header attributes NOT set on error + attrs := span.Attributes() + for _, attr := range attrs { + key := string(attr.Key) + require.NotEqual(t, "block_hash", key) + require.NotEqual(t, "state_root", key) + require.NotEqual(t, "gas_limit", key) + require.NotEqual(t, "gas_used", key) + } +} + +func TestTracedEthRPCClient_GetTxs_Success(t *testing.T) { + expectedTxs := []string{"0xabcd", "0xef01", "0x2345"} + + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return expectedTxs, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + txs, err := traced.GetTxs(ctx) + + require.NoError(t, err) + require.Equal(t, expectedTxs, txs) + + // verify span was created + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + // verify attributes + attrs := span.Attributes() + requireAttribute(t, attrs, "method", "txpoolExt_getTxs") + requireAttribute(t, attrs, "tx_count", len(expectedTxs)) +} + +func TestTracedEthRPCClient_GetTxs_EmptyPool(t *testing.T) { + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return []string{}, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + txs, err := traced.GetTxs(ctx) + + require.NoError(t, err) + require.Empty(t, txs) + + // verify span + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + + // verify tx_count is 0 + attrs := span.Attributes() + requireAttribute(t, attrs, "tx_count", 0) +} + +func TestTracedEthRPCClient_GetTxs_Error(t *testing.T) { + expectedErr := errors.New("failed to get transactions") + + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return nil, expectedErr + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + _, err := traced.GetTxs(ctx) + + require.Error(t, err) + require.Equal(t, expectedErr, err) + + // verify span recorded the error + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) + + // verify error event was recorded + events := span.Events() + require.Len(t, events, 1) + require.Equal(t, "exception", events[0].Name) + + // verify tx_count NOT set on error + attrs := span.Attributes() + for _, attr := range attrs { + require.NotEqual(t, "tx_count", string(attr.Key)) + } +} + +// requireAttribute is a helper to check span attributes +func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} diff --git a/execution/evm/execution.go b/execution/evm/execution.go index 20fc59e51..c310af06d 100644 --- a/execution/evm/execution.go +++ b/execution/evm/execution.go @@ -130,13 +130,34 @@ func retryWithBackoffOnPayloadStatus(ctx context.Context, fn func() error, maxRe return fmt.Errorf("max retries (%d) exceeded for %s", maxRetries, operation) } +// EngineRPCClient abstracts Engine API RPC calls for tracing and testing. +type EngineRPCClient interface { + // ForkchoiceUpdated updates the forkchoice state and optionally starts payload building. + ForkchoiceUpdated(ctx context.Context, state engine.ForkchoiceStateV1, args map[string]any) (*engine.ForkChoiceResponse, error) + + // GetPayload retrieves a previously requested execution payload. + GetPayload(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) + + // NewPayload submits a new execution payload for validation. + NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) +} + +// EthRPCClient abstracts Ethereum JSON-RPC calls for tracing and testing. +type EthRPCClient interface { + // HeaderByNumber retrieves a block header by number (nil = latest). + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + + // GetTxs retrieves pending transactions from the transaction pool. + GetTxs(ctx context.Context) ([]string, error) +} + // EngineClient represents a client that interacts with an Ethereum execution engine // through the Engine API. It manages connections to both the engine and standard Ethereum // APIs, and maintains state related to block processing. type EngineClient struct { - engineClient *rpc.Client // Client for Engine API calls - ethClient *ethclient.Client // Client for standard Ethereum API calls - genesisHash common.Hash // Hash of the genesis block + engineClient EngineRPCClient // Client for Engine API calls + ethClient EthRPCClient // Client for standard Ethereum API calls + genesisHash common.Hash // Hash of the genesis block initialHeight uint64 feeRecipient common.Address // Address to receive transaction fees @@ -159,7 +180,7 @@ type EngineClient struct { // execution and crash recovery. The db is wrapped with a prefix to isolate // EVM execution data from other ev-node data. // When tracingEnabled is true, the client will inject W3C trace context headers -// and wrap Engine API calls with OpenTelemetry spans. +// and wrap Engine API and Eth API calls with OpenTelemetry spans. func NewEngineExecutionClient( ethURL, engineURL string, @@ -185,7 +206,7 @@ func NewEngineExecutionClient( if err != nil { return nil, err } - ethClient := ethclient.NewClient(ethRPC) + rawEthClient := ethclient.NewClient(ethRPC) secret, err := decodeSecret(jwtSecret) if err != nil { @@ -206,11 +227,21 @@ func NewEngineExecutionClient( } return nil })) - engineClient, err := rpc.DialOptions(context.Background(), engineURL, engineOptions...) + rawEngineClient, err := rpc.DialOptions(context.Background(), engineURL, engineOptions...) if err != nil { return nil, err } + // wrap raw clients with interfaces + engineClient := NewEngineRPCClient(rawEngineClient) + ethClient := NewEthRPCClient(rawEthClient) + + // if tracing enabled, wrap with traced decorators + if tracingEnabled { + engineClient = withTracingEngineRPCClient(engineClient) + ethClient = withTracingEthRPCClient(ethClient) + } + return &EngineClient{ engineClient: engineClient, ethClient: ethClient, @@ -238,8 +269,7 @@ func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, ini // Acknowledge the genesis block with retry logic for SYNCING status err := retryWithBackoffOnPayloadStatus(ctx, func() error { - var forkchoiceResult engine.ForkChoiceResponse - err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", + forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, engine.ForkchoiceStateV1{ HeadBlockHash: c.genesisHash, SafeBlockHash: c.genesisHash, @@ -279,8 +309,7 @@ func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, ini // GetTxs retrieves transactions from the current execution payload func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) { - var result []string - err := c.ethClient.Client().CallContext(ctx, &result, "txpoolExt_getTxs") + result, err := c.ethClient.GetTxs(ctx) if err != nil { return nil, fmt.Errorf("failed to get tx pool content: %w", err) } @@ -372,8 +401,7 @@ func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight // 3. Call forkchoice update to get PayloadID var newPayloadID *engine.PayloadID err = retryWithBackoffOnPayloadStatus(ctx, func() error { - var forkchoiceResult engine.ForkChoiceResponse - err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, evPayloadAttrs) + forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, evPayloadAttrs) if err != nil { return fmt.Errorf("forkchoice update failed: %w", err) } @@ -522,8 +550,7 @@ func (c *EngineClient) setFinalWithHeight(ctx context.Context, blockHash common. func (c *EngineClient) doForkchoiceUpdate(ctx context.Context, args engine.ForkchoiceStateV1, operation string) error { // Call forkchoice update with retry logic for SYNCING status err := retryWithBackoffOnPayloadStatus(ctx, func() error { - var forkchoiceResult engine.ForkChoiceResponse - err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, nil) + forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, nil) if err != nil { return fmt.Errorf("forkchoice update failed: %w", err) } @@ -774,8 +801,7 @@ func (c *EngineClient) filterTransactions(ctx context.Context, txs [][]byte, blo // processPayload handles the common logic of getting, submitting, and finalizing a payload. func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.PayloadID, txs [][]byte) ([]byte, uint64, error) { // 1. Get Payload - var payloadResult engine.ExecutionPayloadEnvelope - err := c.engineClient.CallContext(ctx, &payloadResult, "engine_getPayloadV4", payloadID) + payloadResult, err := c.engineClient.GetPayload(ctx, payloadID) if err != nil { return nil, 0, fmt.Errorf("get payload failed: %w", err) } @@ -784,9 +810,8 @@ func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.Payl blockTimestamp := int64(payloadResult.ExecutionPayload.Timestamp) // 2. Submit Payload (newPayload) - var newPayloadResult engine.PayloadStatusV1 err = retryWithBackoffOnPayloadStatus(ctx, func() error { - err := c.engineClient.CallContext(ctx, &newPayloadResult, "engine_newPayloadV4", + newPayloadResult, err := c.engineClient.NewPayload(ctx, payloadResult.ExecutionPayload, []string{}, // No blob hashes common.Hash{}.Hex(), // Use zero hash for parentBeaconBlockRoot @@ -796,7 +821,7 @@ func (c *EngineClient) processPayload(ctx context.Context, payloadID engine.Payl return fmt.Errorf("new payload submission failed: %w", err) } - if err := validatePayloadStatus(newPayloadResult); err != nil { + if err := validatePayloadStatus(*newPayloadResult); err != nil { c.logger.Warn(). Str("status", newPayloadResult.Status). Str("latestValidHash", latestValidHashHex(newPayloadResult.LatestValidHash)). diff --git a/execution/evm/go.mod b/execution/evm/go.mod index 8197f36a6..1243b05e5 100644 --- a/execution/evm/go.mod +++ b/execution/evm/go.mod @@ -10,9 +10,20 @@ require ( github.com/ipfs/go-datastore v0.9.0 github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel/trace v1.39.0 google.golang.org/protobuf v1.36.10 ) +require ( + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect + go.opentelemetry.io/otel/metric v1.39.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 + go.opentelemetry.io/proto/otlp v1.7.1 // indirect +) + require ( github.com/DataDog/zstd v1.5.5 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect @@ -78,14 +89,6 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect - go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel v1.39.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect - go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/sdk v1.38.0 // indirect - go.opentelemetry.io/otel/trace v1.39.0 // indirect - go.opentelemetry.io/proto/otlp v1.7.1 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.46.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect