Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc refactoring for boost #255

Merged
merged 5 commits into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bin/mev/src/cmd/boost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ impl Command {
info!("configured for `{network}`");

if let Some(config) = config.boost {
Ok(Service::from(network, config).spawn().await?.await?)
let service = Service::from(network, config);
let handle = service.spawn()?;
Ok(handle.await?)
} else {
Err(eyre::eyre!("missing boost config from file provided"))
}
Expand Down
215 changes: 125 additions & 90 deletions mev-boost-rs/src/relay_mux.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use ethereum_consensus::{
primitives::{BlsPublicKey, Epoch, Slot, U256},
crypto::KzgCommitment,
primitives::{BlsPublicKey, Hash32, Slot, U256},
state_transition::Context,
};
use futures_util::{stream, StreamExt};
Expand All @@ -16,23 +17,22 @@ use mev_rs::{
use parking_lot::Mutex;
use rand::prelude::*;
use std::{cmp::Ordering, collections::HashMap, ops::Deref, sync::Arc, time::Duration};
use tokio::time::timeout;
use tracing::{debug, info, warn};

// See note in the `mev-relay-rs::Relay` about this constant.
// TODO likely drop this feature...
const PROPOSAL_TOLERANCE_DELAY: Slot = 1;
// Track an auction for this amount of time, in slots.
const AUCTION_LIFETIME: u64 = 2;
// Give relays this amount of time in seconds to process validator registrations.
const VALIDATOR_REGISTRATION_TIME_OUT_SECS: u64 = 4;
// Give relays this amount of time in seconds to return bids.
const FETCH_BEST_BID_TIME_OUT_SECS: u64 = 1;
// Give relays this amount of time in seconds to respond with a payload.
const FETCH_PAYLOAD_TIME_OUT_SECS: u64 = 1;

fn bid_key_from(
signed_block: &SignedBlindedBeaconBlock,
public_key: &BlsPublicKey,
) -> AuctionRequest {
let slot = signed_block.message().slot();
let parent_hash =
signed_block.message().body().execution_payload_header().parent_hash().clone();

AuctionRequest { slot, parent_hash, public_key: public_key.clone() }
#[derive(Debug)]
struct AuctionContext {
slot: Slot,
relays: Vec<Arc<Relay>>,
}

fn validate_bid(
Expand All @@ -52,6 +52,35 @@ fn validate_bid(
.map_err(Into::into)
}

fn validate_payload(
contents: &AuctionContents,
expected_block_hash: &Hash32,
expected_commitments: Option<&[KzgCommitment]>,
) -> Result<(), BoostError> {
let provided_block_hash = contents.execution_payload().block_hash();
if expected_block_hash != provided_block_hash {
return Err(BoostError::InvalidPayloadHash {
expected: expected_block_hash.clone(),
provided: provided_block_hash.clone(),
})
}
let provided_commitments = contents.blobs_bundle().map(|bundle| &bundle.commitments);
match (expected_commitments, provided_commitments) {
(Some(expected), Some(provided)) => {
if expected == provided.as_ref() {
Ok(())
} else {
Err(BoostError::InvalidPayloadBlobs {
expected: expected.to_vec(),
provided: provided.to_vec(),
})
}
}
(None, None) => Ok(()),
_ => Err(BoostError::InvalidPayloadUnexpectedBlobs),
}
}

// Select the most valuable bids in `bids`, breaking ties by `block_hash`
fn select_best_bids(bids: impl Iterator<Item = (usize, U256)>) -> Vec<usize> {
let (best_indices, _value) =
Expand Down Expand Up @@ -81,42 +110,39 @@ impl Deref for RelayMux {

pub struct Inner {
relays: Vec<Arc<Relay>>,
context: Context,
context: Arc<Context>,
state: Mutex<State>,
}

#[derive(Debug, Default)]
struct State {
// map from bid requests to index of `Relay` in collection
outstanding_bids: HashMap<AuctionRequest, Vec<Arc<Relay>>>,
current_epoch_registration_count: usize,
latest_pubkey: BlsPublicKey,
outstanding_bids: HashMap<Hash32, Arc<AuctionContext>>,
}

impl RelayMux {
pub fn new(relays: impl Iterator<Item = Relay>, context: Context) -> Self {
let inner =
Inner { relays: relays.map(Arc::new).collect(), context, state: Default::default() };
pub fn new(relays: Vec<Relay>, context: Arc<Context>) -> Self {
let inner = Inner {
relays: relays.into_iter().map(Arc::new).collect(),
context,
state: Default::default(),
};
Self(Arc::new(inner))
}

pub fn on_slot(&self, slot: Slot) {
debug!(slot, "processing");
let retain_slot = slot - AUCTION_LIFETIME;
let mut state = self.state.lock();
state
.outstanding_bids
.retain(|auction_request, _| auction_request.slot + PROPOSAL_TOLERANCE_DELAY >= slot);
state.outstanding_bids.retain(|_, auction| auction.slot >= retain_slot);
}

pub fn on_epoch(&self, epoch: Epoch) {
debug!(epoch, "processing");
let count = {
let mut state = self.state.lock();
let count = state.current_epoch_registration_count;
state.current_epoch_registration_count = 0;
count
};
info!(count, epoch, "processed validator registrations")
fn get_context(&self, key: &Hash32) -> Result<Arc<AuctionContext>, Error> {
let state = self.state.lock();
state
.outstanding_bids
.get(key)
.cloned()
.ok_or_else::<Error, _>(|| BoostError::MissingOpenBid(key.clone()).into())
}
}

Expand All @@ -128,28 +154,33 @@ impl BlindedBlockProvider for RelayMux {
) -> Result<(), Error> {
let responses = stream::iter(self.relays.iter().cloned())
.map(|relay| async {
let response = relay.register_validators(registrations).await;
(relay, response)
let request = relay.register_validators(registrations);
let duration = Duration::from_secs(VALIDATOR_REGISTRATION_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, result)| async move {
match result {
Ok(Ok(_)) => Some(()),
Ok(Err(err)) => {
warn!(%err, %relay, "failure when registering validator(s)");
None
}
Err(_) => {
warn!(%relay, "timeout when registering validator(s)");
None
}
}
})
.collect::<Vec<_>>()
.await;

let mut num_failures = 0;
for (relay, response) in responses {
if let Err(err) = response {
num_failures += 1;
warn!(%relay, %err, "failed to register validator");
}
}

if num_failures == self.relays.len() {
if responses.is_empty() {
Err(BoostError::CouldNotRegister.into())
} else {
let count = registrations.len();
info!(count, "sent validator registrations");
let mut state = self.state.lock();
state.current_epoch_registration_count += registrations.len();
Ok(())
}
}
Expand All @@ -160,16 +191,14 @@ impl BlindedBlockProvider for RelayMux {
) -> Result<SignedBuilderBid, Error> {
let bids = stream::iter(self.relays.iter().cloned())
.map(|relay| async {
let response = tokio::time::timeout(
Duration::from_secs(FETCH_BEST_BID_TIME_OUT_SECS),
relay.fetch_best_bid(auction_request),
)
.await;
(relay, response)
})
let request = relay.fetch_best_bid(auction_request);
let duration = Duration::from_secs(FETCH_BEST_BID_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, response)| async {
match response {
.filter_map(|(relay, result)| async {
match result {
Ok(Ok(bid)) => {
if let Err(err) = validate_bid(&bid, &relay.public_key, &self.context) {
warn!(%err, %relay, "invalid signed builder bid");
Expand Down Expand Up @@ -221,19 +250,20 @@ impl BlindedBlockProvider for RelayMux {
}
}

let relays_desc = best_relays
.iter()
.map(|relay| format!("{relay}"))
.reduce(|desc, next| format!("{desc}, {next}"))
.expect("at least one relay");
info!(%auction_request, %best_bid, relays=relays_desc, "acquired best bid");
let slot = auction_request.slot;
info!(
slot,
parent_hash = ?auction_request.parent_hash,
public_key = ?auction_request.public_key,
%best_bid,
relays = ?best_relays,
"acquired best bid"
);

{
let mut state = self.state.lock();
// assume the next request to open a bid corresponds to the current request
// TODO consider if the relay mux should have more knowledge about the proposal
state.latest_pubkey = auction_request.public_key.clone();
state.outstanding_bids.insert(auction_request.clone(), best_relays);
let auction_context = AuctionContext { slot, relays: best_relays };
state.outstanding_bids.insert(best_block_hash.clone(), Arc::new(auction_context));
}

Ok(best_bid.clone())
Expand All @@ -243,42 +273,47 @@ impl BlindedBlockProvider for RelayMux {
&self,
signed_block: &SignedBlindedBeaconBlock,
) -> Result<AuctionContents, Error> {
let (auction_request, relays) = {
let mut state = self.state.lock();
let key = bid_key_from(signed_block, &state.latest_pubkey);
// TODO: do not `remove` so this endpoint can be retried
let relays = state
.outstanding_bids
.remove(&key)
.ok_or_else::<Error, _>(|| BoostError::MissingOpenBid.into())?;
(key, relays)
};
let block = signed_block.message();
let slot = block.slot();
let body = block.body();
let expected_block_hash = body.execution_payload_header().block_hash().clone();
let context = self.get_context(&expected_block_hash)?;

let signed_block = &signed_block;
let responses = stream::iter(relays)
let responses = stream::iter(context.relays.iter().cloned())
.map(|relay| async move {
let response = relay.open_bid(signed_block).await;
(relay, response)
let request = relay.open_bid(signed_block);
let duration = Duration::from_secs(FETCH_PAYLOAD_TIME_OUT_SECS);
let result = timeout(duration, request).await;
(relay, result)
})
.buffer_unordered(self.relays.len())
.filter_map(|(relay, result)| async move {
match result {
Ok(response) => Some((relay, response)),
Err(_) => {
warn!( %relay, "timeout when opening bid");
None
}
}
})
.collect::<Vec<_>>()
.await;

let block = signed_block.message();
let block_body = block.body();
let payload_header = block_body.execution_payload_header();
let expected_block_hash = payload_header.block_hash();
for (relay, response) in responses.into_iter() {
match response {
Ok(auction_contents) => {
let block_hash = auction_contents.execution_payload().block_hash();
if block_hash == expected_block_hash {
info!(%auction_request, %block_hash, %relay, "acquired payload");
Ok(auction_contents) => match validate_payload(
&auction_contents,
&expected_block_hash,
body.blob_kzg_commitments().map(|commitments| commitments.as_slice()),
) {
Ok(_) => {
info!(%slot, block_hash = %expected_block_hash, %relay, "acquired payload");
return Ok(auction_contents)
} else {
warn!(?block_hash, ?expected_block_hash, %relay, "incorrect block hash delivered by relay");
}
}
Err(err) => {
warn!(?err, ?relay, "could not validate payload");
}
},
Err(err) => {
warn!(%err, %relay, "error opening bid");
}
Expand Down
Loading
Loading