Skip to content

Commit

Permalink
feat(batcher): mock BlockBuilder for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware committed Aug 25, 2024
1 parent 6fe60b7 commit 8c76f79
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/batcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ validator.workspace = true

[dev-dependencies]
assert_matches.workspace = true
mempool_test_utils.workspace = true
mockall.workspace = true
starknet_api = { workspace = true, features = ["testing"] }
63 changes: 30 additions & 33 deletions crates/batcher/src/proposals_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::BTreeMap;
use std::sync::Arc;

#[cfg(test)]
use mockall::automock;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use serde::{Deserialize, Serialize};
Expand All @@ -10,7 +12,7 @@ use starknet_mempool_types::communication::{MempoolClientError, SharedMempoolCli
use thiserror::Error;
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, info, instrument, Instrument};

// TODO: Should be defined in SN_API probably (shared with the consensus).
pub type ProposalId = u64;
Expand Down Expand Up @@ -81,13 +83,24 @@ pub(crate) struct ProposalsManager {
/// At any given time, there can be only one proposal being actively executed (either proposed
/// or validated).
proposal_in_generation: Arc<Mutex<Option<ProposalId>>>,
// To be able to mock BlockBuilder in tests.
block_builder_factory: Arc<dyn BlockBuilderFactory>,
}

impl ProposalsManager {
// TODO: Remove dead_code attribute.
#[allow(dead_code)]
pub fn new(config: ProposalsManagerConfig, mempool_client: SharedMempoolClient) -> Self {
Self { config, mempool_client, proposal_in_generation: Arc::new(Mutex::new(None)) }
pub fn new(
config: ProposalsManagerConfig,
mempool_client: SharedMempoolClient,
block_builder_factory: Arc<dyn BlockBuilderFactory>,
) -> Self {
Self {
config,
mempool_client,
proposal_in_generation: Arc::new(Mutex::new(None)),
block_builder_factory,
}
}

/// Starts a new block proposal generation task for the given proposal_id and height with
Expand All @@ -112,8 +125,10 @@ impl ProposalsManager {
max_txs_per_mempool_request: self.config.max_txs_per_mempool_request,
sender,
proposal_in_generation: self.proposal_in_generation.clone(),
block_builder_factory: self.block_builder_factory.clone(),
}
.run(),
.run()
.in_current_span(),
);

Ok(ReceiverStream::new(receiver))
Expand All @@ -140,35 +155,16 @@ impl ProposalsManager {
}
}

