Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(node): remove test code from production code #876

Merged
merged 1 commit into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions config/papyrus/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,36 +99,6 @@
"privacy": "Public",
"value": 0
},
"consensus.test.#is_none": {
"description": "Flag for an optional field.",
"privacy": "TemporaryValue",
"value": true
},
"consensus.test.cache_size": {
"description": "The cache size for the test simulation.",
"privacy": "Public",
"value": 1000
},
"consensus.test.drop_probability": {
"description": "The probability of dropping a message.",
"privacy": "Public",
"value": 0.0
},
"consensus.test.invalid_probability": {
"description": "The probability of sending an invalid message.",
"privacy": "Public",
"value": 0.0
},
"consensus.test.random_seed": {
"description": "The random seed for the test simulation to ensure repeatable test results.",
"privacy": "Public",
"value": 0
},
"consensus.test.sync_topic": {
"description": "The network topic for sync messages.",
"privacy": "Public",
"value": "consensus_test_sync"
},
"consensus.timeouts.precommit_timeout": {
"description": "The timeout (seconds) for a precommit.",
"privacy": "Public",
Expand Down
6 changes: 6 additions & 0 deletions crates/papyrus_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ normal = ["clap", "papyrus_base_layer", "reqwest", "tokio"]
[features]
default = ["rpc"]
rpc = ["papyrus_rpc"]
testing = []

[[bin]]
name = "central_source_integration_test"
path = "src/bin/central_source_integration_test.rs"
required-features = ["futures-util", "tokio-stream"]

[[bin]]
name = "run_consensus"
path = "src/bin/run_consensus.rs"
required-features = ["testing"]

[dependencies]
anyhow.workspace = true
clap = { workspace = true }
Expand Down
113 changes: 113 additions & 0 deletions crates/papyrus_node/src/bin/run_consensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Run a papyrus node with consensus enabled and the ability to simulate network issues for
//! consensus.
use clap::Parser;
use futures::stream::StreamExt;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_node::bin_utils::build_configs;
use papyrus_node::run::{run, PapyrusResources, PapyrusTaskHandles};
use papyrus_p2p_sync::BUFFER_SIZE;
use papyrus_storage::StorageReader;
use starknet_api::block::BlockNumber;
use tokio::task::JoinHandle;

/// Test configuration for consensus.
#[derive(Parser, Debug, Clone, PartialEq)]
pub struct TestConfig {
#[arg(long = "cache_size", help = "The cache size for the test network receiver.")]
pub cache_size: usize,
#[arg(
long = "random_seed",
help = "The random seed for the test simulation to ensure repeatable test results."
)]
pub random_seed: u64,
#[arg(long = "drop_probability", help = "The probability of dropping a message.")]
pub drop_probability: f64,
#[arg(long = "invalid_probability", help = "The probability of sending an invalid message.")]
pub invalid_probability: f64,
#[arg(long = "sync_topic", help = "The network topic for sync messages.")]
pub sync_topic: String,
}

impl Default for TestConfig {
fn default() -> Self {
Self {
cache_size: 1000,
random_seed: 0,
drop_probability: 0.0,
invalid_probability: 0.0,
sync_topic: "consensus_test_sync".to_string(),
}
}
}

fn build_consensus(
consensus_config: ConsensusConfig,
test_config: TestConfig,
storage_reader: StorageReader,
network_manager: &mut NetworkManager,
) -> anyhow::Result<Option<JoinHandle<anyhow::Result<()>>>> {
let network_channels = network_manager.register_broadcast_topic(
Topic::new(consensus_config.network_topic.clone()),
BUFFER_SIZE,
)?;
// TODO(matan): connect this to an actual channel.
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
consensus_config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
let broadcast_channels = BroadcastTopicChannels {
messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender,
broadcasted_messages_receiver: Box::new(network_receiver),
reported_messages_sender: network_channels.reported_messages_sender,
continue_propagation_sender: network_channels.continue_propagation_sender,
};

Ok(Some(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
consensus_config.start_height,
consensus_config.validator_id,
consensus_config.consensus_delay,
consensus_config.timeouts.clone(),
broadcast_channels,
sync_receiver,
)
.await?)
})))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let (test_config, node_config) = build_configs::<TestConfig>()?;

let mut resources = PapyrusResources::new(&node_config)?;

let consensus_handle = build_consensus(
node_config.consensus.clone().unwrap(),
test_config,
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut().unwrap(),
)?;
let tasks = PapyrusTaskHandles { consensus_handle, ..Default::default() };

