Skip to content

Commit

Permalink
feat(batcher): mock BlockBuilder for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware committed Sep 24, 2024
1 parent 8612c1d commit e2297ba
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 61 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ assert_matches = "1.5.0"
async-recursion = "1.1.0"
async-stream = "0.3.3"
async-trait = "0.1.79"
atomic_refcell = "0.1.13"
axum = "0.6.12"
base64 = "0.13.0"
bincode = "1.3.3"
Expand Down
4 changes: 4 additions & 0 deletions crates/batcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ validator.workspace = true

[dev-dependencies]
assert_matches.workspace = true
futures.workspace = true
mempool_test_utils.workspace = true
mockall.workspace = true
rstest.workspace = true
starknet_api = { workspace = true, features = ["testing"] }
116 changes: 76 additions & 40 deletions crates/batcher/src/proposal_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use async_trait::async_trait;
#[cfg(test)]
use mockall::automock;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use serde::{Deserialize, Serialize};
Expand All @@ -17,20 +20,32 @@ pub type ProposalId = u64;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProposalManagerConfig {
pub block_builder_next_txs_buffer_size: usize,
pub max_txs_per_mempool_request: usize,
pub outstream_content_buffer_size: usize,
}

impl Default for ProposalManagerConfig {
fn default() -> Self {
// TODO: Get correct value for default max_txs_per_mempool_request.
Self { max_txs_per_mempool_request: 10, outstream_content_buffer_size: 100 }
// TODO: Get correct default values.
Self {
block_builder_next_txs_buffer_size: 100,
max_txs_per_mempool_request: 10,
outstream_content_buffer_size: 100,
}
}
}

