Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use libp2p::kad internal bootstrap trigger #2261

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/p2p/src/behaviour/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl Builder {
kademlia_config.set_record_ttl(Some(Duration::from_secs(0)));
kademlia_config.set_provider_record_ttl(Some(PROVIDER_PUBLICATION_INTERVAL * 3));
kademlia_config.set_provider_publication_interval(Some(PROVIDER_PUBLICATION_INTERVAL));
kademlia_config.set_periodic_bootstrap_interval(Some(cfg.bootstrap_period));

let peer_id = identity.public().to_peer_id();
let secret = Secret::new(&identity);
Expand Down
4 changes: 2 additions & 2 deletions crates/p2p/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Builder {
let client = Client::new(command_sender, local_peer_id);

let (behaviour, relay_transport) = behaviour_builder
.unwrap_or_else(|| Behaviour::builder(keypair.clone(), chain_id, cfg.clone()))
.unwrap_or_else(|| Behaviour::builder(keypair.clone(), chain_id, cfg))
.build(client.clone());

let swarm = Swarm::new(
Expand All @@ -65,7 +65,7 @@ impl Builder {
(
client,
event_receiver,
MainLoop::new(swarm, command_receiver, event_sender, cfg),
MainLoop::new(swarm, command_receiver, event_sender),
)
}
}
23 changes: 3 additions & 20 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,12 @@ pub struct Config {
pub max_inbound_relayed_peers: usize,
/// Maximum number of outbound peers.
pub max_outbound_peers: usize,
/// The minimum number of peers to maintain. If the number of outbound peers
/// drops below this number, the node will attempt to connect to more
/// peers.
pub low_watermark: usize,
/// How long to prevent evicted peers from reconnecting.
pub eviction_timeout: Duration,
pub ip_whitelist: Vec<IpNet>,
pub bootstrap: BootstrapConfig,
/// If the number of peers is below the low watermark, the node will attempt
/// periodic bootstrapping at this interval.
pub bootstrap_period: Duration,
pub inbound_connections_rate_limit: RateLimit,
/// Custom protocol name for Kademlia
pub kad_name: Option<String>,
Expand All @@ -79,21 +77,6 @@ pub struct RateLimit {
pub interval: Duration,
}

#[derive(Copy, Clone, Debug)]
pub struct BootstrapConfig {
pub start_offset: Duration,
pub period: Duration,
}

impl Default for BootstrapConfig {
fn default() -> Self {
Self {
start_offset: Duration::from_secs(5),
period: Duration::from_secs(2 * 60),
}
}
}

pub type HeadTx = tokio::sync::watch::Sender<Option<(BlockNumber, BlockHash)>>;
pub type HeadRx = tokio::sync::watch::Receiver<Option<(BlockNumber, BlockHash)>>;

Expand Down
123 changes: 48 additions & 75 deletions crates/p2p/src/main_loop.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::num::NonZeroUsize;

use futures::channel::mpsc::Receiver as ResponseReceiver;
use futures::StreamExt;
use libp2p::gossipsub::{self, IdentTopic};
use libp2p::kad::{
self,
BootstrapError,
BootstrapOk,
ProgressStep,
QueryId,
QueryInfo,
QueryResult,
};
use libp2p::kad::{self, BootstrapError, BootstrapOk, QueryId, QueryResult};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::SwarmEvent;
Expand All @@ -29,10 +22,9 @@ use tokio::time::Duration;

#[cfg(test)]
use crate::test_utils;
use crate::{behaviour, Command, Config, EmptyResultSender, Event, TestCommand, TestEvent};
use crate::{behaviour, Command, EmptyResultSender, Event, TestCommand, TestEvent};

pub struct MainLoop {
cfg: crate::Config,
swarm: libp2p::swarm::Swarm<behaviour::Behaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
Expand All @@ -46,8 +38,6 @@ pub struct MainLoop {
// 2. update the sync head info of our peers using a different mechanism
// request_sync_status: HashSetDelay<PeerId>,
pending_queries: PendingQueries,
/// Ongoing Kademlia bootstrap query.
ongoing_bootstrap: Option<QueryId>,
_pending_test_queries: TestQueries,
}

Expand Down Expand Up @@ -86,36 +76,24 @@ impl MainLoop {
swarm: libp2p::swarm::Swarm<behaviour::Behaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
cfg: Config,
) -> Self {
Self {
cfg,
swarm,
command_receiver,
event_sender,
pending_dials: Default::default(),
pending_sync_requests: Default::default(),
pending_queries: Default::default(),
ongoing_bootstrap: None,
_pending_test_queries: Default::default(),
}
}

pub async fn run(mut self) {
// Delay bootstrap so that by the time we attempt it we've connected to the
// bootstrap node
let bootstrap_start = tokio::time::Instant::now() + self.cfg.bootstrap.start_offset;
let mut bootstrap_interval =
tokio::time::interval_at(bootstrap_start, self.cfg.bootstrap.period);

let mut network_status_interval = tokio::time::interval(Duration::from_secs(5));
let mut peer_status_interval = tokio::time::interval(Duration::from_secs(30));
let me = *self.swarm.local_peer_id();

loop {
let bootstrap_interval_tick = bootstrap_interval.tick();
tokio::pin!(bootstrap_interval_tick);

let network_status_interval_tick = network_status_interval.tick();
tokio::pin!(network_status_interval_tick);

Expand Down Expand Up @@ -162,31 +140,6 @@ impl MainLoop {
dht,
);
}
_ = bootstrap_interval_tick => {
tracing::debug!("Checking low watermark");
if let Some(query_id) = self.ongoing_bootstrap {
match self.swarm.behaviour_mut().kademlia_mut().query_mut(&query_id) {
Some(mut query) if matches!(query.info(), QueryInfo::Bootstrap {
step: ProgressStep { last: false, .. }, .. }
) => {
tracing::debug!("Previous bootstrap still in progress, aborting it");
query.finish();
continue;
}
_ => self.ongoing_bootstrap = None,
}
}
if self.swarm.behaviour_mut().outbound_peers().count() < self.cfg.low_watermark {
if let Ok(query_id) = self.swarm.behaviour_mut().kademlia_mut().bootstrap() {
self.ongoing_bootstrap = Some(query_id);
send_test_event(
&self.event_sender,
TestEvent::KademliaBootstrapStarted,
)
.await;
}
}
}
command = self.command_receiver.recv() => {
match command {
Some(c) => self.handle_command(c).await,
Expand Down Expand Up @@ -397,7 +350,6 @@ impl MainLoop {
Err(peer)
}
};
self.ongoing_bootstrap = None;
send_test_event(
&self.event_sender,
TestEvent::KademliaBootstrapCompleted(result),
Expand Down Expand Up @@ -451,31 +403,52 @@ impl MainLoop {
}
_ => self.test_query_completed(id, result).await,
}
} else if let QueryResult::GetProviders(result) = result {
use libp2p::kad::GetProvidersOk;

let result = match result {
Ok(GetProvidersOk::FoundProviders { providers, .. }) => Ok(providers),
Ok(_) => Ok(Default::default()),
Err(_) => {
unreachable!(
"when a query times out libp2p makes it the last stage"
)
}
};

let sender = self
.pending_queries
.get_providers
.get(&id)
.expect("Query to be pending");

sender
.send(result)
.await
.expect("Receiver not to be dropped");
} else {
self.test_query_progressed(id, result).await;
match result {
QueryResult::GetProviders(result) => {
use libp2p::kad::GetProvidersOk;

let result = match result {
Ok(GetProvidersOk::FoundProviders { providers, .. }) => {
Ok(providers)
}
Ok(_) => Ok(Default::default()),
Err(_) => {
unreachable!(
"when a query times out libp2p makes it the last stage"
)
}
};

let sender = self
.pending_queries
.get_providers
.get(&id)
.expect("Query to be pending");

sender
.send(result)
.await
.expect("Receiver not to be dropped");
}
QueryResult::Bootstrap(_) => {
tracing::debug!("Checking low watermark");
// Starting from libp2p-v0.54.1 bootstrap queries are started
// automatically in the kad behaviour:
// 1. periodically,
// 2. after a peer is added to the routing table, if the number of
// peers in the DHT is lower than 20. See `bootstrap_on_low_peers` for more details:
// https://github.com/libp2p/rust-libp2p/blob/d7beb55f672dce54017fa4b30f67ecb8d66b9810/protocols/kad/src/behaviour.rs#L1401).
if step.count == NonZeroUsize::new(1).expect("1>0") {
send_test_event(
&self.event_sender,
TestEvent::KademliaBootstrapStarted,
)
.await;
}
}
_ => self.test_query_progressed(id, result).await,
}
}
}
kad::Event::RoutingUpdated {
Expand Down
5 changes: 2 additions & 3 deletions crates/p2p/src/test_utils/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ impl Config {
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 10,
max_outbound_peers: 10,
low_watermark: 10,
ip_whitelist: vec!["::/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
bootstrap: Default::default(),
ip_whitelist: vec!["::1/0".parse().unwrap(), "0.0.0.0/0".parse().unwrap()],
bootstrap_period: Duration::from_millis(500),
eviction_timeout: Duration::from_secs(15 * 60),
inbound_connections_rate_limit: RateLimit {
max: 1000,
Expand Down
Loading