run(node_config, resources, tasks).await
}
48 changes: 48 additions & 0 deletions crates/papyrus_node/src/bin_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::env::args;

use clap::Parser;
use papyrus_config::ConfigError;

use crate::config::NodeConfig;

// Test arguments passed on the command line are prefixed with `test.<ARG_NAME>`.
const TEST_ARG_PREFIX: &str = "--test.";

/// Split the elements of `input_args` into 2 groups:
/// 1. Those prefixed with "--test."
/// 2. Other.
///
/// Presumes input is: program_name (--flag_name value)*
pub fn split_args(input_args: Vec<String>) -> (Vec<String>, Vec<String>) {
input_args[1..].chunks(2).fold(
(vec![input_args[0].clone()], vec![input_args[0].clone()]),
|(mut matching_args, mut mismatched_args), input_arg| {
let (name, value) = (&input_arg[0], &input_arg[1]);
// String leading `--` for comparison.
if &name[..TEST_ARG_PREFIX.len()] == TEST_ARG_PREFIX {
matching_args.push(format!("--{}", &name[TEST_ARG_PREFIX.len()..]));
matching_args.push(value.clone());
} else {
mismatched_args.push(name.clone());
mismatched_args.push(value.clone());
}
(matching_args, mismatched_args)
},
)
}

/// Build both the node and test configs from the command line arguments.
pub fn build_configs<T: Parser + Default>() -> Result<(T, NodeConfig), ConfigError> {
let input_args = args().collect::<Vec<_>>();
let (test_input_args, node_input_args) = split_args(input_args);

let mut test_config = T::default();
test_config.update_from(test_input_args.iter());

let node_config = NodeConfig::load_and_process(node_input_args);
if let Err(ConfigError::CommandInput(clap_err)) = node_config {
clap_err.exit();
}
let node_config = node_config?;
Ok((test_config, node_config))
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,44 +115,6 @@ expression: dumped_default_config
},
"privacy": "Public"
},
"consensus.test.#is_none": {
"description": "Flag for an optional field.",
"value": true,
"privacy": "TemporaryValue"
},
"consensus.test.cache_size": {
"description": "The cache size for the test simulation.",
"value": {
"$serde_json::private::Number": "1000"
},
"privacy": "Public"
},
"consensus.test.drop_probability": {
"description": "The probability of dropping a message.",
"value": {
"$serde_json::private::Number": "0.0"
},
"privacy": "Public"
},
"consensus.test.invalid_probability": {
"description": "The probability of sending an invalid message.",
"value": {
"$serde_json::private::Number": "0.0"
},
"privacy": "Public"
},
"consensus.test.random_seed": {
"description": "The random seed for the test simulation to ensure repeatable test results.",
"value": {
"$serde_json::private::Number": "0"
},
"privacy": "Public"
},
"consensus.test.sync_topic": {
"description": "The network topic for sync messages.",
"value": "consensus_test_sync",
"privacy": "Public"
},
"consensus.timeouts.precommit_timeout": {
"description": "The timeout (seconds) for a precommit.",
"value": {
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// within this crate
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

#[cfg(any(test, feature = "testing"))]
pub mod bin_utils;
#[allow(unused_imports)]
pub mod config;
#[cfg(test)]
Expand Down
83 changes: 20 additions & 63 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use futures::stream::StreamExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
use papyrus_common::BlockHashAndNumber;
use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
Expand All @@ -31,7 +29,7 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS
use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig};
use papyrus_sync::sources::pending::PendingSource;
use papyrus_sync::{StateSync, SyncConfig};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::block::BlockHash;
use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
Expand Down Expand Up @@ -188,65 +186,24 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
let sync_channels = network_manager
.register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?;
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
Some(sync_channels.messages_to_broadcast_sender),
);
let network_receiver = NetworkReceiver::new(
network_channels.broadcasted_messages_receiver,
test_config.cache_size,
test_config.random_seed,
test_config.drop_probability,
test_config.invalid_probability,
);
let broadcast_channels = BroadcastTopicChannels {
messages_to_broadcast_sender: network_channels.messages_to_broadcast_sender,
broadcasted_messages_receiver: Box::new(network_receiver),
reported_messages_sender: network_channels.reported_messages_sender,
continue_propagation_sender: network_channels.continue_propagation_sender,
};
let sync_receiver =
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
broadcast_channels,
sync_receiver,
)
.await?)
}))
} else {
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels,
futures::stream::pending(),
)
.await?)
}))
}
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
network_channels,
futures::stream::pending(),
)
.await?)
}))
}

async fn run_sync(
Expand Down
Loading
Loading