impl SerializeConfig for ProposalManagerConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([
ser_param(
"block_builder_next_txs_buffer_size",
&self.block_builder_next_txs_buffer_size,
"Maximum transactions to fill in the stream buffer for the block builder before \
blocking",
ParamPrivacyInput::Public,
),
ser_param(
"max_txs_per_mempool_request",
&self.max_txs_per_mempool_request,
Expand Down Expand Up @@ -81,13 +96,28 @@ pub(crate) struct ProposalManager {
/// At any given time, there can be only one proposal being actively executed (either proposed
/// or validated).
active_proposal: Arc<Mutex<Option<ProposalId>>>,
active_proposal_handle: Option<ActiveTaskHandle>,
// Use a factory object, to be able to mock BlockBuilder in tests.
block_builder_factory: Arc<dyn BlockBuilderFactoryTrait>,
}

type ActiveTaskHandle = tokio::task::JoinHandle<ProposalsManagerResult<()>>;

impl ProposalManager {
// TODO: Remove dead_code attribute.
#[allow(dead_code)]
pub fn new(config: ProposalManagerConfig, mempool_client: SharedMempoolClient) -> Self {
Self { config, mempool_client, active_proposal: Arc::new(Mutex::new(None)) }
pub fn new(
config: ProposalManagerConfig,
mempool_client: SharedMempoolClient,
block_builder_factory: Arc<dyn BlockBuilderFactoryTrait>,
) -> Self {
Self {
config,
mempool_client,
active_proposal: Arc::new(Mutex::new(None)),
block_builder_factory,
active_proposal_handle: None,
}
}

/// Starts a new block proposal generation task for the given proposal_id and height with
Expand All @@ -104,19 +134,21 @@ impl ProposalManager {
info!("Starting generation of a new proposal with id {}.", proposal_id);
self.set_active_proposal(proposal_id).await?;

let block_builder = block_builder::BlockBuilder {};
let _handle = tokio::spawn(
let block_builder = self.block_builder_factory.create_block_builder();

self.active_proposal_handle = Some(tokio::spawn(
BuildProposalTask {
mempool_client: self.mempool_client.clone(),
output_content_sender,
block_builder_next_txs_buffer_size: self.config.block_builder_next_txs_buffer_size,
max_txs_per_mempool_request: self.config.max_txs_per_mempool_request,
block_builder,
active_proposal: self.active_proposal.clone(),
deadline,
}
.run()
.in_current_span(),
);
));

Ok(())
}
Expand All @@ -137,30 +169,14 @@ impl ProposalManager {
debug!("Set proposal {} as the one being generated.", proposal_id);
Ok(())
}
}

// TODO: Should be defined elsewhere.
#[allow(dead_code)]
mod block_builder {
use starknet_api::executable_transaction::Transaction;
use tokio_stream::Stream;

#[derive(Debug, PartialEq)]
pub enum Status {
Building,
Ready,
Timeout,
}

pub struct BlockBuilder {}

impl BlockBuilder {
pub async fn build_block(
&self,
_deadline: tokio::time::Instant,
_mempool_tx_stream: impl Stream<Item = Transaction>,
_output_content_sender: tokio::sync::mpsc::Sender<Transaction>,
) {
// A helper function for testing purposes (to be able to await the active proposal).
// TODO: Consider making the tests a nested module to allow them to access private members.
#[cfg(test)]
pub async fn await_active_proposal(&mut self) -> Option<ProposalsManagerResult<()>> {
match self.active_proposal_handle.take() {
Some(handle) => Some(handle.await.unwrap()),
None => None,
}
}
}
Expand All @@ -170,19 +186,19 @@ struct BuildProposalTask {
mempool_client: SharedMempoolClient,
output_content_sender: tokio::sync::mpsc::Sender<Transaction>,
max_txs_per_mempool_request: usize,
block_builder: block_builder::BlockBuilder,
block_builder_next_txs_buffer_size: usize,
block_builder: Arc<dyn BlockBuilderTrait>,
active_proposal: Arc<Mutex<Option<ProposalId>>>,
deadline: tokio::time::Instant,
}

#[allow(dead_code)]
impl BuildProposalTask {
async fn run(mut self) -> ProposalsManagerResult<()> {
// TODO: Should we use a different config for the stream buffer size?
// We convert the receiver to a stream and pass it to the block builder while using the
// sender to feed the stream.
let (mempool_tx_sender, mempool_tx_receiver) =
tokio::sync::mpsc::channel::<Transaction>(self.max_txs_per_mempool_request);
tokio::sync::mpsc::channel::<Transaction>(self.block_builder_next_txs_buffer_size);
let mempool_tx_stream = ReceiverStream::new(mempool_tx_receiver);
let building_future = self.block_builder.build_block(
self.deadline,
Expand Down Expand Up @@ -225,6 +241,11 @@ impl BuildProposalTask {
loop {
// TODO: Get L1 transactions.
let mempool_txs = match mempool_client.get_txs(max_txs_per_mempool_request).await {
Ok(txs) if txs.is_empty() => {
// TODO: Consider sleeping for a while.
tokio::task::yield_now().await;
continue;
}
Ok(txs) => txs,
Err(e) => return e.into(),
};
Expand All @@ -233,13 +254,10 @@ impl BuildProposalTask {
mempool_txs.len()
);
for tx in mempool_txs {
if let Err(e) = mempool_tx_sender.send(tx).await.map_err(|err| {
// TODO: should we return the rest of the txs to the mempool?
error!("Failed to send transaction to the block builder: {}.", err);
ProposalManagerError::InternalError
}) {
return e;
}
mempool_tx_sender
.send(tx)
.await
.expect("Channel should remain open during feeding mempool transactions.");
}
}
}
Expand All @@ -249,3 +267,21 @@ impl BuildProposalTask {
*proposal_id = None;
}
}

pub type InputTxStream = ReceiverStream<Transaction>;
pub type OutputTxStream = ReceiverStream<Transaction>;

#[async_trait]
pub trait BlockBuilderTrait: Send + Sync {
async fn build_block(
&self,
deadline: tokio::time::Instant,
tx_stream: InputTxStream,
output_content_sender: tokio::sync::mpsc::Sender<Transaction>,
);
}

#[cfg_attr(test, automock)]
pub trait BlockBuilderFactoryTrait: Send + Sync {
fn create_block_builder(&self) -> Arc<dyn BlockBuilderTrait>;
}
Loading

0 comments on commit e2297ba

Please sign in to comment.