// TODO: Should be defined elsewhere.
#[allow(dead_code)]
mod block_builder {
use starknet_api::executable_transaction::Transaction;
use starknet_api::state::StateDiff;

#[derive(Debug, PartialEq)]
pub enum Status {
Building,
Ready,
Timeout,
}

pub struct BlockBuilder {}

impl BlockBuilder {
pub fn status(&self) -> Status {
Status::Building
}

/// Returning true if the block is ready to be proposed.
pub fn add_txs(&self, _txs: &[Transaction]) -> bool {
false
}
#[cfg_attr(test, automock)]
pub trait BlockBuilderTrait: Send {
/// Returning true if the block is ready to be proposed.
fn add_txs(&self, txs: &[Transaction]) -> bool;
}

pub fn close_block(&self) -> StateDiff {
StateDiff::default()
}
}
#[allow(dead_code)]
#[cfg_attr(test, automock)]
pub(crate) trait BlockBuilderFactory: Send + Sync {
fn create_block_builder(&self) -> Box<dyn BlockBuilderTrait>;
}

#[allow(dead_code)]
Expand All @@ -178,12 +174,13 @@ struct ProposalGenerationTask {
pub max_txs_per_mempool_request: usize,
pub sender: tokio::sync::mpsc::Sender<Transaction>,
pub proposal_in_generation: Arc<Mutex<Option<ProposalId>>>,
pub block_builder_factory: Arc<dyn BlockBuilderFactory>,
}

impl ProposalGenerationTask {
#[allow(dead_code)]
async fn run(self) -> ProposalsManagerResult<()> {
let block_builder = block_builder::BlockBuilder {};
let block_builder = self.block_builder_factory.create_block_builder();
loop {
if tokio::time::Instant::now() > self.timeout {
info!("Proposal reached timeout.");
Expand Down
162 changes: 147 additions & 15 deletions crates/batcher/src/proposals_manager_test.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,129 @@
use std::ops::Range;
use std::sync::Arc;

use assert_matches::assert_matches;
use mempool_test_utils::starknet_api_test_utils::create_executable_tx;
use starknet_api::block::BlockNumber;
use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::executable_transaction::Transaction;
use starknet_api::felt;
use starknet_api::transaction::{ResourceBoundsMapping, Tip, TransactionHash};
use starknet_mempool_types::communication::MockMempoolClient;
use tokio_stream::StreamExt;

use crate::proposals_manager::{ProposalsManager, ProposalsManagerConfig, ProposalsManagerError};
use crate::proposals_manager::{
MockBlockBuilderFactory,
MockBlockBuilderTrait,
ProposalId,
ProposalsManager,
ProposalsManagerConfig,
ProposalsManagerError,
};

const GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1);
#[tokio::test]
async fn proposal_generation_success() {
let proposals_manager_config = ProposalsManagerConfig::default();
let mut block_builder_factory = MockBlockBuilderFactory::new();
block_builder_factory.expect_create_block_builder().once().returning(|| {
let mut mock_block_builder = MockBlockBuilderTrait::new();
mock_block_builder.expect_add_txs().once().returning(|_txs| false);
// Close the block after the second call to add_txs.
mock_block_builder.expect_add_txs().once().returning(|_txs| true);

Box::new(mock_block_builder)
});

let mut mempool_client = MockMempoolClient::new();
mempool_client.expect_get_txs().once().returning(|max_n_txs| Ok(test_txs(0..max_n_txs)));

mempool_client
.expect_get_txs()
.once()
.returning(|max_n_txs| Ok(test_txs(max_n_txs..2 * max_n_txs)));

let mut proposals_manager = ProposalsManager::new(
proposals_manager_config.clone(),
Arc::new(mempool_client),
Arc::new(block_builder_factory),
);

let expected_tx_hashes = (0..proposals_manager_config.max_txs_per_mempool_request * 2)
.map(|i| TransactionHash(felt!(u8::try_from(i).unwrap())))
.collect::<Vec<_>>();

let streamed_txs =
generate_block_proposal_and_collect_streamed_txs(&mut proposals_manager, 0, BlockNumber(0))
.await;
assert_eq!(streamed_txs, expected_tx_hashes);
}

#[tokio::test]
async fn concecutive_proposal_generations_success() {
let proposals_manager_config = ProposalsManagerConfig::default();
let mut block_builder_factory = MockBlockBuilderFactory::new();
block_builder_factory.expect_create_block_builder().times(2).returning(|| {
let mut mock_block_builder = MockBlockBuilderTrait::new();
mock_block_builder.expect_add_txs().once().returning(|_thin_txs| false);
// Close the block after the second call to add_txs.
mock_block_builder.expect_add_txs().once().returning(|_thin_txs| true);

Box::new(mock_block_builder)
});

let mut mempool_client = MockMempoolClient::new();
mempool_client.expect_get_txs().once().returning(|max_n_txs| Ok(test_txs(0..max_n_txs)));
mempool_client
.expect_get_txs()
.once()
.returning(|max_n_txs| Ok(test_txs(max_n_txs..2 * max_n_txs)));
mempool_client.expect_get_txs().once().returning(|max_n_txs| Ok(test_txs(0..max_n_txs)));
mempool_client
.expect_get_txs()
.once()
.returning(|max_n_txs| Ok(test_txs(max_n_txs..2 * max_n_txs)));

let mut proposals_manager = ProposalsManager::new(
proposals_manager_config.clone(),
Arc::new(mempool_client),
Arc::new(block_builder_factory),
);

let expected_tx_hashes = (0..proposals_manager_config.max_txs_per_mempool_request * 2)
.map(|i| TransactionHash(felt!(u8::try_from(i).unwrap())))
.collect::<Vec<_>>();

let streamed_txs =
generate_block_proposal_and_collect_streamed_txs(&mut proposals_manager, 0, BlockNumber(0))
.await;
assert_eq!(streamed_txs, expected_tx_hashes);

let streamed_txs =
generate_block_proposal_and_collect_streamed_txs(&mut proposals_manager, 1, BlockNumber(1))
.await;
assert_eq!(streamed_txs, expected_tx_hashes);
}

#[tokio::test]
async fn multiple_proposals_generation_fails() {
async fn multiple_proposals_generation_fail() {
let mut mempool_client = MockMempoolClient::new();
mempool_client.expect_get_txs().returning(|_| Ok(vec![]));
let mut proposals_manager =
ProposalsManager::new(ProposalsManagerConfig::default(), Arc::new(mempool_client));
let mut block_builder_factory = MockBlockBuilderFactory::new();
block_builder_factory
.expect_create_block_builder()
.once()
.returning(|| Box::new(MockBlockBuilderTrait::new()));
let mut proposals_manager = ProposalsManager::new(
ProposalsManagerConfig::default(),
Arc::new(mempool_client),
Arc::new(block_builder_factory),
);
let _ = proposals_manager
.generate_block_proposal(
0,
tokio::time::Instant::now() + GENERATION_TIMEOUT,
BlockNumber::default(),
)
.generate_block_proposal(0, arbitrary_deadline(), BlockNumber::default())
.await
.unwrap();

let another_generate_request = proposals_manager
.generate_block_proposal(
1,
tokio::time::Instant::now() + GENERATION_TIMEOUT,
BlockNumber::default(),
)
.generate_block_proposal(1, arbitrary_deadline(), BlockNumber::default())
.await;

assert_matches!(
Expand All @@ -39,3 +134,40 @@ async fn multiple_proposals_generation_fails() {
}) if current_generating_proposal_id == 0 && new_proposal_id == 1
);
}

async fn generate_block_proposal_and_collect_streamed_txs(
proposal_manager: &mut ProposalsManager,
proposal_id: ProposalId,
block_number: BlockNumber,
) -> Vec<TransactionHash> {
let mut tx_stream = proposal_manager
.generate_block_proposal(proposal_id, arbitrary_deadline(), block_number)
.await
.unwrap();

let mut streamed_txs = vec![];
while let Some(tx) = tx_stream.next().await {
streamed_txs.push(tx.tx_hash());
}

streamed_txs
}

fn arbitrary_deadline() -> tokio::time::Instant {
const GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1);
tokio::time::Instant::now() + GENERATION_TIMEOUT
}

fn test_txs(tx_hash_range: Range<usize>) -> Vec<Transaction> {
tx_hash_range
.map(|i| {
create_executable_tx(
ContractAddress::default(),
TransactionHash(felt!(u128::try_from(i).unwrap())),
Tip::default(),
Nonce::default(),
ResourceBoundsMapping::default(),
)
})
.collect()
}

0 comments on commit 8c76f79

Please sign in to comment.