From 52be37b77d67ecd3f429967f8249998d607f1b17 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Wed, 18 Sep 2024 20:15:09 +0300 Subject: [PATCH] refactor(node): move main to its own file and keep run infrastructure reusable --- crates/papyrus_node/src/lib.rs | 1 + crates/papyrus_node/src/main.rs | 540 +---------------- crates/papyrus_node/src/run.rs | 541 ++++++++++++++++++ .../src/{main_test.rs => run_test.rs} | 9 +- 4 files changed, 551 insertions(+), 540 deletions(-) create mode 100644 crates/papyrus_node/src/run.rs rename crates/papyrus_node/src/{main_test.rs => run_test.rs} (92%) diff --git a/crates/papyrus_node/src/lib.rs b/crates/papyrus_node/src/lib.rs index 924916d963..b2fc83392d 100644 --- a/crates/papyrus_node/src/lib.rs +++ b/crates/papyrus_node/src/lib.rs @@ -6,4 +6,5 @@ pub mod config; #[cfg(test)] mod precision_test; +pub mod run; pub mod version; diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index 20c895cf8e..3af3d3e536 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -1,544 +1,8 @@ -#[cfg(test)] -mod main_test; - use std::env::args; -use std::future::pending; -use std::process::exit; -use std::sync::Arc; -use std::time::Duration; -use futures::stream::StreamExt; -use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; -use papyrus_common::metrics::COLLECT_PROFILING_METRICS; -use papyrus_common::pending_classes::PendingClasses; -use papyrus_common::BlockHashAndNumber; -use papyrus_config::presentation::get_config_presentation; -use papyrus_config::validators::config_validate; use papyrus_config::ConfigError; -use papyrus_consensus::config::ConsensusConfig; -use papyrus_consensus::simulation_network_receiver::NetworkReceiver; -use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; -use papyrus_monitoring_gateway::MonitoringServer; -use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; -use papyrus_network::{network_manager, NetworkConfig}; use papyrus_node::config::NodeConfig; -use papyrus_node::version::VERSION_FULL; -use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels}; -use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; -use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; -#[cfg(feature = "rpc")] -use papyrus_rpc::run_server; -use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; -use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerSource}; -use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig}; -use papyrus_sync::sources::pending::PendingSource; -use papyrus_sync::{StateSync, SyncConfig}; -use starknet_api::block::{BlockHash, BlockNumber}; -use starknet_api::felt; -use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated}; -use starknet_client::reader::PendingData; -use tokio::sync::RwLock; -use tokio::task::JoinHandle; -use tracing::metadata::LevelFilter; -use tracing::{debug, debug_span, error, info, warn, Instrument}; -use tracing_subscriber::prelude::*; -use tracing_subscriber::{fmt, EnvFilter}; - -// TODO(yair): Add to config. -const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO; - -// TODO(shahak): Consider adding genesis hash to the config to support chains that have -// different genesis hash. -// TODO: Consider moving to a more general place. -const GENESIS_HASH: &str = "0x0"; - -// TODO(dvir): add this to config. -// Duration between updates to the storage metrics (those in the collect_storage_metrics function). -const STORAGE_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); - -pub struct PapyrusResources { - pub storage_reader: StorageReader, - pub storage_writer: StorageWriter, - pub maybe_network_manager: Option, - pub local_peer_id: String, - pub shared_highest_block: Arc>>, - pub pending_data: Arc>, - pub pending_classes: Arc>, -} - -/// Struct which allows configuring how the node will run. -/// - If left `None`, the task will be spawn with its default (prod) configuration. -/// - If set to Some, that variant of the task will be run, and the default ignored. -/// - If you want to disable a task set it to `Some(tokio::spawn(pending()))`. -#[derive(Default)] -pub struct PapyrusTaskHandles { - pub storage_metrics_handle: Option>>, - pub rpc_server_handle: Option>>, - pub sync_client_handle: Option>>, - pub monitoring_server_handle: Option>>, - pub p2p_sync_server_handle: Option>>, - pub consensus_handle: Option>>, - pub network_handle: Option>>, -} - -impl PapyrusResources { - pub fn new(config: &NodeConfig) -> anyhow::Result { - let (storage_reader, storage_writer) = open_storage(config.storage.clone())?; - let (maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?; - let shared_highest_block = Arc::new(RwLock::new(None)); - let pending_data = Arc::new(RwLock::new(PendingData { - // The pending data might change later to DeprecatedPendingBlock, depending on the - // response from the feeder gateway. - block: PendingBlockOrDeprecated::Current(PendingBlock { - parent_block_hash: BlockHash(felt!(GENESIS_HASH)), - ..Default::default() - }), - ..Default::default() - })); - let pending_classes = Arc::new(RwLock::new(PendingClasses::default())); - Ok(Self { - storage_reader, - storage_writer, - maybe_network_manager, - local_peer_id, - shared_highest_block, - pending_data, - pending_classes, - }) - } -} - -fn build_network_manager( - network_config: Option, -) -> anyhow::Result<(Option, String)> { - let Some(network_config) = network_config else { - return Ok((None, "".to_string())); - }; - let network_manager = network_manager::NetworkManager::new( - network_config.clone(), - Some(VERSION_FULL.to_string()), - ); - let local_peer_id = network_manager.get_local_peer_id(); - - Ok((Some(network_manager), local_peer_id)) -} - -#[cfg(feature = "rpc")] -async fn spawn_rpc_server( - config: &NodeConfig, - shared_highest_block: Arc>>, - pending_data: Arc>, - pending_classes: Arc>, - storage_reader: StorageReader, -) -> anyhow::Result>> { - let (_, server_handle) = run_server( - &config.rpc, - shared_highest_block, - pending_data, - pending_classes, - storage_reader, - VERSION_FULL, - ) - .await?; - Ok(tokio::spawn(async move { - server_handle.stopped().await; - Ok(()) - })) -} - -#[cfg(not(feature = "rpc"))] -async fn spawn_rpc_server( - _config: &NodeConfig, - _shared_highest_block: Arc>>, - _pending_data: Arc>, - _pending_classes: Arc>, - _storage_reader: StorageReader, -) -> anyhow::Result>> { - Ok(tokio::spawn(pending())) -} - -fn spawn_monitoring_server( - storage_reader: StorageReader, - local_peer_id: String, - config: &NodeConfig, -) -> anyhow::Result>> { - let monitoring_server = MonitoringServer::new( - config.monitoring_gateway.clone(), - get_config_presentation(config, true)?, - get_config_presentation(config, false)?, - storage_reader, - VERSION_FULL, - local_peer_id, - )?; - Ok(tokio::spawn(async move { Ok(monitoring_server.run_server().await?) })) -} - -fn spawn_consensus( - config: Option<&ConsensusConfig>, - storage_reader: StorageReader, - network_manager: Option<&mut NetworkManager>, -) -> anyhow::Result>> { - let (Some(config), Some(network_manager)) = (config, network_manager) else { - info!("Consensus is disabled."); - return Ok(tokio::spawn(pending())); - }; - let config = config.clone(); - debug!("Consensus configuration: {config:?}"); - - let network_channels = network_manager - .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; - // TODO(matan): connect this to an actual channel. - if let Some(test_config) = config.test.as_ref() { - let sync_channels = network_manager - .register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?; - let context = PapyrusConsensusContext::new( - storage_reader.clone(), - network_channels.messages_to_broadcast_sender.clone(), - config.num_validators, - Some(sync_channels.messages_to_broadcast_sender), - ); - let network_receiver = NetworkReceiver::new( - network_channels.broadcasted_messages_receiver, - test_config.cache_size, - test_config.random_seed, - test_config.drop_probability, - test_config.invalid_probability, - ); - let broadcast_channels = BroadcastTopicChannels { - messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender, - broadcasted_messages_receiver: Box::new(network_receiver), - reported_messages_sender: network_channels.reported_messages_sender, - continue_propagation_sender: network_channels.continue_propagation_sender, - }; - let sync_receiver = - sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| { - BlockNumber(vote.expect("Sync channel should never have errors").height) - }); - Ok(tokio::spawn(async move { - Ok(papyrus_consensus::run_consensus( - context, - config.start_height, - config.validator_id, - config.consensus_delay, - config.timeouts.clone(), - broadcast_channels, - sync_receiver, - ) - .await?) - })) - } else { - let context = PapyrusConsensusContext::new( - storage_reader.clone(), - network_channels.messages_to_broadcast_sender.clone(), - config.num_validators, - None, - ); - Ok(tokio::spawn(async move { - Ok(papyrus_consensus::run_consensus( - context, - config.start_height, - config.validator_id, - config.consensus_delay, - config.timeouts.clone(), - network_channels, - futures::stream::pending(), - ) - .await?) - })) - } -} - -async fn run_sync( - configs: (SyncConfig, CentralSourceConfig, EthereumBaseLayerConfig), - shared_highest_block: Arc>>, - pending_data: Arc>, - pending_classes: Arc>, - storage: (StorageReader, StorageWriter), -) -> anyhow::Result<()> { - let (sync_config, central_config, base_layer_config) = configs; - let (storage_reader, storage_writer) = storage; - let central_source = - CentralSource::new(central_config.clone(), VERSION_FULL, storage_reader.clone()) - .map_err(CentralError::ClientCreation)?; - let pending_source = - PendingSource::new(central_config, VERSION_FULL).map_err(CentralError::ClientCreation)?; - let base_layer_source = EthereumBaseLayerSource::new(base_layer_config) - .map_err(|e| BaseLayerSourceError::BaseLayerSourceCreationError(e.to_string()))?; - let mut sync = StateSync::new( - sync_config, - shared_highest_block, - pending_data, - pending_classes, - central_source, - pending_source, - base_layer_source, - storage_reader.clone(), - storage_writer, - ); - Ok(sync.run().await?) -} - -async fn spawn_sync_client( - maybe_network_manager: Option<&mut NetworkManager>, - storage_reader: StorageReader, - storage_writer: StorageWriter, - config: &NodeConfig, - shared_highest_block: Arc>>, - pending_data: Arc>, - pending_classes: Arc>, -) -> JoinHandle> { - match (config.sync, config.p2p_sync) { - (Some(_), Some(_)) => { - panic!("One of --sync.#is_none or --p2p_sync.#is_none must be turned on"); - } - (None, None) => tokio::spawn(pending()), - (Some(sync_config), None) => { - let configs = (sync_config, config.central.clone(), config.base_layer.clone()); - let storage = (storage_reader.clone(), storage_writer); - tokio::spawn(run_sync( - configs, - shared_highest_block, - pending_data, - pending_classes, - storage, - )) - } - (None, Some(p2p_sync_client_config)) => { - let network_manager = maybe_network_manager - .expect("If p2p sync is enabled, network needs to be enabled too"); - let header_client_sender = network_manager - .register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE); - let state_diff_client_sender = network_manager - .register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE); - let transaction_client_sender = network_manager - .register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE); - let class_client_sender = - network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE); - let p2p_sync_client_channels = P2PSyncClientChannels::new( - header_client_sender, - state_diff_client_sender, - transaction_client_sender, - class_client_sender, - ); - let p2p_sync = P2PSyncClient::new( - p2p_sync_client_config, - storage_reader, - storage_writer, - p2p_sync_client_channels, - ); - tokio::spawn(async move { Ok(p2p_sync.run().await?) }) - } - } -} - -fn spawn_p2p_sync_server( - network_manager: Option<&mut NetworkManager>, - storage_reader: StorageReader, -) -> JoinHandle> { - let Some(network_manager) = network_manager else { - info!("P2P Sync is disabled."); - return tokio::spawn(pending()); - }; - - let header_server_receiver = network_manager - .register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE); - let state_diff_server_receiver = - network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE); - let transaction_server_receiver = - network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE); - let class_server_receiver = - network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE); - let event_server_receiver = - network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE); - - let p2p_sync_server_channels = P2PSyncServerChannels::new( - header_server_receiver, - state_diff_server_receiver, - transaction_server_receiver, - class_server_receiver, - event_server_receiver, - ); - - let p2p_sync_server = P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels); - tokio::spawn(async move { - p2p_sync_server.run().await; - Ok(()) - }) -} - -async fn run_threads( - config: NodeConfig, - mut resources: PapyrusResources, - tasks: PapyrusTaskHandles, -) -> anyhow::Result<()> { - let consensus_handle = if let Some(handle) = tasks.consensus_handle { - handle - } else { - spawn_consensus( - config.consensus.as_ref(), - resources.storage_reader.clone(), - resources.maybe_network_manager.as_mut(), - )? - }; - - let storage_metrics_handle = if let Some(handle) = tasks.storage_metrics_handle { - handle - } else { - spawn_storage_metrics_collector( - config.monitoring_gateway.collect_metrics, - resources.storage_reader.clone(), - STORAGE_METRICS_UPDATE_INTERVAL, - ) - }; - // Monitoring server. - let monitoring_server_handle = if let Some(handle) = tasks.monitoring_server_handle { - handle - } else { - spawn_monitoring_server( - resources.storage_reader.clone(), - resources.local_peer_id.clone(), - &config, - )? - }; - - // JSON-RPC server. - let rpc_server_handle = if let Some(handle) = tasks.rpc_server_handle { - handle - } else { - spawn_rpc_server( - &config, - resources.shared_highest_block.clone(), - resources.pending_data.clone(), - resources.pending_classes.clone(), - resources.storage_reader.clone(), - ) - .await? - }; - - // P2P Sync Server task. - let p2p_sync_server_handle = if let Some(handle) = tasks.p2p_sync_server_handle { - handle - } else { - spawn_p2p_sync_server( - resources.maybe_network_manager.as_mut(), - resources.storage_reader.clone(), - ) - }; - - // Sync task. - let sync_client_handle = if let Some(handle) = tasks.sync_client_handle { - handle - } else { - spawn_sync_client( - resources.maybe_network_manager.as_mut(), - resources.storage_reader, - resources.storage_writer, - &config, - resources.shared_highest_block, - resources.pending_data, - resources.pending_classes, - ) - .await - }; - - // Created last since it consumes the network manager. - let network_handle = if let Some(handle) = tasks.network_handle { - handle - } else { - match resources.maybe_network_manager { - Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }), - None => tokio::spawn(pending()), - } - }; - tokio::select! { - res = storage_metrics_handle => { - error!("collecting storage metrics stopped."); - res?? - } - res = rpc_server_handle => { - error!("RPC server stopped."); - res?? - } - res = monitoring_server_handle => { - error!("Monitoring server stopped."); - res?? - } - res = sync_client_handle => { - error!("Sync stopped."); - res?? - } - res = p2p_sync_server_handle => { - error!("P2P Sync server stopped"); - res?? - } - res = network_handle => { - error!("Network stopped."); - res?? - } - res = consensus_handle => { - error!("Consensus stopped."); - res?? - } - }; - error!("Task ended with unexpected Ok."); - Ok(()) -} - -// TODO(yair): add dynamic level filtering. -// TODO(dan): filter out logs from dependencies (happens when RUST_LOG=DEBUG) -// TODO(yair): define and implement configurable filtering. -fn configure_tracing() { - let fmt_layer = fmt::layer().compact().with_target(false); - let level_filter_layer = - EnvFilter::builder().with_default_directive(DEFAULT_LEVEL.into()).from_env_lossy(); - - // This sets a single subscriber to all of the threads. We may want to implement different - // subscriber for some threads and use set_global_default instead of init. - tracing_subscriber::registry().with(fmt_layer).with(level_filter_layer).init(); -} - -fn spawn_storage_metrics_collector( - collect_metrics: bool, - storage_reader: StorageReader, - interval: Duration, -) -> JoinHandle> { - if !collect_metrics { - return tokio::spawn(pending()); - } - - tokio::spawn( - async move { - loop { - if let Err(error) = update_storage_metrics(&storage_reader) { - warn!("Failed to update storage metrics: {error}"); - } - tokio::time::sleep(interval).await; - } - } - .instrument(debug_span!("collect_storage_metrics")), - ) -} - -pub async fn run( - config: NodeConfig, - resources: PapyrusResources, - tasks: PapyrusTaskHandles, -) -> anyhow::Result<()> { - configure_tracing(); - - if let Err(errors) = config_validate(&config) { - error!("{}", errors); - exit(1); - } - - COLLECT_PROFILING_METRICS - .set(config.collect_profiling_metrics) - .expect("This should be the first and only time we set this value."); - - info!("Booting up."); - run_threads(config, resources, tasks).await -} +use papyrus_node::run::{run, PapyrusResources, PapyrusTaskHandles}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -550,5 +14,5 @@ async fn main() -> anyhow::Result<()> { let resources = PapyrusResources::new(&config)?; let tasks = PapyrusTaskHandles::default(); - run_threads(config, resources, tasks).await + run(config, resources, tasks).await } diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs new file mode 100644 index 0000000000..1b47cf2d05 --- /dev/null +++ b/crates/papyrus_node/src/run.rs @@ -0,0 +1,541 @@ +#[cfg(test)] +#[path = "run_test.rs"] +mod run_test; + +use std::future::pending; +use std::process::exit; +use std::sync::Arc; +use std::time::Duration; + +use futures::stream::StreamExt; +use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; +use papyrus_common::metrics::COLLECT_PROFILING_METRICS; +use papyrus_common::pending_classes::PendingClasses; +use papyrus_common::BlockHashAndNumber; +use papyrus_config::presentation::get_config_presentation; +use papyrus_config::validators::config_validate; +use papyrus_consensus::config::ConsensusConfig; +use papyrus_consensus::simulation_network_receiver::NetworkReceiver; +use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; +use papyrus_monitoring_gateway::MonitoringServer; +use papyrus_network::gossipsub_impl::Topic; +use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; +use papyrus_network::{network_manager, NetworkConfig}; +use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels}; +use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; +use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; +#[cfg(feature = "rpc")] +use papyrus_rpc::run_server; +use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; +use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerSource}; +use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig}; +use papyrus_sync::sources::pending::PendingSource; +use papyrus_sync::{StateSync, SyncConfig}; +use starknet_api::block::{BlockHash, BlockNumber}; +use starknet_api::felt; +use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated}; +use starknet_client::reader::PendingData; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tracing::metadata::LevelFilter; +use tracing::{debug, debug_span, error, info, warn, Instrument}; +use tracing_subscriber::prelude::*; +use tracing_subscriber::{fmt, EnvFilter}; + +use crate::config::NodeConfig; +use crate::version::VERSION_FULL; + +// TODO(yair): Add to config. +const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO; + +// TODO(shahak): Consider adding genesis hash to the config to support chains that have +// different genesis hash. +// TODO: Consider moving to a more general place. +const GENESIS_HASH: &str = "0x0"; + +// TODO(dvir): add this to config. +// Duration between updates to the storage metrics (those in the collect_storage_metrics function). +const STORAGE_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); + +pub struct PapyrusResources { + pub storage_reader: StorageReader, + pub storage_writer: StorageWriter, + pub maybe_network_manager: Option, + pub local_peer_id: String, + pub shared_highest_block: Arc>>, + pub pending_data: Arc>, + pub pending_classes: Arc>, +} + +/// Struct which allows configuring how the node will run. +/// - If left `None`, the task will be spawn with its default (prod) configuration. +/// - If set to Some, that variant of the task will be run, and the default ignored. +/// - If you want to disable a task set it to `Some(tokio::spawn(pending()))`. +#[derive(Default)] +pub struct PapyrusTaskHandles { + pub storage_metrics_handle: Option>>, + pub rpc_server_handle: Option>>, + pub sync_client_handle: Option>>, + pub monitoring_server_handle: Option>>, + pub p2p_sync_server_handle: Option>>, + pub consensus_handle: Option>>, + pub network_handle: Option>>, +} + +impl PapyrusResources { + pub fn new(config: &NodeConfig) -> anyhow::Result { + let (storage_reader, storage_writer) = open_storage(config.storage.clone())?; + let (maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?; + let shared_highest_block = Arc::new(RwLock::new(None)); + let pending_data = Arc::new(RwLock::new(PendingData { + // The pending data might change later to DeprecatedPendingBlock, depending on the + // response from the feeder gateway. + block: PendingBlockOrDeprecated::Current(PendingBlock { + parent_block_hash: BlockHash(felt!(GENESIS_HASH)), + ..Default::default() + }), + ..Default::default() + })); + let pending_classes = Arc::new(RwLock::new(PendingClasses::default())); + Ok(Self { + storage_reader, + storage_writer, + maybe_network_manager, + local_peer_id, + shared_highest_block, + pending_data, + pending_classes, + }) + } +} + +fn build_network_manager( + network_config: Option, +) -> anyhow::Result<(Option, String)> { + let Some(network_config) = network_config else { + return Ok((None, "".to_string())); + }; + let network_manager = network_manager::NetworkManager::new( + network_config.clone(), + Some(VERSION_FULL.to_string()), + ); + let local_peer_id = network_manager.get_local_peer_id(); + + Ok((Some(network_manager), local_peer_id)) +} + +#[cfg(feature = "rpc")] +async fn spawn_rpc_server( + config: &NodeConfig, + shared_highest_block: Arc>>, + pending_data: Arc>, + pending_classes: Arc>, + storage_reader: StorageReader, +) -> anyhow::Result>> { + let (_, server_handle) = run_server( + &config.rpc, + shared_highest_block, + pending_data, + pending_classes, + storage_reader, + VERSION_FULL, + ) + .await?; + Ok(tokio::spawn(async move { + server_handle.stopped().await; + Ok(()) + })) +} + +#[cfg(not(feature = "rpc"))] +async fn spawn_rpc_server( + _config: &NodeConfig, + _shared_highest_block: Arc>>, + _pending_data: Arc>, + _pending_classes: Arc>, + _storage_reader: StorageReader, +) -> anyhow::Result>> { + Ok(tokio::spawn(pending())) +} + +fn spawn_monitoring_server( + storage_reader: StorageReader, + local_peer_id: String, + config: &NodeConfig, +) -> anyhow::Result>> { + let monitoring_server = MonitoringServer::new( + config.monitoring_gateway.clone(), + get_config_presentation(config, true)?, + get_config_presentation(config, false)?, + storage_reader, + VERSION_FULL, + local_peer_id, + )?; + Ok(tokio::spawn(async move { Ok(monitoring_server.run_server().await?) })) +} + +fn spawn_consensus( + config: Option<&ConsensusConfig>, + storage_reader: StorageReader, + network_manager: Option<&mut NetworkManager>, +) -> anyhow::Result>> { + let (Some(config), Some(network_manager)) = (config, network_manager) else { + info!("Consensus is disabled."); + return Ok(tokio::spawn(pending())); + }; + let config = config.clone(); + debug!("Consensus configuration: {config:?}"); + + let network_channels = network_manager + .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; + // TODO(matan): connect this to an actual channel. + if let Some(test_config) = config.test.as_ref() { + let sync_channels = network_manager + .register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?; + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender.clone(), + config.num_validators, + Some(sync_channels.messages_to_broadcast_sender), + ); + let network_receiver = NetworkReceiver::new( + network_channels.broadcasted_messages_receiver, + test_config.cache_size, + test_config.random_seed, + test_config.drop_probability, + test_config.invalid_probability, + ); + let broadcast_channels = BroadcastTopicChannels { + messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender, + broadcasted_messages_receiver: Box::new(network_receiver), + reported_messages_sender: network_channels.reported_messages_sender, + continue_propagation_sender: network_channels.continue_propagation_sender, + }; + let sync_receiver = + sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| { + BlockNumber(vote.expect("Sync channel should never have errors").height) + }); + Ok(tokio::spawn(async move { + Ok(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + config.timeouts.clone(), + broadcast_channels, + sync_receiver, + ) + .await?) + })) + } else { + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender.clone(), + config.num_validators, + None, + ); + Ok(tokio::spawn(async move { + Ok(papyrus_consensus::run_consensus( + context, + config.start_height, + config.validator_id, + config.consensus_delay, + config.timeouts.clone(), + network_channels, + futures::stream::pending(), + ) + .await?) + })) + } +} + +async fn run_sync( + configs: (SyncConfig, CentralSourceConfig, EthereumBaseLayerConfig), + shared_highest_block: Arc>>, + pending_data: Arc>, + pending_classes: Arc>, + storage: (StorageReader, StorageWriter), +) -> anyhow::Result<()> { + let (sync_config, central_config, base_layer_config) = configs; + let (storage_reader, storage_writer) = storage; + let central_source = + CentralSource::new(central_config.clone(), VERSION_FULL, storage_reader.clone()) + .map_err(CentralError::ClientCreation)?; + let pending_source = + PendingSource::new(central_config, VERSION_FULL).map_err(CentralError::ClientCreation)?; + let base_layer_source = EthereumBaseLayerSource::new(base_layer_config) + .map_err(|e| BaseLayerSourceError::BaseLayerSourceCreationError(e.to_string()))?; + let mut sync = StateSync::new( + sync_config, + shared_highest_block, + pending_data, + pending_classes, + central_source, + pending_source, + base_layer_source, + storage_reader.clone(), + storage_writer, + ); + Ok(sync.run().await?) +} + +async fn spawn_sync_client( + maybe_network_manager: Option<&mut NetworkManager>, + storage_reader: StorageReader, + storage_writer: StorageWriter, + config: &NodeConfig, + shared_highest_block: Arc>>, + pending_data: Arc>, + pending_classes: Arc>, +) -> JoinHandle> { + match (config.sync, config.p2p_sync) { + (Some(_), Some(_)) => { + panic!("One of --sync.#is_none or --p2p_sync.#is_none must be turned on"); + } + (None, None) => tokio::spawn(pending()), + (Some(sync_config), None) => { + let configs = (sync_config, config.central.clone(), config.base_layer.clone()); + let storage = (storage_reader.clone(), storage_writer); + tokio::spawn(run_sync( + configs, + shared_highest_block, + pending_data, + pending_classes, + storage, + )) + } + (None, Some(p2p_sync_client_config)) => { + let network_manager = maybe_network_manager + .expect("If p2p sync is enabled, network needs to be enabled too"); + let header_client_sender = network_manager + .register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE); + let state_diff_client_sender = network_manager + .register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE); + let transaction_client_sender = network_manager + .register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE); + let class_client_sender = + network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE); + let p2p_sync_client_channels = P2PSyncClientChannels::new( + header_client_sender, + state_diff_client_sender, + transaction_client_sender, + class_client_sender, + ); + let p2p_sync = P2PSyncClient::new( + p2p_sync_client_config, + storage_reader, + storage_writer, + p2p_sync_client_channels, + ); + tokio::spawn(async move { Ok(p2p_sync.run().await?) }) + } + } +} + +fn spawn_p2p_sync_server( + network_manager: Option<&mut NetworkManager>, + storage_reader: StorageReader, +) -> JoinHandle> { + let Some(network_manager) = network_manager else { + info!("P2P Sync is disabled."); + return tokio::spawn(pending()); + }; + + let header_server_receiver = network_manager + .register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE); + let state_diff_server_receiver = + network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE); + let transaction_server_receiver = + network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE); + let class_server_receiver = + network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE); + let event_server_receiver = + network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE); + + let p2p_sync_server_channels = P2PSyncServerChannels::new( + header_server_receiver, + state_diff_server_receiver, + transaction_server_receiver, + class_server_receiver, + event_server_receiver, + ); + + let p2p_sync_server = P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels); + tokio::spawn(async move { + p2p_sync_server.run().await; + Ok(()) + }) +} + +async fn run_threads( + config: NodeConfig, + mut resources: PapyrusResources, + tasks: PapyrusTaskHandles, +) -> anyhow::Result<()> { + let consensus_handle = if let Some(handle) = tasks.consensus_handle { + handle + } else { + spawn_consensus( + config.consensus.as_ref(), + resources.storage_reader.clone(), + resources.maybe_network_manager.as_mut(), + )? + }; + + let storage_metrics_handle = if let Some(handle) = tasks.storage_metrics_handle { + handle + } else { + spawn_storage_metrics_collector( + config.monitoring_gateway.collect_metrics, + resources.storage_reader.clone(), + STORAGE_METRICS_UPDATE_INTERVAL, + ) + }; + // Monitoring server. + let monitoring_server_handle = if let Some(handle) = tasks.monitoring_server_handle { + handle + } else { + spawn_monitoring_server( + resources.storage_reader.clone(), + resources.local_peer_id.clone(), + &config, + )? + }; + + // JSON-RPC server. + let rpc_server_handle = if let Some(handle) = tasks.rpc_server_handle { + handle + } else { + spawn_rpc_server( + &config, + resources.shared_highest_block.clone(), + resources.pending_data.clone(), + resources.pending_classes.clone(), + resources.storage_reader.clone(), + ) + .await? + }; + + // P2P Sync Server task. + let p2p_sync_server_handle = if let Some(handle) = tasks.p2p_sync_server_handle { + handle + } else { + spawn_p2p_sync_server( + resources.maybe_network_manager.as_mut(), + resources.storage_reader.clone(), + ) + }; + + // Sync task. + let sync_client_handle = if let Some(handle) = tasks.sync_client_handle { + handle + } else { + spawn_sync_client( + resources.maybe_network_manager.as_mut(), + resources.storage_reader, + resources.storage_writer, + &config, + resources.shared_highest_block, + resources.pending_data, + resources.pending_classes, + ) + .await + }; + + // Created last since it consumes the network manager. + let network_handle = if let Some(handle) = tasks.network_handle { + handle + } else { + match resources.maybe_network_manager { + Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }), + None => tokio::spawn(pending()), + } + }; + tokio::select! { + res = storage_metrics_handle => { + error!("collecting storage metrics stopped."); + res?? + } + res = rpc_server_handle => { + error!("RPC server stopped."); + res?? + } + res = monitoring_server_handle => { + error!("Monitoring server stopped."); + res?? + } + res = sync_client_handle => { + error!("Sync stopped."); + res?? + } + res = p2p_sync_server_handle => { + error!("P2P Sync server stopped"); + res?? + } + res = network_handle => { + error!("Network stopped."); + res?? + } + res = consensus_handle => { + error!("Consensus stopped."); + res?? + } + }; + error!("Task ended with unexpected Ok."); + Ok(()) +} + +// TODO(yair): add dynamic level filtering. +// TODO(dan): filter out logs from dependencies (happens when RUST_LOG=DEBUG) +// TODO(yair): define and implement configurable filtering. +fn configure_tracing() { + let fmt_layer = fmt::layer().compact().with_target(false); + let level_filter_layer = + EnvFilter::builder().with_default_directive(DEFAULT_LEVEL.into()).from_env_lossy(); + + // This sets a single subscriber to all of the threads. We may want to implement different + // subscriber for some threads and use set_global_default instead of init. + tracing_subscriber::registry().with(fmt_layer).with(level_filter_layer).init(); +} + +fn spawn_storage_metrics_collector( + collect_metrics: bool, + storage_reader: StorageReader, + interval: Duration, +) -> JoinHandle> { + if !collect_metrics { + return tokio::spawn(pending()); + } + + tokio::spawn( + async move { + loop { + if let Err(error) = update_storage_metrics(&storage_reader) { + warn!("Failed to update storage metrics: {error}"); + } + tokio::time::sleep(interval).await; + } + } + .instrument(debug_span!("collect_storage_metrics")), + ) +} + +pub async fn run( + config: NodeConfig, + resources: PapyrusResources, + tasks: PapyrusTaskHandles, +) -> anyhow::Result<()> { + configure_tracing(); + + if let Err(errors) = config_validate(&config) { + error!("{}", errors); + exit(1); + } + + COLLECT_PROFILING_METRICS + .set(config.collect_profiling_metrics) + .expect("This should be the first and only time we set this value."); + + info!("Booting up."); + run_threads(config, resources, tasks).await +} diff --git a/crates/papyrus_node/src/main_test.rs b/crates/papyrus_node/src/run_test.rs similarity index 92% rename from crates/papyrus_node/src/main_test.rs rename to crates/papyrus_node/src/run_test.rs index 366ec89421..c94a658cd0 100644 --- a/crates/papyrus_node/src/main_test.rs +++ b/crates/papyrus_node/src/run_test.rs @@ -1,12 +1,17 @@ use std::time::Duration; use metrics_exporter_prometheus::PrometheusBuilder; -use papyrus_node::config::NodeConfig; use papyrus_storage::{open_storage, StorageConfig}; use papyrus_test_utils::prometheus_is_contained; use tempfile::TempDir; -use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources, PapyrusTaskHandles}; +use crate::config::NodeConfig; +use crate::run::{ + run_threads, + spawn_storage_metrics_collector, + PapyrusResources, + PapyrusTaskHandles, +}; // The mission of this test is to ensure that if an error is returned from one of the spawned tasks, // the node will stop, and this error will be returned. This is done by checking the case of an