From 98a486830c77e9c45c3d64053599f5e9393d7840 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Wed, 18 Sep 2024 19:47:28 +0300 Subject: [PATCH] refactor(node): spawned tasks should all return anyhow result This fairly generic return type should allow for flexibility with test overrides. --- crates/papyrus_node/src/main.rs | 75 ++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index aabcb8215f..1f3a885e4c 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -2,7 +2,7 @@ mod main_test; use std::env::args; -use std::future::{pending, Future}; +use std::future::pending; use std::process::exit; use std::sync::Arc; use std::time::Duration; @@ -18,7 +18,6 @@ use papyrus_config::validators::config_validate; use papyrus_config::ConfigError; use papyrus_consensus::config::ConsensusConfig; use papyrus_consensus::simulation_network_receiver::NetworkReceiver; -use papyrus_consensus::types::ConsensusError; use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; @@ -41,7 +40,7 @@ use starknet_api::felt; use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated}; use starknet_client::reader::PendingData; use tokio::sync::RwLock; -use tokio::task::{JoinError, JoinHandle}; +use tokio::task::JoinHandle; use tracing::metadata::LevelFilter; use tracing::{debug, debug_span, error, info, warn, Instrument}; use tracing_subscriber::prelude::*; @@ -97,13 +96,13 @@ impl PapyrusResources { } #[cfg(feature = "rpc")] -async fn create_rpc_server_future( +async fn spawn_rpc_server( config: &NodeConfig, shared_highest_block: Arc>>, pending_data: Arc>, pending_classes: Arc>, storage_reader: StorageReader, -) -> anyhow::Result>> { +) -> anyhow::Result>> { let (_, server_handle) = run_server( &config.rpc, shared_highest_block, @@ -113,29 +112,33 @@ async fn create_rpc_server_future( VERSION_FULL, ) .await?; - Ok(tokio::spawn(server_handle.stopped())) + Ok(tokio::spawn(async move { + server_handle.stopped().await; + Ok(()) + })) } #[cfg(not(feature = "rpc"))] -async fn create_rpc_server_future( +async fn spawn_rpc_server( _config: &NodeConfig, _shared_highest_block: Arc>>, _pending_data: Arc>, _pending_classes: Arc>, _storage_reader: StorageReader, -) -> anyhow::Result>> { - Ok(pending()) +) -> anyhow::Result>> { + Ok(tokio::spawn(pending())) } -fn run_consensus( +fn spawn_consensus( config: Option<&ConsensusConfig>, storage_reader: StorageReader, network_manager: Option<&mut NetworkManager>, -) -> anyhow::Result>> { +) -> anyhow::Result>> { let (Some(config), Some(network_manager)) = (config, network_manager) else { info!("Consensus is disabled."); return Ok(tokio::spawn(pending())); }; + let config = config.clone(); debug!("Consensus configuration: {config:?}"); let network_channels = network_manager @@ -167,15 +170,18 @@ fn run_consensus( sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| { BlockNumber(vote.expect("Sync channel should never have errors").height) }); - Ok(tokio::spawn(papyrus_consensus::run_consensus( - context, - config.start_height, - config.validator_id, - config.consensus_delay, - config.timeouts.clone(), - broadcast_channels, - sync_receiver, - ))) + Ok(tokio::spawn(async move { + Ok(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + config.timeouts.clone(), + broadcast_channels, + sync_receiver, + ) + .await?) + })) } else { let context = PapyrusConsensusContext::new( storage_reader.clone(), @@ -183,15 +189,18 @@ fn run_consensus( config.num_validators, None, ); - Ok(tokio::spawn(papyrus_consensus::run_consensus( - context, - config.start_height, - config.validator_id, - config.consensus_delay, - config.timeouts.clone(), - network_channels, - futures::stream::pending(), - ))) + Ok(tokio::spawn(async move { + Ok(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + config.timeouts.clone(), + network_channels, + futures::stream::pending(), + ) + .await?) + })) } } @@ -314,7 +323,7 @@ fn spawn_p2p_sync_server( } async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> { - let consensus_handle = run_consensus( + let consensus_handle = spawn_consensus( config.consensus.as_ref(), resources.storage_reader.clone(), resources.maybe_network_manager.as_mut(), @@ -338,7 +347,7 @@ async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> any let monitoring_server_handle = monitoring_server.spawn_server().await; // JSON-RPC server. - let server_handle_future = create_rpc_server_future( + let rpc_server_handle = spawn_rpc_server( &config, resources.shared_highest_block.clone(), resources.pending_data.clone(), @@ -377,9 +386,9 @@ async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> any error!("collecting storage metrics stopped."); res?? } - res = server_handle_future => { + res = rpc_server_handle => { error!("RPC server stopped."); - res? + res?? } res = monitoring_server_handle => { error!("Monitoring server stopped.");