diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 97097cc..20d64a3 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -41,6 +41,12 @@ pub use metrics::populate_name_registry; const MAX_FETCH_RETRIES: u32 = 5; const INITIAL_BACKOFF_MS: u64 = 10; const BACKOFF_MULTIPLIER: u64 = 4; +const PEER_REDIAL_INTERVAL_SECS: u64 = 12; + +enum RetryMessage { + BlockFetch(H256), + PeerRedial(PeerId), +} pub(crate) struct PendingRequest { pub(crate) attempts: u32, @@ -122,6 +128,7 @@ pub async fn start_p2p( }) .build(); let local_peer_id = *swarm.local_peer_id(); + let mut bootnode_addrs = HashMap::new(); for bootnode in bootnodes { let peer_id = PeerId::from_public_key(&bootnode.public_key); if peer_id == local_peer_id { @@ -133,6 +140,7 @@ pub async fn start_p2p( .with(Protocol::QuicV1) .with_p2p(peer_id) .expect("failed to add peer ID to multiaddr"); + bootnode_addrs.insert(peer_id, addr.clone()); swarm.dial(addr).unwrap(); } let addr = Multiaddr::empty() @@ -159,7 +167,7 @@ pub async fn start_p2p( "/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy" )); - info!("P2P node started on {listening_socket}"); + info!(socket=%listening_socket, "P2P node started"); let (retry_tx, retry_rx) = mpsc::unbounded_channel(); @@ -173,6 +181,7 @@ pub async fn start_p2p( connected_peers: HashSet::new(), pending_requests: HashMap::new(), request_id_map: HashMap::new(), + bootnode_addrs, retry_tx, retry_rx, }; @@ -197,8 +206,11 @@ pub(crate) struct P2PServer { pub(crate) connected_peers: HashSet, pub(crate) pending_requests: HashMap, pub(crate) request_id_map: HashMap, - retry_tx: mpsc::UnboundedSender, - retry_rx: mpsc::UnboundedReceiver, + /// Bootnode addresses for redialing when disconnected + bootnode_addrs: HashMap, + /// Channel for scheduling retries (block fetches and peer redials) + pub(crate) retry_tx: mpsc::UnboundedSender, + retry_rx: mpsc::UnboundedReceiver, } /// Event loop for the P2P crate. @@ -220,8 +232,11 @@ async fn event_loop(mut server: P2PServer) { }; handle_swarm_event(&mut server, event).await; } - Some(root) = server.retry_rx.recv() => { - handle_retry(&mut server, root).await; + Some(msg) = server.retry_rx.recv() => { + match msg { + RetryMessage::BlockFetch(root) => handle_retry(&mut server, root).await, + RetryMessage::PeerRedial(peer_id) => handle_peer_redial(&mut server, peer_id).await, + } } } } @@ -302,6 +317,7 @@ async fn handle_swarm_event(server: &mut P2PServer, event: SwarmEvent { metrics::notify_peer_connected(&peer_id, "inbound", "error"); @@ -369,6 +400,33 @@ async fn handle_retry(server: &mut P2PServer, root: H256) { } } +async fn handle_peer_redial(server: &mut P2PServer, peer_id: PeerId) { + // Skip if already reconnected + if server.connected_peers.contains(&peer_id) { + trace!(%peer_id, "Bootnode reconnected during redial delay, skipping"); + return; + } + + if let Some(addr) = server.bootnode_addrs.get(&peer_id) { + info!(%peer_id, "Redialing disconnected bootnode"); + // NOTE: this dial does some checks and adds a pending outbound connection attempt. + // It does NOT block. If the dial fails, we'll later get an OutgoingConnectionError event. + if let Err(e) = server.swarm.dial(addr.clone()) { + warn!(%peer_id, %e, "Failed to redial bootnode, will retry"); + // Schedule another redial attempt + schedule_peer_redial(server.retry_tx.clone(), peer_id); + } + } +} + +/// Schedules a peer redial after the configured delay interval. +pub(crate) fn schedule_peer_redial(retry_tx: mpsc::UnboundedSender, peer_id: PeerId) { + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(PEER_REDIAL_INTERVAL_SECS)).await; + let _ = retry_tx.send(RetryMessage::PeerRedial(peer_id)); + }); +} + pub struct Bootnode { ip: IpAddr, quic_port: u16, diff --git a/crates/net/p2p/src/req_resp/handlers.rs b/crates/net/p2p/src/req_resp/handlers.rs index d8e670c..026ae9d 100644 --- a/crates/net/p2p/src/req_resp/handlers.rs +++ b/crates/net/p2p/src/req_resp/handlers.rs @@ -11,7 +11,10 @@ use super::{ BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRootRequest, Request, Response, ResponsePayload, ResponseResult, Status, }; -use crate::{BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest}; +use crate::{ + BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, + RetryMessage, +}; pub async fn handle_req_resp_message( server: &mut P2PServer, @@ -226,6 +229,6 @@ async fn handle_fetch_failure( let retry_tx = server.retry_tx.clone(); tokio::spawn(async move { tokio::time::sleep(backoff).await; - let _ = retry_tx.send(root); + let _ = retry_tx.send(RetryMessage::BlockFetch(root)); }); }