diff --git a/CHANGELOG.md b/CHANGELOG.md index a16720a53d..aca78dcc0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changes - Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286) +- Add Unix domain socket support for gRPC execution endpoints via `unix:///path/to/socket` [#3297](https://github.com/evstack/ev-node/pull/3297) +- **BREAKING:** Replace legacy gRPC execution `txs` payload fields with `tx_batch` so clients and servers use contiguous transaction buffers [#3297](https://github.com/evstack/ev-node/pull/3297) - Optimize metadata writes by making it async in cache store [#3298](https://github.com/evstack/ev-node/pull/3298) - Reduce tx cache retention to avoid OOM under (really) heavy tx load [#3299](https://github.com/evstack/ev-node/pull/3299) diff --git a/apps/grpc/README.md b/apps/grpc/README.md index a2d1f2ae11..356dc0462f 100644 --- a/apps/grpc/README.md +++ b/apps/grpc/README.md @@ -1,13 +1,13 @@ # gRPC Single Sequencer App -This application runs a Evolve node with a single sequencer that connects to a remote execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface. +This application runs an Evolve node with a single sequencer that connects to an execution client via gRPC. It allows you to use any execution layer that implements the Evolve execution gRPC interface. ## Overview The gRPC single sequencer app provides: -- A Evolve consensus node with single sequencer -- Connection to remote execution clients via gRPC +- An Evolve consensus node with single sequencer +- Connection to execution clients via TCP or Unix domain socket gRPC - Full data availability layer integration - P2P networking capabilities @@ -58,11 +58,20 @@ Start the Evolve node with: --da.auth-token your-da-token ``` +For a same-machine executor, use a Unix domain socket endpoint: + +```bash +./evgrpc start \ + --root-dir ~/.evgrpc \ + --grpc-executor-url unix:///tmp/evolve-executor.sock \ + --da.address http://localhost:7980 +``` + ## Command-Line Flags ### gRPC-specific Flags -- `--grpc-executor-url`: URL of the gRPC execution service (default: `http://localhost:50051`) +- `--grpc-executor-url`: URL of the gRPC execution service, either `http://host:port` or `unix:///path/to/socket` (default: `http://localhost:50051`) ### Common Evolve Flags diff --git a/apps/grpc/cmd/run.go b/apps/grpc/cmd/run.go index 22ca71f587..026cfcd0a6 100644 --- a/apps/grpc/cmd/run.go +++ b/apps/grpc/cmd/run.go @@ -28,7 +28,7 @@ import ( const ( grpcDbName = "grpc-single" - // FlagGrpcExecutorURL is the flag for the gRPC executor endpoint + // FlagGrpcExecutorURL is the flag for the gRPC executor endpoint. FlagGrpcExecutorURL = "grpc-executor-url" ) @@ -163,11 +163,10 @@ func createGRPCExecutionClient(cmd *cobra.Command) (execution.Executor, error) { return nil, fmt.Errorf("%s flag is required", FlagGrpcExecutorURL) } - // Create and return the gRPC client - return executiongrpc.NewClient(executorURL), nil + return executiongrpc.NewClient(executorURL) } // addGRPCFlags adds flags specific to the gRPC execution client func addGRPCFlags(cmd *cobra.Command) { - cmd.Flags().String(FlagGrpcExecutorURL, "http://localhost:50051", "URL of the gRPC execution service") + cmd.Flags().String(FlagGrpcExecutorURL, "http://localhost:50051", "URL of the gRPC execution service, or unix:///path/to/executor.sock") } diff --git a/buf.yaml b/buf.yaml index bf7debf7cc..c558c32114 100644 --- a/buf.yaml +++ b/buf.yaml @@ -14,3 +14,6 @@ lint: breaking: use: - FILE + ignore_only: + FIELD_NO_DELETE: + - proto/evnode/v1/execution.proto diff --git a/client/crates/types/src/proto/evnode.v1.messages.rs b/client/crates/types/src/proto/evnode.v1.messages.rs index 019046d0b7..c26ec4f2f1 100644 --- a/client/crates/types/src/proto/evnode.v1.messages.rs +++ b/client/crates/types/src/proto/evnode.v1.messages.rs @@ -76,6 +76,19 @@ pub struct SignedHeader { #[prost(message, optional, tag = "3")] pub signer: ::core::option::Option, } +/// DAHeaderEnvelope is a wrapper around SignedHeader for DA submission. +/// It is binary compatible with SignedHeader (fields 1-3) but adds an envelope signature. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DaHeaderEnvelope { + #[prost(message, optional, tag = "1")] + pub header: ::core::option::Option
, + #[prost(bytes = "vec", tag = "2")] + pub signature: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub signer: ::core::option::Option, + #[prost(bytes = "vec", tag = "4")] + pub envelope_signature: ::prost::alloc::vec::Vec, +} /// Signer is a signer of a block in the blockchain. #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct Signer { @@ -139,6 +152,28 @@ pub struct Vote { #[prost(bytes = "vec", tag = "5")] pub validator_address: ::prost::alloc::vec::Vec, } +/// P2PSignedHeader +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct P2pSignedHeader { + #[prost(message, optional, tag = "1")] + pub header: ::core::option::Option
, + #[prost(bytes = "vec", tag = "2")] + pub signature: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub signer: ::core::option::Option, + #[prost(uint64, optional, tag = "4")] + pub da_height_hint: ::core::option::Option, +} +/// P2PData +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct P2pData { + #[prost(message, optional, tag = "1")] + pub metadata: ::core::option::Option, + #[prost(bytes = "vec", repeated, tag = "2")] + pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + #[prost(uint64, optional, tag = "3")] + pub da_height_hint: ::core::option::Option, +} /// State is the state of the blockchain. #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct State { @@ -159,6 +194,24 @@ pub struct State { #[prost(bytes = "vec", tag = "9")] pub last_header_hash: ::prost::alloc::vec::Vec, } +/// RaftBlockState represents a replicated block state +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct RaftBlockState { + #[prost(uint64, tag = "1")] + pub height: u64, + #[prost(uint64, tag = "2")] + pub last_submitted_da_header_height: u64, + #[prost(uint64, tag = "3")] + pub last_submitted_da_data_height: u64, + #[prost(bytes = "vec", tag = "4")] + pub hash: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "5")] + pub timestamp: u64, + #[prost(bytes = "vec", tag = "6")] + pub header: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "7")] + pub data: ::prost::alloc::vec::Vec, +} /// SequencerDACheckpoint tracks the position in the DA where transactions were last processed #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct SequencerDaCheckpoint { @@ -212,6 +265,17 @@ pub struct Batch { #[prost(bytes = "vec", repeated, tag = "1")] pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, } +/// BlockData contains data retrieved from a single DA height. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct BlockData { + #[prost(uint64, tag = "1")] + pub height: u64, + /// Unix timestamp in nanoseconds + #[prost(int64, tag = "2")] + pub timestamp: i64, + #[prost(bytes = "vec", repeated, tag = "3")] + pub blobs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, +} /// InitChainRequest contains the genesis parameters for chain initialization #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct InitChainRequest { @@ -231,28 +295,32 @@ pub struct InitChainResponse { /// Hash representing initial state #[prost(bytes = "vec", tag = "1")] pub state_root: ::prost::alloc::vec::Vec, - /// Maximum allowed bytes for transactions in a block - #[prost(uint64, tag = "2")] - pub max_bytes: u64, } /// GetTxsRequest is the request for fetching transactions /// /// Empty for now, may include filtering criteria in the future #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetTxsRequest {} +/// TxBatch stores ordered transactions in one contiguous bytes buffer. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TxBatch { + /// Concatenated transaction bytes. + #[prost(bytes = "vec", tag = "1")] + pub data: ::prost::alloc::vec::Vec, + /// Byte length for each transaction in order. + #[prost(uint32, repeated, tag = "2")] + pub tx_sizes: ::prost::alloc::vec::Vec, +} /// GetTxsResponse contains the available transactions #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetTxsResponse { - /// Slice of valid transactions from mempool - #[prost(bytes = "vec", repeated, tag = "1")] - pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// Valid transactions from mempool in contiguous batch form. + #[prost(message, optional, tag = "2")] + pub tx_batch: ::core::option::Option, } /// ExecuteTxsRequest contains transactions and block context for execution #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ExecuteTxsRequest { - /// Ordered list of transactions to execute - #[prost(bytes = "vec", repeated, tag = "1")] - pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, /// Height of block being created (must be > 0) #[prost(uint64, tag = "2")] pub block_height: u64, @@ -262,6 +330,9 @@ pub struct ExecuteTxsRequest { /// Previous block's state root hash #[prost(bytes = "vec", tag = "4")] pub prev_state_root: ::prost::alloc::vec::Vec, + /// Ordered transactions to execute in contiguous batch form. + #[prost(message, optional, tag = "5")] + pub tx_batch: ::core::option::Option, } /// ExecuteTxsResponse contains the result of transaction execution #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -285,6 +356,73 @@ pub struct SetFinalRequest { /// Empty response, errors are returned via gRPC status #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct SetFinalResponse {} +/// GetExecutionInfoRequest requests execution layer parameters +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetExecutionInfoRequest {} +/// GetExecutionInfoResponse contains execution layer parameters +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetExecutionInfoResponse { + /// Maximum gas allowed for transactions in a block + /// For non-gas-based execution layers, this should be 0 + #[prost(uint64, tag = "1")] + pub max_gas: u64, +} +/// FilterTxsRequest contains transactions to validate and filter +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct FilterTxsRequest { + /// Maximum cumulative size allowed (0 means no size limit) + #[prost(uint64, tag = "2")] + pub max_bytes: u64, + /// Maximum cumulative gas allowed (0 means no gas limit) + #[prost(uint64, tag = "3")] + pub max_gas: u64, + /// Whether force-included transactions are present + #[prost(bool, tag = "4")] + pub has_force_included_transaction: bool, + /// All transactions (force-included + mempool) in contiguous batch form. + #[prost(message, optional, tag = "5")] + pub tx_batch: ::core::option::Option, +} +/// FilterTxsResponse contains the filter status for each transaction +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct FilterTxsResponse { + /// Filter status for each transaction (same length as txs in request) + #[prost(enumeration = "FilterStatus", repeated, tag = "1")] + pub statuses: ::prost::alloc::vec::Vec, +} +/// FilterStatus represents the result of filtering a transaction +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum FilterStatus { + /// Transaction will make it to the next batch + FilterOk = 0, + /// Transaction will be filtered out because invalid (too big, malformed, etc.) + FilterRemove = 1, + /// Transaction is valid but postponed for later processing due to size/gas constraint + FilterPostpone = 2, +} +impl FilterStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::FilterOk => "FILTER_OK", + Self::FilterRemove => "FILTER_REMOVE", + Self::FilterPostpone => "FILTER_POSTPONE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "FILTER_OK" => Some(Self::FilterOk), + "FILTER_REMOVE" => Some(Self::FilterRemove), + "FILTER_POSTPONE" => Some(Self::FilterPostpone), + _ => None, + } + } +} /// Block contains all the components of a complete block #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct Block { diff --git a/client/crates/types/src/proto/evnode.v1.services.rs b/client/crates/types/src/proto/evnode.v1.services.rs index b34ae918b7..c2bfa6f7c6 100644 --- a/client/crates/types/src/proto/evnode.v1.services.rs +++ b/client/crates/types/src/proto/evnode.v1.services.rs @@ -450,6 +450,19 @@ pub struct SignedHeader { #[prost(message, optional, tag = "3")] pub signer: ::core::option::Option, } +/// DAHeaderEnvelope is a wrapper around SignedHeader for DA submission. +/// It is binary compatible with SignedHeader (fields 1-3) but adds an envelope signature. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DaHeaderEnvelope { + #[prost(message, optional, tag = "1")] + pub header: ::core::option::Option
, + #[prost(bytes = "vec", tag = "2")] + pub signature: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub signer: ::core::option::Option, + #[prost(bytes = "vec", tag = "4")] + pub envelope_signature: ::prost::alloc::vec::Vec, +} /// Signer is a signer of a block in the blockchain. #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct Signer { @@ -513,6 +526,28 @@ pub struct Vote { #[prost(bytes = "vec", tag = "5")] pub validator_address: ::prost::alloc::vec::Vec, } +/// P2PSignedHeader +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct P2pSignedHeader { + #[prost(message, optional, tag = "1")] + pub header: ::core::option::Option
, + #[prost(bytes = "vec", tag = "2")] + pub signature: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub signer: ::core::option::Option, + #[prost(uint64, optional, tag = "4")] + pub da_height_hint: ::core::option::Option, +} +/// P2PData +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct P2pData { + #[prost(message, optional, tag = "1")] + pub metadata: ::core::option::Option, + #[prost(bytes = "vec", repeated, tag = "2")] + pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + #[prost(uint64, optional, tag = "3")] + pub da_height_hint: ::core::option::Option, +} /// State is the state of the blockchain. #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct State { @@ -533,6 +568,24 @@ pub struct State { #[prost(bytes = "vec", tag = "9")] pub last_header_hash: ::prost::alloc::vec::Vec, } +/// RaftBlockState represents a replicated block state +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct RaftBlockState { + #[prost(uint64, tag = "1")] + pub height: u64, + #[prost(uint64, tag = "2")] + pub last_submitted_da_header_height: u64, + #[prost(uint64, tag = "3")] + pub last_submitted_da_data_height: u64, + #[prost(bytes = "vec", tag = "4")] + pub hash: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "5")] + pub timestamp: u64, + #[prost(bytes = "vec", tag = "6")] + pub header: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "7")] + pub data: ::prost::alloc::vec::Vec, +} /// SequencerDACheckpoint tracks the position in the DA where transactions were last processed #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct SequencerDaCheckpoint { @@ -957,6 +1010,17 @@ pub struct Batch { #[prost(bytes = "vec", repeated, tag = "1")] pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, } +/// BlockData contains data retrieved from a single DA height. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct BlockData { + #[prost(uint64, tag = "1")] + pub height: u64, + /// Unix timestamp in nanoseconds + #[prost(int64, tag = "2")] + pub timestamp: i64, + #[prost(bytes = "vec", repeated, tag = "3")] + pub blobs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, +} /// InitChainRequest contains the genesis parameters for chain initialization #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct InitChainRequest { @@ -976,28 +1040,32 @@ pub struct InitChainResponse { /// Hash representing initial state #[prost(bytes = "vec", tag = "1")] pub state_root: ::prost::alloc::vec::Vec, - /// Maximum allowed bytes for transactions in a block - #[prost(uint64, tag = "2")] - pub max_bytes: u64, } /// GetTxsRequest is the request for fetching transactions /// /// Empty for now, may include filtering criteria in the future #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetTxsRequest {} +/// TxBatch stores ordered transactions in one contiguous bytes buffer. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TxBatch { + /// Concatenated transaction bytes. + #[prost(bytes = "vec", tag = "1")] + pub data: ::prost::alloc::vec::Vec, + /// Byte length for each transaction in order. + #[prost(uint32, repeated, tag = "2")] + pub tx_sizes: ::prost::alloc::vec::Vec, +} /// GetTxsResponse contains the available transactions #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetTxsResponse { - /// Slice of valid transactions from mempool - #[prost(bytes = "vec", repeated, tag = "1")] - pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, + /// Valid transactions from mempool in contiguous batch form. + #[prost(message, optional, tag = "2")] + pub tx_batch: ::core::option::Option, } /// ExecuteTxsRequest contains transactions and block context for execution #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ExecuteTxsRequest { - /// Ordered list of transactions to execute - #[prost(bytes = "vec", repeated, tag = "1")] - pub txs: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, /// Height of block being created (must be > 0) #[prost(uint64, tag = "2")] pub block_height: u64, @@ -1007,6 +1075,9 @@ pub struct ExecuteTxsRequest { /// Previous block's state root hash #[prost(bytes = "vec", tag = "4")] pub prev_state_root: ::prost::alloc::vec::Vec, + /// Ordered transactions to execute in contiguous batch form. + #[prost(message, optional, tag = "5")] + pub tx_batch: ::core::option::Option, } /// ExecuteTxsResponse contains the result of transaction execution #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -1030,6 +1101,73 @@ pub struct SetFinalRequest { /// Empty response, errors are returned via gRPC status #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct SetFinalResponse {} +/// GetExecutionInfoRequest requests execution layer parameters +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetExecutionInfoRequest {} +/// GetExecutionInfoResponse contains execution layer parameters +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetExecutionInfoResponse { + /// Maximum gas allowed for transactions in a block + /// For non-gas-based execution layers, this should be 0 + #[prost(uint64, tag = "1")] + pub max_gas: u64, +} +/// FilterTxsRequest contains transactions to validate and filter +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct FilterTxsRequest { + /// Maximum cumulative size allowed (0 means no size limit) + #[prost(uint64, tag = "2")] + pub max_bytes: u64, + /// Maximum cumulative gas allowed (0 means no gas limit) + #[prost(uint64, tag = "3")] + pub max_gas: u64, + /// Whether force-included transactions are present + #[prost(bool, tag = "4")] + pub has_force_included_transaction: bool, + /// All transactions (force-included + mempool) in contiguous batch form. + #[prost(message, optional, tag = "5")] + pub tx_batch: ::core::option::Option, +} +/// FilterTxsResponse contains the filter status for each transaction +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct FilterTxsResponse { + /// Filter status for each transaction (same length as txs in request) + #[prost(enumeration = "FilterStatus", repeated, tag = "1")] + pub statuses: ::prost::alloc::vec::Vec, +} +/// FilterStatus represents the result of filtering a transaction +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum FilterStatus { + /// Transaction will make it to the next batch + FilterOk = 0, + /// Transaction will be filtered out because invalid (too big, malformed, etc.) + FilterRemove = 1, + /// Transaction is valid but postponed for later processing due to size/gas constraint + FilterPostpone = 2, +} +impl FilterStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::FilterOk => "FILTER_OK", + Self::FilterRemove => "FILTER_REMOVE", + Self::FilterPostpone => "FILTER_POSTPONE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "FILTER_OK" => Some(Self::FilterOk), + "FILTER_REMOVE" => Some(Self::FilterRemove), + "FILTER_POSTPONE" => Some(Self::FilterPostpone), + _ => None, + } + } +} /// Generated client implementations. pub mod executor_service_client { #![allow( @@ -1219,6 +1357,58 @@ pub mod executor_service_client { .insert(GrpcMethod::new("evnode.v1.ExecutorService", "SetFinal")); self.inner.unary(req, path, codec).await } + /// GetExecutionInfo returns current execution layer parameters + pub async fn get_execution_info( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/evnode.v1.ExecutorService/GetExecutionInfo", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("evnode.v1.ExecutorService", "GetExecutionInfo"), + ); + self.inner.unary(req, path, codec).await + } + /// FilterTxs validates force-included transactions and calculates gas for all transactions + pub async fn filter_txs( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/evnode.v1.ExecutorService/FilterTxs", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("evnode.v1.ExecutorService", "FilterTxs")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -1263,6 +1453,22 @@ pub mod executor_service_server { tonic::Response, tonic::Status, >; + /// GetExecutionInfo returns current execution layer parameters + async fn get_execution_info( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// FilterTxs validates force-included transactions and calculates gas for all transactions + async fn filter_txs( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// ExecutorService defines the execution layer interface for EVNode #[derive(Debug)] @@ -1521,6 +1727,97 @@ pub mod executor_service_server { }; Box::pin(fut) } + "/evnode.v1.ExecutorService/GetExecutionInfo" => { + #[allow(non_camel_case_types)] + struct GetExecutionInfoSvc(pub Arc); + impl< + T: ExecutorService, + > tonic::server::UnaryService + for GetExecutionInfoSvc { + type Response = super::GetExecutionInfoResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_execution_info(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetExecutionInfoSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/evnode.v1.ExecutorService/FilterTxs" => { + #[allow(non_camel_case_types)] + struct FilterTxsSvc(pub Arc); + impl< + T: ExecutorService, + > tonic::server::UnaryService + for FilterTxsSvc { + type Response = super::FilterTxsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::filter_txs(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = FilterTxsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/execution/grpc/README.md b/execution/grpc/README.md index 8647b2ad90..4b3742bc4a 100644 --- a/execution/grpc/README.md +++ b/execution/grpc/README.md @@ -1,10 +1,10 @@ # gRPC Execution Client -This package provides a gRPC-based implementation of the Evolve execution interface. It allows Evolve to communicate with remote execution clients via gRPC using the Connect-RPC framework. +This package provides a gRPC-based implementation of the Evolve execution interface. It allows Evolve to communicate with execution clients via gRPC using the Connect-RPC framework. ## Overview -The gRPC execution client enables separation between the consensus layer (Evolve) and the execution layer by providing a network interface for communication. This allows execution clients to run in separate processes or even on different machines. +The gRPC execution client enables separation between the consensus layer (Evolve) and the execution layer by providing a process boundary for communication. Execution clients can run on different machines over TCP, or on the same machine over a Unix domain socket to avoid TCP/IP overhead. ## Usage @@ -17,12 +17,21 @@ import ( "github.com/evstack/ev-node/execution/grpc" ) -// Create a new gRPC client -client := grpc.NewClient("http://localhost:50051") +// Create a new gRPC client over TCP +client, err := grpc.NewClient("http://localhost:50051") +if err != nil { + return err +} + +// Or connect to an executor on the same machine over a Unix domain socket +client, err = grpc.NewClient("unix:///tmp/evolve-executor.sock") +if err != nil { + return err +} // Use the client as an execution.Executor ctx := context.Background() -stateRoot, maxBytes, err := client.InitChain(ctx, time.Now(), 1, "my-chain") +stateRoot, err := client.InitChain(ctx, time.Now(), 1, "my-chain") ``` ### Server @@ -42,6 +51,14 @@ handler := grpc.NewExecutorServiceHandler(myExecutor) http.ListenAndServe(":50051", handler) ``` +To serve on a Unix domain socket: + +```go +import "github.com/evstack/ev-node/execution/grpc" + +err := grpc.ListenAndServeUnix("/tmp/evolve-executor.sock", myExecutor) +``` + ## Protocol The gRPC service is defined in `proto/evnode/v1/execution.proto` and provides the following methods: @@ -50,13 +67,17 @@ The gRPC service is defined in `proto/evnode/v1/execution.proto` and provides th - `GetTxs`: Fetch transactions from the mempool - `ExecuteTxs`: Execute transactions and update state - `SetFinal`: Mark a block as finalized +- `GetExecutionInfo`: Return current execution limits +- `FilterTxs`: Validate and filter force-included transactions ## Features - Full implementation of the `execution.Executor` interface - Support for HTTP/1.1 and HTTP/2 (via h2c) +- Support for Unix domain socket connections with `unix:///path/to/socket` - gRPC reflection for debugging and service discovery - Compression for efficient data transfer +- Contiguous `tx_batch` transaction encoding to reduce per-transaction protobuf overhead - Comprehensive error handling and validation ## Testing diff --git a/execution/grpc/client.go b/execution/grpc/client.go index efb9d2f840..cfcb6612f0 100644 --- a/execution/grpc/client.go +++ b/execution/grpc/client.go @@ -3,9 +3,11 @@ package grpc import ( "context" "crypto/tls" + "errors" "fmt" "net" "net/http" + "strings" "time" "connectrpc.com/connect" @@ -26,6 +28,11 @@ type Client struct { client v1connect.ExecutorServiceClient } +const ( + unixURLPrefix = "unix://" + unixHTTPBaseURL = "http://unix" +) + // newHTTP2Client creates an HTTP/2 client that supports cleartext (h2c) connections. // This is required to connect to native gRPC servers without TLS. func newHTTP2Client() *http.Client { @@ -40,24 +47,64 @@ func newHTTP2Client() *http.Client { } } +// newUnixHTTP2Client creates an HTTP/2 client that speaks h2c over a Unix domain socket. +func newUnixHTTP2Client(socketPath string) (*http.Client, error) { + if socketPath == "" { + return nil, errors.New("unix socket path is required") + } + return &http.Client{ + Transport: &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, _, _ string, _ *tls.Config) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "unix", socketPath) + }, + }, + }, nil +} + +func clientTransportForTarget(target string) (*http.Client, string, error) { + socketPath, ok := unixSocketPath(target) + if ok { + httpClient, err := newUnixHTTP2Client(socketPath) + if err != nil { + return nil, "", err + } + return httpClient, unixHTTPBaseURL, nil + } + return newHTTP2Client(), target, nil +} + +func unixSocketPath(target string) (string, bool) { + if !strings.HasPrefix(target, unixURLPrefix) { + return "", false + } + return strings.TrimPrefix(target, unixURLPrefix), true +} + // NewClient creates a new gRPC execution client. // // Parameters: -// - url: The URL of the gRPC server (e.g., "http://localhost:50051") +// - url: The URL of the gRPC server (e.g., "http://localhost:50051" or "unix:///tmp/executor.sock") // - opts: Optional Connect client options for configuring the connection // // Returns: // - *Client: The initialized gRPC client -func NewClient(url string, opts ...connect.ClientOption) *Client { +// - error: Any client construction error +func NewClient(url string, opts ...connect.ClientOption) (*Client, error) { // Prepend WithGRPC to use the native gRPC protocol (required for tonic/gRPC servers) opts = append([]connect.ClientOption{connect.WithGRPC()}, opts...) + httpClient, targetURL, err := clientTransportForTarget(url) + if err != nil { + return nil, err + } return &Client{ client: v1connect.NewExecutorServiceClient( - newHTTP2Client(), - url, + httpClient, + targetURL, opts..., ), - } + }, nil } // InitChain initializes a new blockchain instance with genesis parameters. @@ -91,7 +138,12 @@ func (c *Client) GetTxs(ctx context.Context) ([][]byte, error) { return nil, fmt.Errorf("grpc client: failed to get txs: %w", err) } - return resp.Msg.Txs, nil + txs, err := decodeTxBatch(resp.Msg.TxBatch) + if err != nil { + return nil, fmt.Errorf("grpc client: invalid get txs response: %w", err) + } + + return txs, nil } // ExecuteTxs processes transactions to produce a new block state. @@ -100,8 +152,13 @@ func (c *Client) GetTxs(ctx context.Context) ([][]byte, error) { // returns the updated state root after execution. The execution service ensures // deterministic execution and validates the state transition. func (c *Client) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, err error) { + txBatch, err := encodeTxBatch(txs) + if err != nil { + return nil, fmt.Errorf("grpc client: failed to encode tx batch: %w", err) + } + req := connect.NewRequest(&pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: txBatch, BlockHeight: blockHeight, Timestamp: timestamppb.New(timestamp), PrevStateRoot: prevStateRoot, @@ -154,8 +211,13 @@ func (c *Client) GetExecutionInfo(ctx context.Context) (execution.ExecutionInfo, // This method sends transactions to the remote execution service for validation. // Returns a slice of FilterStatus for each transaction. func (c *Client) FilterTxs(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) ([]execution.FilterStatus, error) { + txBatch, err := encodeTxBatch(txs) + if err != nil { + return nil, fmt.Errorf("grpc client: failed to encode tx batch: %w", err) + } + req := connect.NewRequest(&pb.FilterTxsRequest{ - Txs: txs, + TxBatch: txBatch, MaxBytes: maxBytes, MaxGas: maxGas, HasForceIncludedTransaction: hasForceIncludedTransaction, diff --git a/execution/grpc/client_test.go b/execution/grpc/client_test.go index 59ec6416ff..4c0da44f1b 100644 --- a/execution/grpc/client_test.go +++ b/execution/grpc/client_test.go @@ -2,7 +2,11 @@ package grpc import ( "context" + "errors" + "net" + "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -66,6 +70,16 @@ func (m *mockExecutor) FilterTxs(ctx context.Context, txs [][]byte, maxBytes, ma return result, nil } +func newTestClient(t *testing.T, url string) *Client { + t.Helper() + + client, err := NewClient(url) + if err != nil { + t.Fatalf("new client: %v", err) + } + return client +} + func TestClient_InitChain(t *testing.T) { ctx := context.Background() expectedStateRoot := []byte("test_state_root") @@ -94,7 +108,7 @@ func TestClient_InitChain(t *testing.T) { defer server.Close() // Create client - client := NewClient(server.URL) + client := newTestClient(t, server.URL) // Test InitChain stateRoot, err := client.InitChain(ctx, genesisTime, initialHeight, chainID) @@ -123,7 +137,7 @@ func TestClient_GetTxs(t *testing.T) { defer server.Close() // Create client - client := NewClient(server.URL) + client := newTestClient(t, server.URL) // Test GetTxs txs, err := client.GetTxs(ctx) @@ -174,7 +188,7 @@ func TestClient_ExecuteTxs(t *testing.T) { defer server.Close() // Create client - client := NewClient(server.URL) + client := newTestClient(t, server.URL) // Test ExecuteTxs stateRoot, err := client.ExecuteTxs(ctx, txs, blockHeight, timestamp, prevStateRoot) @@ -206,7 +220,7 @@ func TestClient_SetFinal(t *testing.T) { defer server.Close() // Create client - client := NewClient(server.URL) + client := newTestClient(t, server.URL) // Test SetFinal err := client.SetFinal(ctx, blockHeight) @@ -214,3 +228,130 @@ func TestClient_SetFinal(t *testing.T) { t.Fatalf("unexpected error: %v", err) } } + +func TestClient_FilterTxs(t *testing.T) { + ctx := context.Background() + txs := [][]byte{[]byte("tx1"), []byte{}, []byte("tx3")} + maxBytes := uint64(100) + maxGas := uint64(200) + hasForced := true + expectedStatuses := []execution.FilterStatus{ + execution.FilterOK, + execution.FilterRemove, + execution.FilterPostpone, + } + + mockExec := &mockExecutor{ + filterTxsFunc: func(ctx context.Context, txsIn [][]byte, mb, mg uint64, forced bool) ([]execution.FilterStatus, error) { + if len(txsIn) != len(txs) { + t.Fatalf("expected %d txs, got %d", len(txs), len(txsIn)) + } + for i, tx := range txsIn { + if string(tx) != string(txs[i]) { + t.Fatalf("tx %d: expected %q, got %q", i, txs[i], tx) + } + } + if mb != maxBytes { + t.Fatalf("expected max bytes %d, got %d", maxBytes, mb) + } + if mg != maxGas { + t.Fatalf("expected max gas %d, got %d", maxGas, mg) + } + if forced != hasForced { + t.Fatalf("expected forced=%t, got %t", hasForced, forced) + } + return expectedStatuses, nil + }, + } + + handler := NewExecutorServiceHandler(mockExec) + server := httptest.NewServer(handler) + defer server.Close() + + client := newTestClient(t, server.URL) + + statuses, err := client.FilterTxs(ctx, txs, maxBytes, maxGas, hasForced) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(statuses) != len(expectedStatuses) { + t.Fatalf("expected %d statuses, got %d", len(expectedStatuses), len(statuses)) + } + for i, status := range statuses { + if status != expectedStatuses[i] { + t.Fatalf("status %d: expected %v, got %v", i, expectedStatuses[i], status) + } + } +} + +func TestClient_UnixSocket(t *testing.T) { + ctx := context.Background() + socketPath := testUnixSocketPath(t) + expectedTxs := [][]byte{[]byte("tx1"), []byte("tx2")} + + mockExec := &mockExecutor{ + getTxsFunc: func(ctx context.Context) ([][]byte, error) { + return expectedTxs, nil + }, + } + + startUnixTestServer(t, mockExec, socketPath) + + client := newTestClient(t, "unix://"+socketPath) + txs, err := client.GetTxs(ctx) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(txs) != len(expectedTxs) { + t.Fatalf("expected %d txs, got %d", len(expectedTxs), len(txs)) + } + for i, tx := range txs { + if string(tx) != string(expectedTxs[i]) { + t.Fatalf("tx %d: expected %q, got %q", i, expectedTxs[i], tx) + } + } +} + +func TestNewClientRejectsEmptyUnixSocketPath(t *testing.T) { + client, err := NewClient("unix://") + if err == nil { + t.Fatalf("expected empty unix socket path error") + } + if client != nil { + t.Fatalf("expected nil client, got %v", client) + } + if !strings.Contains(err.Error(), "unix socket path is required") { + t.Fatalf("expected unix socket path error, got %v", err) + } +} + +func startUnixTestServer(t *testing.T, executor execution.Executor, socketPath string) { + t.Helper() + + listener, err := ListenUnix(socketPath) + if err != nil { + t.Fatalf("listen unix socket: %v", err) + } + + server := &http.Server{Handler: NewExecutorServiceHandler(executor)} + done := make(chan error, 1) + go func() { + err := server.Serve(listener) + if errors.Is(err, http.ErrServerClosed) || errors.Is(err, net.ErrClosed) { + err = nil + } + done <- err + }() + + t.Cleanup(func() { + _ = server.Close() + select { + case err := <-done: + if err != nil { + t.Errorf("unix socket server error: %v", err) + } + case <-time.After(time.Second): + t.Error("unix socket server did not stop") + } + }) +} diff --git a/execution/grpc/go.mod b/execution/grpc/go.mod index 25031ee32f..bbedeaf599 100644 --- a/execution/grpc/go.mod +++ b/execution/grpc/go.mod @@ -2,6 +2,8 @@ module github.com/evstack/ev-node/execution/grpc go 1.25.7 +replace github.com/evstack/ev-node => ../.. + require ( connectrpc.com/connect v1.19.2 connectrpc.com/grpcreflect v1.3.0 diff --git a/execution/grpc/server.go b/execution/grpc/server.go index e0488b7655..8f1cbb48ce 100644 --- a/execution/grpc/server.go +++ b/execution/grpc/server.go @@ -77,8 +77,13 @@ func (s *Server) GetTxs( return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to get txs: %w", err)) } + txBatch, err := encodeTxBatch(txs) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to encode tx batch: %w", err)) + } + return connect.NewResponse(&pb.GetTxsResponse{ - Txs: txs, + TxBatch: txBatch, }), nil } @@ -102,9 +107,14 @@ func (s *Server) ExecuteTxs( return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("prev_state_root is required")) } + txs, err := decodeTxBatch(req.Msg.TxBatch) + if err != nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid tx_batch: %w", err)) + } + updatedStateRoot, err := s.executor.ExecuteTxs( ctx, - req.Msg.Txs, + txs, req.Msg.BlockHeight, req.Msg.Timestamp.AsTime(), req.Msg.PrevStateRoot, @@ -162,7 +172,12 @@ func (s *Server) FilterTxs( ctx context.Context, req *connect.Request[pb.FilterTxsRequest], ) (*connect.Response[pb.FilterTxsResponse], error) { - result, err := s.executor.FilterTxs(ctx, req.Msg.Txs, req.Msg.MaxBytes, req.Msg.MaxGas, req.Msg.HasForceIncludedTransaction) + txs, err := decodeTxBatch(req.Msg.TxBatch) + if err != nil { + return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid tx_batch: %w", err)) + } + + result, err := s.executor.FilterTxs(ctx, txs, req.Msg.MaxBytes, req.Msg.MaxGas, req.Msg.HasForceIncludedTransaction) if err != nil { return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to filter transactions: %w", err)) } diff --git a/execution/grpc/server_test.go b/execution/grpc/server_test.go index e2a01b4bc4..26a6d37ef4 100644 --- a/execution/grpc/server_test.go +++ b/execution/grpc/server_test.go @@ -9,6 +9,7 @@ import ( "connectrpc.com/connect" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/evstack/ev-node/core/execution" pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) @@ -172,8 +173,17 @@ func TestServer_GetTxs(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if len(resp.Msg.Txs) != len(expectedTxs) { - t.Fatalf("expected %d txs, got %d", len(expectedTxs), len(resp.Msg.Txs)) + txs, err := decodeTxBatch(resp.Msg.TxBatch) + if err != nil { + t.Fatalf("unexpected tx batch decode error: %v", err) + } + if len(txs) != len(expectedTxs) { + t.Fatalf("expected %d txs, got %d", len(expectedTxs), len(txs)) + } + for i := range expectedTxs { + if string(txs[i]) != string(expectedTxs[i]) { + t.Fatalf("tx batch tx %d: expected %q, got %q", i, expectedTxs[i], txs[i]) + } } }) } @@ -197,7 +207,7 @@ func TestServer_ExecuteTxs(t *testing.T) { { name: "success", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), BlockHeight: blockHeight, Timestamp: timestamppb.New(timestamp), PrevStateRoot: prevStateRoot, @@ -210,7 +220,7 @@ func TestServer_ExecuteTxs(t *testing.T) { { name: "missing block height", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), Timestamp: timestamppb.New(timestamp), PrevStateRoot: prevStateRoot, }, @@ -220,7 +230,7 @@ func TestServer_ExecuteTxs(t *testing.T) { { name: "missing timestamp", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), BlockHeight: blockHeight, PrevStateRoot: prevStateRoot, }, @@ -230,17 +240,28 @@ func TestServer_ExecuteTxs(t *testing.T) { { name: "missing prev state root", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), BlockHeight: blockHeight, Timestamp: timestamppb.New(timestamp), }, wantErr: true, wantCode: connect.CodeInvalidArgument, }, + { + name: "invalid tx batch", + req: &pb.ExecuteTxsRequest{ + TxBatch: &pb.TxBatch{Data: []byte("tx"), TxSizes: []uint32{3}}, + BlockHeight: blockHeight, + Timestamp: timestamppb.New(timestamp), + PrevStateRoot: prevStateRoot, + }, + wantErr: true, + wantCode: connect.CodeInvalidArgument, + }, { name: "executor error", req: &pb.ExecuteTxsRequest{ - Txs: txs, + TxBatch: mustEncodeTxBatch(t, txs), BlockHeight: blockHeight, Timestamp: timestamppb.New(timestamp), PrevStateRoot: prevStateRoot, @@ -362,3 +383,96 @@ func TestServer_SetFinal(t *testing.T) { }) } } + +func TestServer_FilterTxs(t *testing.T) { + ctx := context.Background() + txs := [][]byte{[]byte("tx1"), []byte("tx2")} + expectedStatuses := []execution.FilterStatus{execution.FilterOK, execution.FilterPostpone} + + tests := []struct { + name string + req *pb.FilterTxsRequest + mockFunc func(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) ([]execution.FilterStatus, error) + wantErr bool + wantCode connect.Code + }{ + { + name: "success", + req: &pb.FilterTxsRequest{ + TxBatch: mustEncodeTxBatch(t, txs), + MaxBytes: 100, + MaxGas: 200, + HasForceIncludedTransaction: true, + }, + mockFunc: func(ctx context.Context, txsIn [][]byte, maxBytes, maxGas uint64, forced bool) ([]execution.FilterStatus, error) { + if len(txsIn) != len(txs) { + t.Fatalf("expected %d txs, got %d", len(txs), len(txsIn)) + } + if maxBytes != 100 { + t.Fatalf("expected max bytes 100, got %d", maxBytes) + } + if maxGas != 200 { + t.Fatalf("expected max gas 200, got %d", maxGas) + } + if !forced { + t.Fatalf("expected forced transaction flag") + } + return expectedStatuses, nil + }, + wantErr: false, + }, + { + name: "invalid tx batch", + req: &pb.FilterTxsRequest{ + TxBatch: &pb.TxBatch{Data: []byte("tx"), TxSizes: []uint32{3}}, + }, + wantErr: true, + wantCode: connect.CodeInvalidArgument, + }, + { + name: "executor error", + req: &pb.FilterTxsRequest{ + TxBatch: mustEncodeTxBatch(t, txs), + }, + mockFunc: func(ctx context.Context, txs [][]byte, maxBytes, maxGas uint64, hasForceIncludedTransaction bool) ([]execution.FilterStatus, error) { + return nil, errors.New("filter failed") + }, + wantErr: true, + wantCode: connect.CodeInternal, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockExec := &mockExecutor{ + filterTxsFunc: tt.mockFunc, + } + server := NewServer(mockExec) + + req := connect.NewRequest(tt.req) + resp, err := server.FilterTxs(ctx, req) + + if tt.wantErr { + if err == nil { + t.Fatalf("expected error but got none") + } + var connectErr *connect.Error + if errors.As(err, &connectErr) { + if connectErr.Code() != tt.wantCode { + t.Errorf("expected error code %v, got %v", tt.wantCode, connectErr.Code()) + } + } else { + t.Errorf("expected connect error, got %v", err) + } + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(resp.Msg.Statuses) != len(expectedStatuses) { + t.Fatalf("expected %d statuses, got %d", len(expectedStatuses), len(resp.Msg.Statuses)) + } + }) + } +} diff --git a/execution/grpc/tx_batch.go b/execution/grpc/tx_batch.go new file mode 100644 index 0000000000..2e3e600596 --- /dev/null +++ b/execution/grpc/tx_batch.go @@ -0,0 +1,74 @@ +package grpc + +import ( + "fmt" + + pb "github.com/evstack/ev-node/types/pb/evnode/v1" +) + +// maxTxBatchTxSize is the largest transaction length representable in TxBatch.TxSizes: +// 4 GiB - 1 byte, or 4,294,967,295 bytes. +const maxTxBatchTxSize = uint64(1<<32 - 1) + +func encodeTxBatch(txs [][]byte) (*pb.TxBatch, error) { + if len(txs) == 0 { + return &pb.TxBatch{}, nil + } + + maxInt := uint64(int(^uint(0) >> 1)) + var total uint64 + txSizes := make([]uint32, len(txs)) + for i, tx := range txs { + txLen := uint64(len(tx)) + if txLen > maxTxBatchTxSize { + return nil, fmt.Errorf("tx %d size %d exceeds uint32", i, txLen) + } + total += txLen + if total > maxInt { + return nil, fmt.Errorf("tx batch size %d exceeds int", total) + } + txSizes[i] = uint32(txLen) + } + + data := make([]byte, 0, int(total)) + for _, tx := range txs { + data = append(data, tx...) + } + + return &pb.TxBatch{ + Data: data, + TxSizes: txSizes, + }, nil +} + +func decodeTxBatch(batch *pb.TxBatch) ([][]byte, error) { + if batch == nil { + return nil, nil + } + if len(batch.TxSizes) == 0 { + if len(batch.Data) != 0 { + return nil, fmt.Errorf("tx batch has %d data bytes but no tx sizes", len(batch.Data)) + } + return nil, nil + } + + var total uint64 + for i, txSize := range batch.TxSizes { + total += uint64(txSize) + if total > uint64(len(batch.Data)) { + return nil, fmt.Errorf("tx sizes exceed data length at index %d", i) + } + } + if total != uint64(len(batch.Data)) { + return nil, fmt.Errorf("tx sizes total %d does not match data length %d", total, len(batch.Data)) + } + + txs := make([][]byte, len(batch.TxSizes)) + offset := 0 + for i, txSize := range batch.TxSizes { + end := offset + int(txSize) + txs[i] = batch.Data[offset:end:end] + offset = end + } + return txs, nil +} diff --git a/execution/grpc/tx_batch_test.go b/execution/grpc/tx_batch_test.go new file mode 100644 index 0000000000..71c48a7306 --- /dev/null +++ b/execution/grpc/tx_batch_test.go @@ -0,0 +1,79 @@ +package grpc + +import ( + "bytes" + "testing" + + pb "github.com/evstack/ev-node/types/pb/evnode/v1" +) + +func mustEncodeTxBatch(t *testing.T, txs [][]byte) *pb.TxBatch { + t.Helper() + + batch, err := encodeTxBatch(txs) + if err != nil { + t.Fatalf("encode tx batch: %v", err) + } + return batch +} + +func TestEncodeDecodeTxBatch(t *testing.T) { + txs := [][]byte{[]byte("tx1"), nil, []byte("tx3"), []byte{}} + + batch := mustEncodeTxBatch(t, txs) + decoded, err := decodeTxBatch(batch) + if err != nil { + t.Fatalf("decode tx batch: %v", err) + } + if len(decoded) != len(txs) { + t.Fatalf("expected %d txs, got %d", len(txs), len(decoded)) + } + for i := range txs { + if !bytes.Equal(decoded[i], txs[i]) { + t.Fatalf("tx %d: expected %q, got %q", i, txs[i], decoded[i]) + } + } + + decoded[0] = append(decoded[0], 'x') + if !bytes.Equal(decoded[2], txs[2]) { + t.Fatalf("decoded tx slices should not have capacity overlap") + } +} + +func TestDecodeTxBatchRejectsMalformedInput(t *testing.T) { + tests := []struct { + name string + batch *pb.TxBatch + }{ + { + name: "data without sizes", + batch: &pb.TxBatch{Data: []byte("tx")}, + }, + { + name: "sizes exceed data", + batch: &pb.TxBatch{Data: []byte("tx"), TxSizes: []uint32{3}}, + }, + { + name: "sizes do not consume data", + batch: &pb.TxBatch{Data: []byte("tx"), TxSizes: []uint32{1}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if _, err := decodeTxBatch(tt.batch); err == nil { + t.Fatalf("expected decode error") + } + }) + } +} + +func TestDecodeTxBatchNil(t *testing.T) { + txs, err := decodeTxBatch(nil) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(txs) != 0 { + t.Fatalf("expected nil tx_batch to decode to empty txs, got %d txs", len(txs)) + } +} diff --git a/execution/grpc/unix.go b/execution/grpc/unix.go new file mode 100644 index 0000000000..69cb59daea --- /dev/null +++ b/execution/grpc/unix.go @@ -0,0 +1,69 @@ +package grpc + +import ( + "errors" + "fmt" + "net" + "net/http" + "os" + + "connectrpc.com/connect" + + "github.com/evstack/ev-node/core/execution" +) + +// ListenUnix creates a Unix domain socket listener for the gRPC execution service. +// +// If socketPath already exists, ListenUnix removes it only when it is a stale +// socket. Regular files, directories, and other path types are left untouched. +func ListenUnix(socketPath string) (net.Listener, error) { + if socketPath == "" { + return nil, errors.New("unix socket path is required") + } + if err := removeStaleUnixSocket(socketPath); err != nil { + return nil, err + } + listener, err := net.Listen("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("listen unix socket %q: %w", socketPath, err) + } + return listener, nil +} + +// ListenAndServeUnix serves the gRPC execution service over a Unix domain socket. +// +// The NewExecutorServiceHandler handler is passed to http.Serve, so this +// function blocks until http.Serve returns an error. When it returns, deferred +// cleanup closes the listener with listener.Close and then removes the socket +// with removeStaleUnixSocket. Cleanup errors are currently ignored. +func ListenAndServeUnix(socketPath string, executor execution.Executor, opts ...connect.HandlerOption) error { + listener, err := ListenUnix(socketPath) + if err != nil { + return err + } + defer func() { + _ = listener.Close() + }() + defer func() { + _ = removeStaleUnixSocket(socketPath) + }() + + return http.Serve(listener, NewExecutorServiceHandler(executor, opts...)) +} + +func removeStaleUnixSocket(socketPath string) error { + info, err := os.Lstat(socketPath) + if errors.Is(err, os.ErrNotExist) { + return nil + } + if err != nil { + return fmt.Errorf("stat unix socket %q: %w", socketPath, err) + } + if info.Mode()&os.ModeSocket == 0 { + return fmt.Errorf("refusing to remove non-socket path %q", socketPath) + } + if err := os.Remove(socketPath); err != nil { + return fmt.Errorf("remove stale unix socket %q: %w", socketPath, err) + } + return nil +} diff --git a/execution/grpc/unix_test.go b/execution/grpc/unix_test.go new file mode 100644 index 0000000000..d90133a8e9 --- /dev/null +++ b/execution/grpc/unix_test.go @@ -0,0 +1,59 @@ +package grpc + +import ( + "fmt" + "net" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestListenUnixRejectsNonSocketPath(t *testing.T) { + socketPath := filepath.Join(t.TempDir(), "executor.sock") + if err := os.WriteFile(socketPath, []byte("not a socket"), 0o600); err != nil { + t.Fatalf("write test file: %v", err) + } + + listener, err := ListenUnix(socketPath) + if err == nil { + _ = listener.Close() + t.Fatal("expected error for non-socket path") + } + if !strings.Contains(err.Error(), "refusing to remove non-socket path") { + t.Fatalf("expected non-socket refusal, got %v", err) + } +} + +func TestListenUnixRemovesStaleSocket(t *testing.T) { + socketPath := testUnixSocketPath(t) + staleListener, err := net.Listen("unix", socketPath) + if err != nil { + t.Fatalf("create stale unix socket: %v", err) + } + if err := staleListener.Close(); err != nil { + t.Fatalf("close stale unix socket: %v", err) + } + + listener, err := ListenUnix(socketPath) + if err != nil { + t.Fatalf("listen unix socket: %v", err) + } + if err := listener.Close(); err != nil { + t.Fatalf("close unix socket: %v", err) + } +} + +func testUnixSocketPath(t *testing.T) string { + t.Helper() + + socketPath := filepath.Join( + os.TempDir(), + fmt.Sprintf("ev-node-grpc-%d-%d.sock", os.Getpid(), time.Now().UnixNano()), + ) + t.Cleanup(func() { + _ = os.Remove(socketPath) + }) + return socketPath +} diff --git a/proto/evnode/v1/execution.proto b/proto/evnode/v1/execution.proto index a3abbea36a..d1773c4289 100644 --- a/proto/evnode/v1/execution.proto +++ b/proto/evnode/v1/execution.proto @@ -49,16 +49,28 @@ message GetTxsRequest { // Empty for now, may include filtering criteria in the future } +// TxBatch stores ordered transactions in one contiguous bytes buffer. +message TxBatch { + // Concatenated transaction bytes. + bytes data = 1; + + // Byte length for each transaction in order. + repeated uint32 tx_sizes = 2; +} + // GetTxsResponse contains the available transactions message GetTxsResponse { - // Slice of valid transactions from mempool - repeated bytes txs = 1; + reserved 1; + reserved "txs"; + + // Valid transactions from mempool in contiguous batch form. + TxBatch tx_batch = 2; } // ExecuteTxsRequest contains transactions and block context for execution message ExecuteTxsRequest { - // Ordered list of transactions to execute - repeated bytes txs = 1; + reserved 1; + reserved "txs"; // Height of block being created (must be > 0) uint64 block_height = 2; @@ -68,6 +80,9 @@ message ExecuteTxsRequest { // Previous block's state root hash bytes prev_state_root = 4; + + // Ordered transactions to execute in contiguous batch form. + TxBatch tx_batch = 5; } // ExecuteTxsResponse contains the result of transaction execution @@ -112,8 +127,8 @@ enum FilterStatus { // FilterTxsRequest contains transactions to validate and filter message FilterTxsRequest { - // All transactions (force-included + mempool) - repeated bytes txs = 1; + reserved 1; + reserved "txs"; // Maximum cumulative size allowed (0 means no size limit) uint64 max_bytes = 2; @@ -123,6 +138,9 @@ message FilterTxsRequest { // Whether force-included transactions are present bool has_force_included_transaction = 4; + + // All transactions (force-included + mempool) in contiguous batch form. + TxBatch tx_batch = 5; } // FilterTxsResponse contains the filter status for each transaction diff --git a/types/pb/evnode/v1/execution.pb.go b/types/pb/evnode/v1/execution.pb.go index 2b33c910d2..9f4e3d5809 100644 --- a/types/pb/evnode/v1/execution.pb.go +++ b/types/pb/evnode/v1/execution.pb.go @@ -222,18 +222,73 @@ func (*GetTxsRequest) Descriptor() ([]byte, []int) { return file_evnode_v1_execution_proto_rawDescGZIP(), []int{2} } +// TxBatch stores ordered transactions in one contiguous bytes buffer. +type TxBatch struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Concatenated transaction bytes. + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + // Byte length for each transaction in order. + TxSizes []uint32 `protobuf:"varint,2,rep,packed,name=tx_sizes,json=txSizes,proto3" json:"tx_sizes,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TxBatch) Reset() { + *x = TxBatch{} + mi := &file_evnode_v1_execution_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TxBatch) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TxBatch) ProtoMessage() {} + +func (x *TxBatch) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_execution_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TxBatch.ProtoReflect.Descriptor instead. +func (*TxBatch) Descriptor() ([]byte, []int) { + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{3} +} + +func (x *TxBatch) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *TxBatch) GetTxSizes() []uint32 { + if x != nil { + return x.TxSizes + } + return nil +} + // GetTxsResponse contains the available transactions type GetTxsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - // Slice of valid transactions from mempool - Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` + // Valid transactions from mempool in contiguous batch form. + TxBatch *TxBatch `protobuf:"bytes,2,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *GetTxsResponse) Reset() { *x = GetTxsResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[3] + mi := &file_evnode_v1_execution_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -245,7 +300,7 @@ func (x *GetTxsResponse) String() string { func (*GetTxsResponse) ProtoMessage() {} func (x *GetTxsResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[3] + mi := &file_evnode_v1_execution_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -258,12 +313,12 @@ func (x *GetTxsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetTxsResponse.ProtoReflect.Descriptor instead. func (*GetTxsResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{3} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{4} } -func (x *GetTxsResponse) GetTxs() [][]byte { +func (x *GetTxsResponse) GetTxBatch() *TxBatch { if x != nil { - return x.Txs + return x.TxBatch } return nil } @@ -271,21 +326,21 @@ func (x *GetTxsResponse) GetTxs() [][]byte { // ExecuteTxsRequest contains transactions and block context for execution type ExecuteTxsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - // Ordered list of transactions to execute - Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` // Height of block being created (must be > 0) BlockHeight uint64 `protobuf:"varint,2,opt,name=block_height,json=blockHeight,proto3" json:"block_height,omitempty"` // Block creation time in UTC Timestamp *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Previous block's state root hash PrevStateRoot []byte `protobuf:"bytes,4,opt,name=prev_state_root,json=prevStateRoot,proto3" json:"prev_state_root,omitempty"` + // Ordered transactions to execute in contiguous batch form. + TxBatch *TxBatch `protobuf:"bytes,5,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *ExecuteTxsRequest) Reset() { *x = ExecuteTxsRequest{} - mi := &file_evnode_v1_execution_proto_msgTypes[4] + mi := &file_evnode_v1_execution_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -297,7 +352,7 @@ func (x *ExecuteTxsRequest) String() string { func (*ExecuteTxsRequest) ProtoMessage() {} func (x *ExecuteTxsRequest) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[4] + mi := &file_evnode_v1_execution_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -310,14 +365,7 @@ func (x *ExecuteTxsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ExecuteTxsRequest.ProtoReflect.Descriptor instead. func (*ExecuteTxsRequest) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{4} -} - -func (x *ExecuteTxsRequest) GetTxs() [][]byte { - if x != nil { - return x.Txs - } - return nil + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{5} } func (x *ExecuteTxsRequest) GetBlockHeight() uint64 { @@ -341,6 +389,13 @@ func (x *ExecuteTxsRequest) GetPrevStateRoot() []byte { return nil } +func (x *ExecuteTxsRequest) GetTxBatch() *TxBatch { + if x != nil { + return x.TxBatch + } + return nil +} + // ExecuteTxsResponse contains the result of transaction execution type ExecuteTxsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -354,7 +409,7 @@ type ExecuteTxsResponse struct { func (x *ExecuteTxsResponse) Reset() { *x = ExecuteTxsResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[5] + mi := &file_evnode_v1_execution_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -366,7 +421,7 @@ func (x *ExecuteTxsResponse) String() string { func (*ExecuteTxsResponse) ProtoMessage() {} func (x *ExecuteTxsResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[5] + mi := &file_evnode_v1_execution_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -379,7 +434,7 @@ func (x *ExecuteTxsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ExecuteTxsResponse.ProtoReflect.Descriptor instead. func (*ExecuteTxsResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{5} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{6} } func (x *ExecuteTxsResponse) GetUpdatedStateRoot() []byte { @@ -407,7 +462,7 @@ type SetFinalRequest struct { func (x *SetFinalRequest) Reset() { *x = SetFinalRequest{} - mi := &file_evnode_v1_execution_proto_msgTypes[6] + mi := &file_evnode_v1_execution_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -419,7 +474,7 @@ func (x *SetFinalRequest) String() string { func (*SetFinalRequest) ProtoMessage() {} func (x *SetFinalRequest) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[6] + mi := &file_evnode_v1_execution_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -432,7 +487,7 @@ func (x *SetFinalRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SetFinalRequest.ProtoReflect.Descriptor instead. func (*SetFinalRequest) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{6} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{7} } func (x *SetFinalRequest) GetBlockHeight() uint64 { @@ -451,7 +506,7 @@ type SetFinalResponse struct { func (x *SetFinalResponse) Reset() { *x = SetFinalResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[7] + mi := &file_evnode_v1_execution_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -463,7 +518,7 @@ func (x *SetFinalResponse) String() string { func (*SetFinalResponse) ProtoMessage() {} func (x *SetFinalResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[7] + mi := &file_evnode_v1_execution_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -476,7 +531,7 @@ func (x *SetFinalResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SetFinalResponse.ProtoReflect.Descriptor instead. func (*SetFinalResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{7} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{8} } // GetExecutionInfoRequest requests execution layer parameters @@ -488,7 +543,7 @@ type GetExecutionInfoRequest struct { func (x *GetExecutionInfoRequest) Reset() { *x = GetExecutionInfoRequest{} - mi := &file_evnode_v1_execution_proto_msgTypes[8] + mi := &file_evnode_v1_execution_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -500,7 +555,7 @@ func (x *GetExecutionInfoRequest) String() string { func (*GetExecutionInfoRequest) ProtoMessage() {} func (x *GetExecutionInfoRequest) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[8] + mi := &file_evnode_v1_execution_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -513,7 +568,7 @@ func (x *GetExecutionInfoRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetExecutionInfoRequest.ProtoReflect.Descriptor instead. func (*GetExecutionInfoRequest) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{8} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{9} } // GetExecutionInfoResponse contains execution layer parameters @@ -528,7 +583,7 @@ type GetExecutionInfoResponse struct { func (x *GetExecutionInfoResponse) Reset() { *x = GetExecutionInfoResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[9] + mi := &file_evnode_v1_execution_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -540,7 +595,7 @@ func (x *GetExecutionInfoResponse) String() string { func (*GetExecutionInfoResponse) ProtoMessage() {} func (x *GetExecutionInfoResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[9] + mi := &file_evnode_v1_execution_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -553,7 +608,7 @@ func (x *GetExecutionInfoResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetExecutionInfoResponse.ProtoReflect.Descriptor instead. func (*GetExecutionInfoResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{9} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{10} } func (x *GetExecutionInfoResponse) GetMaxGas() uint64 { @@ -566,21 +621,21 @@ func (x *GetExecutionInfoResponse) GetMaxGas() uint64 { // FilterTxsRequest contains transactions to validate and filter type FilterTxsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - // All transactions (force-included + mempool) - Txs [][]byte `protobuf:"bytes,1,rep,name=txs,proto3" json:"txs,omitempty"` // Maximum cumulative size allowed (0 means no size limit) MaxBytes uint64 `protobuf:"varint,2,opt,name=max_bytes,json=maxBytes,proto3" json:"max_bytes,omitempty"` // Maximum cumulative gas allowed (0 means no gas limit) MaxGas uint64 `protobuf:"varint,3,opt,name=max_gas,json=maxGas,proto3" json:"max_gas,omitempty"` // Whether force-included transactions are present HasForceIncludedTransaction bool `protobuf:"varint,4,opt,name=has_force_included_transaction,json=hasForceIncludedTransaction,proto3" json:"has_force_included_transaction,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // All transactions (force-included + mempool) in contiguous batch form. + TxBatch *TxBatch `protobuf:"bytes,5,opt,name=tx_batch,json=txBatch,proto3" json:"tx_batch,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *FilterTxsRequest) Reset() { *x = FilterTxsRequest{} - mi := &file_evnode_v1_execution_proto_msgTypes[10] + mi := &file_evnode_v1_execution_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -592,7 +647,7 @@ func (x *FilterTxsRequest) String() string { func (*FilterTxsRequest) ProtoMessage() {} func (x *FilterTxsRequest) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[10] + mi := &file_evnode_v1_execution_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -605,14 +660,7 @@ func (x *FilterTxsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use FilterTxsRequest.ProtoReflect.Descriptor instead. func (*FilterTxsRequest) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{10} -} - -func (x *FilterTxsRequest) GetTxs() [][]byte { - if x != nil { - return x.Txs - } - return nil + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{11} } func (x *FilterTxsRequest) GetMaxBytes() uint64 { @@ -636,6 +684,13 @@ func (x *FilterTxsRequest) GetHasForceIncludedTransaction() bool { return false } +func (x *FilterTxsRequest) GetTxBatch() *TxBatch { + if x != nil { + return x.TxBatch + } + return nil +} + // FilterTxsResponse contains the filter status for each transaction type FilterTxsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -647,7 +702,7 @@ type FilterTxsResponse struct { func (x *FilterTxsResponse) Reset() { *x = FilterTxsResponse{} - mi := &file_evnode_v1_execution_proto_msgTypes[11] + mi := &file_evnode_v1_execution_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -659,7 +714,7 @@ func (x *FilterTxsResponse) String() string { func (*FilterTxsResponse) ProtoMessage() {} func (x *FilterTxsResponse) ProtoReflect() protoreflect.Message { - mi := &file_evnode_v1_execution_proto_msgTypes[11] + mi := &file_evnode_v1_execution_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -672,7 +727,7 @@ func (x *FilterTxsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use FilterTxsResponse.ProtoReflect.Descriptor instead. func (*FilterTxsResponse) Descriptor() ([]byte, []int) { - return file_evnode_v1_execution_proto_rawDescGZIP(), []int{11} + return file_evnode_v1_execution_proto_rawDescGZIP(), []int{12} } func (x *FilterTxsResponse) GetStatuses() []FilterStatus { @@ -694,14 +749,17 @@ const file_evnode_v1_execution_proto_rawDesc = "" + "\x11InitChainResponse\x12\x1d\n" + "\n" + "state_root\x18\x01 \x01(\fR\tstateRoot\"\x0f\n" + - "\rGetTxsRequest\"\"\n" + - "\x0eGetTxsResponse\x12\x10\n" + - "\x03txs\x18\x01 \x03(\fR\x03txs\"\xaa\x01\n" + - "\x11ExecuteTxsRequest\x12\x10\n" + - "\x03txs\x18\x01 \x03(\fR\x03txs\x12!\n" + + "\rGetTxsRequest\"8\n" + + "\aTxBatch\x12\x12\n" + + "\x04data\x18\x01 \x01(\fR\x04data\x12\x19\n" + + "\btx_sizes\x18\x02 \x03(\rR\atxSizes\"J\n" + + "\x0eGetTxsResponse\x12-\n" + + "\btx_batch\x18\x02 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatchJ\x04\b\x01\x10\x02R\x03txs\"\xd2\x01\n" + + "\x11ExecuteTxsRequest\x12!\n" + "\fblock_height\x18\x02 \x01(\x04R\vblockHeight\x128\n" + "\ttimestamp\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12&\n" + - "\x0fprev_state_root\x18\x04 \x01(\fR\rprevStateRoot\"_\n" + + "\x0fprev_state_root\x18\x04 \x01(\fR\rprevStateRoot\x12-\n" + + "\btx_batch\x18\x05 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatchJ\x04\b\x01\x10\x02R\x03txs\"_\n" + "\x12ExecuteTxsResponse\x12,\n" + "\x12updated_state_root\x18\x01 \x01(\fR\x10updatedStateRoot\x12\x1b\n" + "\tmax_bytes\x18\x02 \x01(\x04R\bmaxBytes\"4\n" + @@ -710,12 +768,12 @@ const file_evnode_v1_execution_proto_rawDesc = "" + "\x10SetFinalResponse\"\x19\n" + "\x17GetExecutionInfoRequest\"3\n" + "\x18GetExecutionInfoResponse\x12\x17\n" + - "\amax_gas\x18\x01 \x01(\x04R\x06maxGas\"\x9f\x01\n" + - "\x10FilterTxsRequest\x12\x10\n" + - "\x03txs\x18\x01 \x03(\fR\x03txs\x12\x1b\n" + + "\amax_gas\x18\x01 \x01(\x04R\x06maxGas\"\xc7\x01\n" + + "\x10FilterTxsRequest\x12\x1b\n" + "\tmax_bytes\x18\x02 \x01(\x04R\bmaxBytes\x12\x17\n" + "\amax_gas\x18\x03 \x01(\x04R\x06maxGas\x12C\n" + - "\x1ehas_force_included_transaction\x18\x04 \x01(\bR\x1bhasForceIncludedTransaction\"H\n" + + "\x1ehas_force_included_transaction\x18\x04 \x01(\bR\x1bhasForceIncludedTransaction\x12-\n" + + "\btx_batch\x18\x05 \x01(\v2\x12.evnode.v1.TxBatchR\atxBatchJ\x04\b\x01\x10\x02R\x03txs\"H\n" + "\x11FilterTxsResponse\x123\n" + "\bstatuses\x18\x01 \x03(\x0e2\x17.evnode.v1.FilterStatusR\bstatuses*E\n" + "\fFilterStatus\x12\r\n" + @@ -744,44 +802,48 @@ func file_evnode_v1_execution_proto_rawDescGZIP() []byte { } var file_evnode_v1_execution_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_evnode_v1_execution_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_evnode_v1_execution_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_evnode_v1_execution_proto_goTypes = []any{ (FilterStatus)(0), // 0: evnode.v1.FilterStatus (*InitChainRequest)(nil), // 1: evnode.v1.InitChainRequest (*InitChainResponse)(nil), // 2: evnode.v1.InitChainResponse (*GetTxsRequest)(nil), // 3: evnode.v1.GetTxsRequest - (*GetTxsResponse)(nil), // 4: evnode.v1.GetTxsResponse - (*ExecuteTxsRequest)(nil), // 5: evnode.v1.ExecuteTxsRequest - (*ExecuteTxsResponse)(nil), // 6: evnode.v1.ExecuteTxsResponse - (*SetFinalRequest)(nil), // 7: evnode.v1.SetFinalRequest - (*SetFinalResponse)(nil), // 8: evnode.v1.SetFinalResponse - (*GetExecutionInfoRequest)(nil), // 9: evnode.v1.GetExecutionInfoRequest - (*GetExecutionInfoResponse)(nil), // 10: evnode.v1.GetExecutionInfoResponse - (*FilterTxsRequest)(nil), // 11: evnode.v1.FilterTxsRequest - (*FilterTxsResponse)(nil), // 12: evnode.v1.FilterTxsResponse - (*timestamppb.Timestamp)(nil), // 13: google.protobuf.Timestamp + (*TxBatch)(nil), // 4: evnode.v1.TxBatch + (*GetTxsResponse)(nil), // 5: evnode.v1.GetTxsResponse + (*ExecuteTxsRequest)(nil), // 6: evnode.v1.ExecuteTxsRequest + (*ExecuteTxsResponse)(nil), // 7: evnode.v1.ExecuteTxsResponse + (*SetFinalRequest)(nil), // 8: evnode.v1.SetFinalRequest + (*SetFinalResponse)(nil), // 9: evnode.v1.SetFinalResponse + (*GetExecutionInfoRequest)(nil), // 10: evnode.v1.GetExecutionInfoRequest + (*GetExecutionInfoResponse)(nil), // 11: evnode.v1.GetExecutionInfoResponse + (*FilterTxsRequest)(nil), // 12: evnode.v1.FilterTxsRequest + (*FilterTxsResponse)(nil), // 13: evnode.v1.FilterTxsResponse + (*timestamppb.Timestamp)(nil), // 14: google.protobuf.Timestamp } var file_evnode_v1_execution_proto_depIdxs = []int32{ - 13, // 0: evnode.v1.InitChainRequest.genesis_time:type_name -> google.protobuf.Timestamp - 13, // 1: evnode.v1.ExecuteTxsRequest.timestamp:type_name -> google.protobuf.Timestamp - 0, // 2: evnode.v1.FilterTxsResponse.statuses:type_name -> evnode.v1.FilterStatus - 1, // 3: evnode.v1.ExecutorService.InitChain:input_type -> evnode.v1.InitChainRequest - 3, // 4: evnode.v1.ExecutorService.GetTxs:input_type -> evnode.v1.GetTxsRequest - 5, // 5: evnode.v1.ExecutorService.ExecuteTxs:input_type -> evnode.v1.ExecuteTxsRequest - 7, // 6: evnode.v1.ExecutorService.SetFinal:input_type -> evnode.v1.SetFinalRequest - 9, // 7: evnode.v1.ExecutorService.GetExecutionInfo:input_type -> evnode.v1.GetExecutionInfoRequest - 11, // 8: evnode.v1.ExecutorService.FilterTxs:input_type -> evnode.v1.FilterTxsRequest - 2, // 9: evnode.v1.ExecutorService.InitChain:output_type -> evnode.v1.InitChainResponse - 4, // 10: evnode.v1.ExecutorService.GetTxs:output_type -> evnode.v1.GetTxsResponse - 6, // 11: evnode.v1.ExecutorService.ExecuteTxs:output_type -> evnode.v1.ExecuteTxsResponse - 8, // 12: evnode.v1.ExecutorService.SetFinal:output_type -> evnode.v1.SetFinalResponse - 10, // 13: evnode.v1.ExecutorService.GetExecutionInfo:output_type -> evnode.v1.GetExecutionInfoResponse - 12, // 14: evnode.v1.ExecutorService.FilterTxs:output_type -> evnode.v1.FilterTxsResponse - 9, // [9:15] is the sub-list for method output_type - 3, // [3:9] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 14, // 0: evnode.v1.InitChainRequest.genesis_time:type_name -> google.protobuf.Timestamp + 4, // 1: evnode.v1.GetTxsResponse.tx_batch:type_name -> evnode.v1.TxBatch + 14, // 2: evnode.v1.ExecuteTxsRequest.timestamp:type_name -> google.protobuf.Timestamp + 4, // 3: evnode.v1.ExecuteTxsRequest.tx_batch:type_name -> evnode.v1.TxBatch + 4, // 4: evnode.v1.FilterTxsRequest.tx_batch:type_name -> evnode.v1.TxBatch + 0, // 5: evnode.v1.FilterTxsResponse.statuses:type_name -> evnode.v1.FilterStatus + 1, // 6: evnode.v1.ExecutorService.InitChain:input_type -> evnode.v1.InitChainRequest + 3, // 7: evnode.v1.ExecutorService.GetTxs:input_type -> evnode.v1.GetTxsRequest + 6, // 8: evnode.v1.ExecutorService.ExecuteTxs:input_type -> evnode.v1.ExecuteTxsRequest + 8, // 9: evnode.v1.ExecutorService.SetFinal:input_type -> evnode.v1.SetFinalRequest + 10, // 10: evnode.v1.ExecutorService.GetExecutionInfo:input_type -> evnode.v1.GetExecutionInfoRequest + 12, // 11: evnode.v1.ExecutorService.FilterTxs:input_type -> evnode.v1.FilterTxsRequest + 2, // 12: evnode.v1.ExecutorService.InitChain:output_type -> evnode.v1.InitChainResponse + 5, // 13: evnode.v1.ExecutorService.GetTxs:output_type -> evnode.v1.GetTxsResponse + 7, // 14: evnode.v1.ExecutorService.ExecuteTxs:output_type -> evnode.v1.ExecuteTxsResponse + 9, // 15: evnode.v1.ExecutorService.SetFinal:output_type -> evnode.v1.SetFinalResponse + 11, // 16: evnode.v1.ExecutorService.GetExecutionInfo:output_type -> evnode.v1.GetExecutionInfoResponse + 13, // 17: evnode.v1.ExecutorService.FilterTxs:output_type -> evnode.v1.FilterTxsResponse + 12, // [12:18] is the sub-list for method output_type + 6, // [6:12] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_evnode_v1_execution_proto_init() } @@ -795,7 +857,7 @@ func file_evnode_v1_execution_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_evnode_v1_execution_proto_rawDesc), len(file_evnode_v1_execution_proto_rawDesc)), NumEnums: 1, - NumMessages: 12, + NumMessages: 13, NumExtensions: 0, NumServices: 1, },