Skip to content

Commit

Permalink
refactor(batcher): refactor proposal manager
Browse files Browse the repository at this point in the history
  • Loading branch information
yair-starkware committed Sep 22, 2024
1 parent 91835a9 commit 64cfda8
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 95 deletions.
202 changes: 116 additions & 86 deletions crates/batcher/src/proposals_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,31 @@ use std::sync::Arc;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use starknet_api::executable_transaction::Transaction;
use starknet_mempool_types::communication::{MempoolClientError, SharedMempoolClient};
use thiserror::Error;
use tokio::select;
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, instrument};
use tracing::{debug, error, info, instrument, trace, Instrument};

// TODO: Should be defined in SN_API probably (shared with the consensus).
pub type ProposalId = u64;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProposalsManagerConfig {
pub struct ProposalManagerConfig {
pub max_txs_per_mempool_request: usize,
pub outstream_content_buffer_size: usize,
}

impl Default for ProposalsManagerConfig {
impl Default for ProposalManagerConfig {
fn default() -> Self {
// TODO: Get correct value for default max_txs_per_mempool_request.
Self { max_txs_per_mempool_request: 10, outstream_content_buffer_size: 100 }
}
}

impl SerializeConfig for ProposalsManagerConfig {
impl SerializeConfig for ProposalManagerConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([
ser_param(
Expand All @@ -47,8 +47,8 @@ impl SerializeConfig for ProposalsManagerConfig {
}
}

#[derive(Debug, Error)]
pub enum ProposalsManagerError {
#[derive(Clone, Debug, Error)]
pub enum ProposalManagerError {
#[error(
"Received proposal generation request with id {new_proposal_id} while already generating \
proposal with id {current_generating_proposal_id}."
Expand All @@ -63,7 +63,7 @@ pub enum ProposalsManagerError {
MempoolError(#[from] MempoolClientError),
}

pub type ProposalsManagerResult<T> = Result<T, ProposalsManagerError>;
pub type ProposalsManagerResult<T> = Result<T, ProposalManagerError>;

/// Main struct for handling block proposals.
/// Taking care of:
Expand All @@ -74,62 +74,61 @@ pub type ProposalsManagerResult<T> = Result<T, ProposalsManagerError>;
/// Triggered by the consensus.
// TODO: Remove dead_code attribute.
#[allow(dead_code)]
pub(crate) struct ProposalsManager {
config: ProposalsManagerConfig,
pub(crate) struct ProposalManager {
config: ProposalManagerConfig,
mempool_client: SharedMempoolClient,
/// The block proposal that is currently being proposed, if any.
/// At any given time, there can be only one proposal being actively executed (either proposed
/// or validated).
proposal_in_generation: Arc<Mutex<Option<ProposalId>>>,
active_proposal: Arc<Mutex<Option<ProposalId>>>,
}

impl ProposalsManager {
impl ProposalManager {
// TODO: Remove dead_code attribute.
#[allow(dead_code)]
pub fn new(config: ProposalsManagerConfig, mempool_client: SharedMempoolClient) -> Self {
Self { config, mempool_client, proposal_in_generation: Arc::new(Mutex::new(None)) }
pub fn new(config: ProposalManagerConfig, mempool_client: SharedMempoolClient) -> Self {
Self { config, mempool_client, active_proposal: Arc::new(Mutex::new(None)) }
}

/// Starts a new block proposal generation task for the given proposal_id and height with
/// transactions from the mempool.
#[instrument(skip(self))]
pub async fn generate_block_proposal(
/// Requires output_content_sender for sending the generated transactions to the caller.
#[instrument(skip(self, output_content_sender), err)]
pub async fn build_block_proposal(
&mut self,
proposal_id: ProposalId,
timeout: tokio::time::Instant,
_height: BlockNumber,
) -> ProposalsManagerResult<ReceiverStream<Transaction>> {
info!("Starting generation of new proposal.");
self.set_proposal_in_generation(proposal_id).await?;

let (sender, receiver) =
tokio::sync::mpsc::channel::<Transaction>(self.config.outstream_content_buffer_size);
// TODO: Find where to join the task - needed to make sure it starts immediatly.
deadline: tokio::time::Instant,
// TODO: Should this be an unbounded channel?
output_content_sender: tokio::sync::mpsc::Sender<Transaction>,
) -> ProposalsManagerResult<()> {
info!("Starting generation of a new proposal with id {}.", proposal_id);
self.set_active_proposal(proposal_id).await?;

let block_builder = block_builder::BlockBuilder {};
let _handle = tokio::spawn(
ProposalGenerationTask {
timeout,
BuildProposalTask {
mempool_client: self.mempool_client.clone(),
output_content_sender,
max_txs_per_mempool_request: self.config.max_txs_per_mempool_request,
sender,
proposal_in_generation: self.proposal_in_generation.clone(),
block_builder,
active_proposal: self.active_proposal.clone(),
deadline,
}
.run(),
.run()
.in_current_span(),
);

Ok(ReceiverStream::new(receiver))
Ok(())
}

// Checks if there is already a proposal being generated, and if not, sets the given proposal_id
// as the one being generated.
async fn set_proposal_in_generation(
&mut self,
proposal_id: ProposalId,
) -> ProposalsManagerResult<()> {
let mut lock = self.proposal_in_generation.lock().await;
async fn set_active_proposal(&mut self, proposal_id: ProposalId) -> ProposalsManagerResult<()> {
let mut lock = self.active_proposal.lock().await;

if let Some(proposal_in_generation) = *lock {
return Err(ProposalsManagerError::AlreadyGeneratingProposal {
current_generating_proposal_id: proposal_in_generation,
if let Some(active_proposal) = *lock {
return Err(ProposalManagerError::AlreadyGeneratingProposal {
current_generating_proposal_id: active_proposal,
new_proposal_id: proposal_id,
});
}
Expand All @@ -144,7 +143,7 @@ impl ProposalsManager {
#[allow(dead_code)]
mod block_builder {
use starknet_api::executable_transaction::Transaction;
use starknet_api::state::StateDiff;
use tokio_stream::Stream;

#[derive(Debug, PartialEq)]
pub enum Status {
Expand All @@ -156,66 +155,97 @@ mod block_builder {
pub struct BlockBuilder {}

impl BlockBuilder {
pub fn status(&self) -> Status {
Status::Building
}

/// Returning true if the block is ready to be proposed.
pub fn add_txs_and_stream(
pub async fn build_block(
&self,
_txs: &[Transaction],
_sender: &tokio::sync::mpsc::Sender<Transaction>,
) -> bool {
false
}

pub fn close_block(&self) -> StateDiff {
StateDiff::default()
_deadline: tokio::time::Instant,
_mempool_tx_stream: impl Stream<Item = Transaction>,
_output_content_sender: tokio::sync::mpsc::Sender<Transaction>,
) {
}
}
}

#[allow(dead_code)]
struct ProposalGenerationTask {
pub timeout: tokio::time::Instant,
pub mempool_client: SharedMempoolClient,
pub max_txs_per_mempool_request: usize,
pub sender: tokio::sync::mpsc::Sender<Transaction>,
pub proposal_in_generation: Arc<Mutex<Option<ProposalId>>>,
struct BuildProposalTask {
mempool_client: SharedMempoolClient,
output_content_sender: tokio::sync::mpsc::Sender<Transaction>,
max_txs_per_mempool_request: usize,
block_builder: block_builder::BlockBuilder,
active_proposal: Arc<Mutex<Option<ProposalId>>>,
deadline: tokio::time::Instant,
}

impl ProposalGenerationTask {
#[allow(dead_code)]
async fn run(self) -> ProposalsManagerResult<()> {
let block_builder = block_builder::BlockBuilder {};
loop {
if tokio::time::Instant::now() > self.timeout {
info!("Proposal reached timeout.");
break;
}
let mempool_txs = self.mempool_client.get_txs(self.max_txs_per_mempool_request).await?;
if mempool_txs.is_empty() {
// TODO: check if sleep is needed here.
tokio::task::yield_now().await;
continue;
#[allow(dead_code)]
impl BuildProposalTask {
async fn run(mut self) -> ProposalsManagerResult<()> {
// TODO: Should we use a different config for the stream buffer size?
// We convert the receiver to a stream and pass it to the block builder while using the
// sender to feed the stream.
let (mempool_tx_sender, mempool_tx_receiver) =
tokio::sync::mpsc::channel::<Transaction>(self.max_txs_per_mempool_request);
let mempool_tx_stream = ReceiverStream::new(mempool_tx_receiver);
let building_future = self.block_builder.build_block(
self.deadline,
mempool_tx_stream,
self.output_content_sender.clone(),
);

let feed_mempool_txs_future = Self::feed_mempool_txs(
&self.mempool_client,
self.max_txs_per_mempool_request,
&mempool_tx_sender,
);

// Wait for either the block builder to finish or the feeding of transactions to error.
// The other task will be cancelled.
let res = select! {
// This will send txs from the mempool to the stream we provided to the block builder.
feeding_error = feed_mempool_txs_future => {
error!("Failed to feed more mempool txs: {}.", feeding_error);
// TODO: Notify the mempool about remaining txs.
// TODO: Abort the block builder or wait for it to finish.
Err(feeding_error)
},
builder_done = building_future => {
info!("Block builder finished.");
Ok(builder_done)
}
};
self.active_proposal_finished().await;
res
}

/// Feeds transactions from the mempool to the mempool_tx_sender channel.
/// Returns only on error or when the task is cancelled.
async fn feed_mempool_txs(
mempool_client: &SharedMempoolClient,
max_txs_per_mempool_request: usize,
mempool_tx_sender: &tokio::sync::mpsc::Sender<Transaction>,
) -> ProposalManagerError {
loop {
// TODO: Get L1 transactions.
debug!("Adding {} mempool transactions to proposal in generation.", mempool_txs.len());
// TODO: This is cpu bound operation, should use spawn_blocking / Rayon / std::thread
// here or from inside the function.
let is_block_ready =
block_builder.add_txs_and_stream(mempool_txs.as_slice(), &self.sender);
if is_block_ready {
break;
let mempool_txs = match mempool_client.get_txs(max_txs_per_mempool_request).await {
Ok(txs) => txs,
Err(e) => return e.into(),
};
trace!(
"Feeding {} transactions from the mempool to the block builder.",
mempool_txs.len()
);
for tx in mempool_txs {
if let Err(e) = mempool_tx_sender.send(tx).await.map_err(|err| {
// TODO: should we return the rest of the txs to the mempool?
error!("Failed to send transaction to the block builder: {}.", err);
ProposalManagerError::InternalError
}) {
return e;
}
}
}
}

info!("Closing block.");
// TODO: Get state diff.
let mut proposal_id = self.proposal_in_generation.lock().await;
async fn active_proposal_finished(&mut self) {
let mut proposal_id = self.active_proposal.lock().await;
*proposal_id = None;

Ok(())
}
}
19 changes: 10 additions & 9 deletions crates/batcher/src/proposals_manager_test.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::sync::Arc;

use assert_matches::assert_matches;
use starknet_api::block::BlockNumber;
use starknet_mempool_types::communication::MockMempoolClient;

use crate::proposals_manager::{ProposalsManager, ProposalsManagerConfig, ProposalsManagerError};
use crate::proposals_manager::{ProposalManager, ProposalManagerConfig, ProposalManagerError};

const GENERATION_TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_secs(1);

Expand All @@ -13,27 +12,29 @@ async fn multiple_proposals_generation_fails() {
let mut mempool_client = MockMempoolClient::new();
mempool_client.expect_get_txs().returning(|_| Ok(vec![]));
let mut proposals_manager =
ProposalsManager::new(ProposalsManagerConfig::default(), Arc::new(mempool_client));
let _ = proposals_manager
.generate_block_proposal(
ProposalManager::new(ProposalManagerConfig::default(), Arc::new(mempool_client));
let (output_content_sender, _rx) = tokio::sync::mpsc::channel(1);
proposals_manager
.build_block_proposal(
0,
tokio::time::Instant::now() + GENERATION_TIMEOUT,
BlockNumber::default(),
output_content_sender,
)
.await
.unwrap();

let (another_output_content_sender, _another_rx) = tokio::sync::mpsc::channel(1);
let another_generate_request = proposals_manager
.generate_block_proposal(
.build_block_proposal(
1,
tokio::time::Instant::now() + GENERATION_TIMEOUT,
BlockNumber::default(),
another_output_content_sender,
)
.await;

assert_matches!(
another_generate_request,
Err(ProposalsManagerError::AlreadyGeneratingProposal {
Err(ProposalManagerError::AlreadyGeneratingProposal {
current_generating_proposal_id,
new_proposal_id
}) if current_generating_proposal_id == 0 && new_proposal_id == 1
Expand Down

0 comments on commit 64cfda8

Please sign in to comment.