diff --git a/crates/consensus_manager/src/consensus_manager.rs b/crates/consensus_manager/src/consensus_manager.rs index 0a7d6e69c8..ccdc1f9a2d 100644 --- a/crates/consensus_manager/src/consensus_manager.rs +++ b/crates/consensus_manager/src/consensus_manager.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use starknet_batcher_types::communication::SharedBatcherClient; -use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; +use starknet_mempool_infra::component_runner::ComponentStarter; +use starknet_mempool_infra::errors::ComponentError; use tracing::info; use crate::config::ConsensusManagerConfig; @@ -28,7 +29,7 @@ pub fn create_consensus_manager( #[async_trait] impl ComponentStarter for ConsensusManager { - async fn start(&mut self) -> Result<(), ComponentStartError> { + async fn start(&mut self) -> Result<(), ComponentError> { info!("ConsensusManager::start()"); Ok(()) } diff --git a/crates/gateway/src/gateway.rs b/crates/gateway/src/gateway.rs index ed96c4532f..2db5d466cd 100644 --- a/crates/gateway/src/gateway.rs +++ b/crates/gateway/src/gateway.rs @@ -6,7 +6,8 @@ use starknet_api::executable_transaction::Transaction; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_gateway_types::errors::GatewaySpecError; -use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; +use starknet_mempool_infra::component_runner::ComponentStarter; +use starknet_mempool_infra::errors::ComponentError; use starknet_mempool_types::communication::{MempoolWrapperInput, SharedMempoolClient}; use starknet_mempool_types::mempool_types::{Account, AccountState, MempoolInput}; use starknet_sierra_compile::config::SierraToCasmCompilationConfig; @@ -156,7 +157,7 @@ pub fn create_gateway( #[async_trait] impl ComponentStarter for Gateway { - async fn start(&mut self) -> Result<(), ComponentStartError> { + async fn start(&mut self) -> Result<(), ComponentError> { info!("Gateway::start()"); Ok(()) } diff --git a/crates/http_server/src/http_server.rs b/crates/http_server/src/http_server.rs index 46b0d3b9f4..91c0b5277a 100644 --- a/crates/http_server/src/http_server.rs +++ b/crates/http_server/src/http_server.rs @@ -11,7 +11,8 @@ use starknet_gateway::errors::GatewayRunError; use starknet_gateway_types::communication::SharedGatewayClient; use starknet_gateway_types::errors::GatewaySpecError; use starknet_gateway_types::gateway_types::GatewayInput; -use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; +use starknet_mempool_infra::component_runner::ComponentStarter; +use starknet_mempool_infra::errors::ComponentError; use tracing::{error, info, instrument}; use crate::config::HttpServerConfig; @@ -96,8 +97,8 @@ pub fn create_http_server( #[async_trait] impl ComponentStarter for HttpServer { - async fn start(&mut self) -> Result<(), ComponentStartError> { + async fn start(&mut self) -> Result<(), ComponentError> { info!("HttpServer::start()"); - self.run().await.map_err(|_| ComponentStartError::InternalComponentError) + self.run().await.map_err(|_| ComponentError::InternalComponentError) } } diff --git a/crates/mempool_infra/src/component_runner.rs b/crates/mempool_infra/src/component_runner.rs index de8ab6b51d..8757a2f13a 100644 --- a/crates/mempool_infra/src/component_runner.rs +++ b/crates/mempool_infra/src/component_runner.rs @@ -1,18 +1,12 @@ use async_trait::async_trait; -#[derive(thiserror::Error, Debug, PartialEq, Clone)] -pub enum ComponentStartError { - #[error("Error in the component configuration.")] - ComponentConfigError, - #[error("An internal component error.")] - InternalComponentError, -} +use crate::errors::ComponentError; /// Interface to start components. #[async_trait] pub trait ComponentStarter { /// Start the component. By default do nothing. - async fn start(&mut self) -> Result<(), ComponentStartError> { + async fn start(&mut self) -> Result<(), ComponentError> { Ok(()) } } diff --git a/crates/mempool_infra/src/component_server/definitions.rs b/crates/mempool_infra/src/component_server/definitions.rs index fddb6dbe4b..90082e2880 100644 --- a/crates/mempool_infra/src/component_server/definitions.rs +++ b/crates/mempool_infra/src/component_server/definitions.rs @@ -6,10 +6,11 @@ use tracing::{error, info}; use crate::component_definitions::{ComponentRequestAndResponseSender, ComponentRequestHandler}; use crate::component_runner::ComponentStarter; +use crate::errors::ComponentServerError; #[async_trait] pub trait ComponentServerStarter: Send + Sync { - async fn start(&mut self); + async fn start(&mut self) -> Result<(), ComponentServerError>; } pub async fn start_component(component: &mut Component) -> bool diff --git a/crates/mempool_infra/src/component_server/empty_component_server.rs b/crates/mempool_infra/src/component_server/empty_component_server.rs index 53b03b6bfd..ccdf992947 100644 --- a/crates/mempool_infra/src/component_server/empty_component_server.rs +++ b/crates/mempool_infra/src/component_server/empty_component_server.rs @@ -1,7 +1,8 @@ use async_trait::async_trait; -use super::definitions::{start_component, ComponentServerStarter}; +use super::definitions::ComponentServerStarter; use crate::component_runner::ComponentStarter; +use crate::errors::ComponentServerError; pub struct EmptyServer { component: T, @@ -15,8 +16,8 @@ impl EmptyServer { #[async_trait] impl ComponentServerStarter for EmptyServer { - async fn start(&mut self) { - start_component(&mut self.component).await; + async fn start(&mut self) -> Result<(), ComponentServerError> { + self.component.start().await.map_err(ComponentServerError::ComponentError) } } diff --git a/crates/mempool_infra/src/component_server/local_component_server.rs b/crates/mempool_infra/src/component_server/local_component_server.rs index 8f9d07724f..6e27603030 100644 --- a/crates/mempool_infra/src/component_server/local_component_server.rs +++ b/crates/mempool_infra/src/component_server/local_component_server.rs @@ -4,9 +4,10 @@ use async_trait::async_trait; use tokio::sync::mpsc::Receiver; use tracing::error; -use super::definitions::{request_response_loop, start_component, ComponentServerStarter}; +use super::definitions::{request_response_loop, ComponentServerStarter}; use crate::component_definitions::{ComponentRequestAndResponseSender, ComponentRequestHandler}; use crate::component_runner::ComponentStarter; +use crate::errors::ComponentServerError; /// The `LocalComponentServer` struct is a generic server that handles requests and responses for a /// specified component. It receives requests, processes them using the provided component, and @@ -36,7 +37,8 @@ use crate::component_runner::ComponentStarter; /// use std::sync::mpsc::{channel, Receiver}; /// /// use async_trait::async_trait; -/// use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; +/// use starknet_mempool_infra::component_runner::ComponentStarter; +/// use starknet_mempool_infra::errors::ComponentError; /// use tokio::task; /// /// use crate::starknet_mempool_infra::component_definitions::{ @@ -53,7 +55,7 @@ use crate::component_runner::ComponentStarter; /// /// #[async_trait] /// impl ComponentStarter for MyComponent { -/// async fn start(&mut self) -> Result<(), ComponentStartError> { +/// async fn start(&mut self) -> Result<(), ComponentError> { /// Ok(()) /// } /// } @@ -112,6 +114,7 @@ use crate::component_runner::ComponentStarter; pub type LocalComponentServer = BaseLocalComponentServer; pub struct BlockingLocalServerType {} + #[async_trait] impl ComponentServerStarter for LocalComponentServer @@ -120,12 +123,10 @@ where Request: Send + Sync, Response: Send + Sync, { - async fn start(&mut self) { - // TODO(Tsabary/Lev): Find a better mechanism than the if condition to determine what to - // run. - if start_component(&mut self.component).await { - request_response_loop(&mut self.rx, &mut self.component).await; - } + async fn start(&mut self) -> Result<(), ComponentServerError> { + self.component.start().await?; + request_response_loop(&mut self.rx, &mut self.component).await; + Ok(()) } } @@ -141,7 +142,7 @@ where Request: Send + Sync, Response: Send + Sync, { - async fn start(&mut self) { + async fn start(&mut self) -> Result<(), ComponentServerError> { let mut component = self.component.clone(); let component_future = async move { component.start().await }; let request_response_future = request_response_loop(&mut self.rx, &mut self.component); @@ -155,6 +156,7 @@ where } }; error!("Server ended with unexpected Ok."); + Err(ComponentServerError::ServerUnexpectedlyStopped) } } diff --git a/crates/mempool_infra/src/component_server/remote_component_server.rs b/crates/mempool_infra/src/component_server/remote_component_server.rs index 0db770ef2f..7fab104146 100644 --- a/crates/mempool_infra/src/component_server/remote_component_server.rs +++ b/crates/mempool_infra/src/component_server/remote_component_server.rs @@ -12,6 +12,7 @@ use serde::Serialize; use super::definitions::ComponentServerStarter; use crate::component_client::LocalComponentClient; use crate::component_definitions::{ServerError, APPLICATION_OCTET_STREAM}; +use crate::errors::ComponentServerError; /// The `RemoteComponentServer` struct is a generic server that handles requests and responses for a /// specified component. It receives requests, processes them using the provided component, and @@ -38,7 +39,8 @@ use crate::component_definitions::{ServerError, APPLICATION_OCTET_STREAM}; /// // Example usage of the RemoteComponentServer /// use async_trait::async_trait; /// use serde::{Deserialize, Serialize}; -/// use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; +/// use starknet_mempool_infra::component_runner::ComponentStarter; +/// use starknet_mempool_infra::errors::ComponentError; /// use tokio::task; /// /// use crate::starknet_mempool_infra::component_client::LocalComponentClient; @@ -53,7 +55,7 @@ use crate::component_definitions::{ServerError, APPLICATION_OCTET_STREAM}; /// /// #[async_trait] /// impl ComponentStarter for MyComponent { -/// async fn start(&mut self) -> Result<(), ComponentStartError> { +/// async fn start(&mut self) -> Result<(), ComponentError> { /// Ok(()) /// } /// } @@ -153,7 +155,7 @@ where Request: DeserializeOwned + Send + Sync + 'static, Response: Serialize + Send + Sync + 'static, { - async fn start(&mut self) { + async fn start(&mut self) -> Result<(), ComponentServerError> { let make_svc = make_service_fn(|_conn| { let local_client = self.local_client.clone(); async { @@ -163,6 +165,10 @@ where } }); - Server::bind(&self.socket.clone()).serve(make_svc).await.unwrap(); + Server::bind(&self.socket.clone()) + .serve(make_svc) + .await + .map_err(|err| ComponentServerError::HttpServerStartError(err.to_string()))?; + Ok(()) } } diff --git a/crates/mempool_infra/src/errors.rs b/crates/mempool_infra/src/errors.rs new file mode 100644 index 0000000000..23891e3366 --- /dev/null +++ b/crates/mempool_infra/src/errors.rs @@ -0,0 +1,21 @@ +use thiserror::Error; + +#[derive(Error, Debug, PartialEq, Clone)] +pub enum ComponentError { + #[error("Error in the component configuration.")] + ComponentConfigError, + #[error("An internal component error.")] + InternalComponentError, +} + +#[derive(Error, Debug, PartialEq, Clone)] +pub enum ComponentServerError { + #[error("Server has already been started.")] + ServerAlreadyStarted, + #[error("Http server has failed: {0}.")] + HttpServerStartError(String), + #[error(transparent)] + ComponentError(#[from] ComponentError), + #[error("Server unexpectedly stopped.")] + ServerUnexpectedlyStopped, +} diff --git a/crates/mempool_infra/src/lib.rs b/crates/mempool_infra/src/lib.rs index bdeea651d3..46582f0bb5 100644 --- a/crates/mempool_infra/src/lib.rs +++ b/crates/mempool_infra/src/lib.rs @@ -2,4 +2,5 @@ pub mod component_client; pub mod component_definitions; pub mod component_runner; pub mod component_server; +pub mod errors; pub mod trace_util; diff --git a/crates/mempool_infra/tests/active_local_component_client_server_test.rs b/crates/mempool_infra/tests/active_local_component_client_server_test.rs index 4b30505bc5..4426011fd7 100644 --- a/crates/mempool_infra/tests/active_local_component_client_server_test.rs +++ b/crates/mempool_infra/tests/active_local_component_client_server_test.rs @@ -8,12 +8,13 @@ use starknet_mempool_infra::component_definitions::{ ComponentRequestAndResponseSender, ComponentRequestHandler, }; -use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; +use starknet_mempool_infra::component_runner::ComponentStarter; use starknet_mempool_infra::component_server::{ ComponentServerStarter, EmptyServer, LocalActiveComponentServer, }; +use starknet_mempool_infra::errors::ComponentError; use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::{Barrier, Mutex}; use tokio::task; @@ -41,7 +42,7 @@ impl ComponentC { #[async_trait] impl ComponentStarter for ComponentC { - async fn start(&mut self) -> Result<(), ComponentStartError> { + async fn start(&mut self) -> Result<(), ComponentError> { for _ in 0..self.max_iterations { self.c_increment_counter().await; } @@ -99,7 +100,7 @@ impl ComponentD { #[async_trait] impl ComponentStarter for ComponentD { - async fn start(&mut self) -> Result<(), ComponentStartError> { + async fn start(&mut self) -> Result<(), ComponentError> { for _ in 0..self.max_iterations { self.d_increment_counter().await; } @@ -177,11 +178,11 @@ async fn test_setup_c_d() { let mut component_d_server = EmptyServer::new(component_d); task::spawn(async move { - component_c_server.start().await; + let _ = component_c_server.start().await; }); task::spawn(async move { - component_d_server.start().await; + let _ = component_d_server.start().await; }); // Wait for the components to finish incrementing of the ComponentC::counter and verify it. diff --git a/crates/mempool_infra/tests/local_component_client_server_test.rs b/crates/mempool_infra/tests/local_component_client_server_test.rs index 09ff93f0f7..2250d5ec1c 100644 --- a/crates/mempool_infra/tests/local_component_client_server_test.rs +++ b/crates/mempool_infra/tests/local_component_client_server_test.rs @@ -99,11 +99,11 @@ async fn test_setup() { let mut component_b_server = LocalComponentServer::new(component_b, rx_b); task::spawn(async move { - component_a_server.start().await; + let _ = component_a_server.start().await; }); task::spawn(async move { - component_b_server.start().await; + let _ = component_b_server.start().await; }); test_a_b_functionality(a_client, b_client, expected_value).await; diff --git a/crates/mempool_infra/tests/remote_component_client_server_test.rs b/crates/mempool_infra/tests/remote_component_client_server_test.rs index a1a032a763..740a98c887 100644 --- a/crates/mempool_infra/tests/remote_component_client_server_test.rs +++ b/crates/mempool_infra/tests/remote_component_client_server_test.rs @@ -190,18 +190,18 @@ async fn setup_for_tests(setup_value: ValueB, a_port: u16, b_port: u16) { RemoteComponentServer::new(b_local_client, LOCAL_IP, b_port); task::spawn(async move { - component_a_local_server.start().await; + let _ = component_a_local_server.start().await; }); task::spawn(async move { - component_b_local_server.start().await; + let _ = component_b_local_server.start().await; }); task::spawn(async move { - component_a_remote_server.start().await; + let _ = component_a_remote_server.start().await; }); task::spawn(async move { - component_b_remote_server.start().await; + let _ = component_b_remote_server.start().await; }); // Todo(uriel): Get rid of this diff --git a/crates/mempool_node/src/servers.rs b/crates/mempool_node/src/servers.rs index bde1a8698a..6ce2884507 100644 --- a/crates/mempool_node/src/servers.rs +++ b/crates/mempool_node/src/servers.rs @@ -11,6 +11,7 @@ use starknet_gateway::communication::{create_gateway_server, GatewayServer}; use starknet_http_server::communication::{create_http_server, HttpServer}; use starknet_mempool::communication::{create_mempool_server, MempoolServer}; use starknet_mempool_infra::component_server::ComponentServerStarter; +use starknet_mempool_infra::errors::ComponentServerError; use tracing::error; use crate::communication::MempoolNodeCommunication; @@ -113,7 +114,7 @@ pub async fn run_component_servers( let http_server_handle = tokio::spawn(http_server_future); let mempool_handle = tokio::spawn(mempool_future); - tokio::select! { + let result = tokio::select! { res = batcher_handle => { error!("Batcher Server stopped."); res? @@ -137,14 +138,14 @@ pub async fn run_component_servers( }; error!("Servers ended with unexpected Ok."); - Ok(()) + Ok(result?) } pub fn get_server_future( name: &str, execute_flag: bool, server: Option>, -) -> Pin + Send>> { +) -> Pin> + Send>> { let server_future = match execute_flag { true => { let mut server = match server { diff --git a/crates/mempool_p2p/src/receiver/mod.rs b/crates/mempool_p2p/src/receiver/mod.rs index 689096849b..ef32b3c1b8 100644 --- a/crates/mempool_p2p/src/receiver/mod.rs +++ b/crates/mempool_p2p/src/receiver/mod.rs @@ -1,11 +1,12 @@ use async_trait::async_trait; -use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; +use starknet_mempool_infra::component_runner::ComponentStarter; +use starknet_mempool_infra::errors::ComponentError; pub struct MempoolP2pReceiver; #[async_trait] impl ComponentStarter for MempoolP2pReceiver { - async fn start(&mut self) -> Result<(), ComponentStartError> { + async fn start(&mut self) -> Result<(), ComponentError> { unimplemented!() } } diff --git a/crates/tests-integration/src/integration_test_setup.rs b/crates/tests-integration/src/integration_test_setup.rs index f2cadb9d3d..535cc1eb39 100644 --- a/crates/tests-integration/src/integration_test_setup.rs +++ b/crates/tests-integration/src/integration_test_setup.rs @@ -7,6 +7,7 @@ use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_gateway_types::errors::GatewaySpecError; use starknet_http_server::config::HttpServerConfig; +use starknet_mempool_infra::errors::ComponentServerError; use starknet_mempool_infra::trace_util::configure_tracing; use starknet_mempool_node::servers::get_server_future; use starknet_mempool_node::utils::create_clients_servers_from_config; @@ -22,9 +23,9 @@ pub struct IntegrationTestSetup { pub task_executor: TokioExecutor, pub http_test_client: HttpTestClient, pub batcher: MockBatcher, - pub gateway_handle: JoinHandle<()>, - pub http_server_handle: JoinHandle<()>, - pub mempool_handle: JoinHandle<()>, + pub gateway_handle: JoinHandle>, + pub http_server_handle: JoinHandle>, + pub mempool_handle: JoinHandle>, } impl IntegrationTestSetup {