diff --git a/chain/src/pibd_params.rs b/chain/src/pibd_params.rs index fe8dbc7a2..5d64bba7e 100644 --- a/chain/src/pibd_params.rs +++ b/chain/src/pibd_params.rs @@ -43,6 +43,9 @@ pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 20; /// will always be requested first) pub const SEGMENT_REQUEST_COUNT: usize = 15; +/// How many blocks behind the tip a PIBD peer may be and still be considered usable. +pub const PIBD_PEER_HEIGHT_SLACK_BLOCKS: u64 = 2; + /// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds /// give up and revert back to the txhashset.zip download method pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60; diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index ca391397a..d8c41fe28 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -497,102 +497,126 @@ impl Desegmenter { } } } - } else { - // We have all required bitmap segments and have recreated our local - // bitmap, now continue with other segments, evenly spreading requests - // among MMRs - let local_output_mmr_size; - let local_kernel_mmr_size; - let local_rangeproof_mmr_size; - { - let txhashset = self.txhashset.read(); - local_output_mmr_size = txhashset.output_mmr_size(); - local_kernel_mmr_size = txhashset.kernel_mmr_size(); - local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size(); + // Bitmap is not finished yet, continue at next iteration when it will be ready. 
+ return return_vec; + } + + // We have all required bitmap segments and have recreated our local + // bitmap, now continue with other segments, evenly spreading requests + // among MMRs + let local_output_mmr_size; + let local_kernel_mmr_size; + let local_rangeproof_mmr_size; + { + let txhashset = self.txhashset.read(); + local_output_mmr_size = txhashset.output_mmr_size(); + local_kernel_mmr_size = txhashset.kernel_mmr_size(); + local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size(); + } + let total_output_segments = SegmentIdentifier::count_segments_required( + self.archive_header.output_mmr_size, + self.default_output_segment_height, + ); + let mut elems_added = 0; + if let Some(mut next_output_idx) = self.next_required_output_segment_index() { + while (next_output_idx as usize) < total_output_segments { + if elems_added == max_elements / 3 { + break; + } + let output_id = SegmentIdentifier { + height: self.default_output_segment_height, + idx: next_output_idx, + }; + let (_first, last) = + output_id.segment_pos_range(self.archive_header.output_mmr_size); + if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id)); + elems_added += 1; + } + next_output_idx += 1; } - let total_output_segments = SegmentIdentifier::count_segments_required( - self.archive_header.output_mmr_size, - self.default_output_segment_height, - ); - let mut elems_added = 0; - if let Some(mut next_output_idx) = self.next_required_output_segment_index() { - while (next_output_idx as usize) < total_output_segments { - if self.output_segment_cache.len() >= self.max_cached_segments { - break; - } - if elems_added == max_elements / 3 { - break; - } - let output_id = SegmentIdentifier { - height: self.default_output_segment_height, - idx: next_output_idx, - }; - let (_first, last) = - output_id.segment_pos_range(self.archive_header.output_mmr_size); - if last > local_output_mmr_size && 
!self.has_output_segment_with_id(output_id) { - return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id)); - elems_added += 1; - } - next_output_idx += 1; + } + + let total_rangeproof_segments = SegmentIdentifier::count_segments_required( + self.archive_header.output_mmr_size, + self.default_rangeproof_segment_height, + ); + elems_added = 0; + if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() { + while (next_rp_idx as usize) < total_rangeproof_segments { + if elems_added == max_elements / 3 { + break; } + let rp_id = SegmentIdentifier { + height: self.default_rangeproof_segment_height, + idx: next_rp_idx, + }; + let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size); + if last > local_rangeproof_mmr_size && !self.has_rangeproof_segment_with_id(rp_id) { + return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id)); + elems_added += 1; + } + next_rp_idx += 1; } + } - let total_rangeproof_segments = SegmentIdentifier::count_segments_required( - self.archive_header.output_mmr_size, - self.default_rangeproof_segment_height, - ); - elems_added = 0; - if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() { - while (next_rp_idx as usize) < total_rangeproof_segments { - if self.rangeproof_segment_cache.len() >= self.max_cached_segments { - break; - } - if elems_added == max_elements / 3 { - break; - } - let rp_id = SegmentIdentifier { - height: self.default_rangeproof_segment_height, - idx: next_rp_idx, - }; - let (_first, last) = - rp_id.segment_pos_range(self.archive_header.output_mmr_size); - if last > local_rangeproof_mmr_size - && !self.has_rangeproof_segment_with_id(rp_id) - { - return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id)); - elems_added += 1; - } - next_rp_idx += 1; + let total_kernel_segments = SegmentIdentifier::count_segments_required( + self.archive_header.kernel_mmr_size, + self.default_kernel_segment_height, + ); + 
elems_added = 0;
+	if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
+		while (next_kernel_idx as usize) < total_kernel_segments {
+			if elems_added == max_elements / 3 {
+				break;
+			}
+			let k_id = SegmentIdentifier {
+				height: self.default_kernel_segment_height,
+				idx: next_kernel_idx,
+			};
+			let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
+			if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
+				return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
+				elems_added += 1;
+			}
+			next_kernel_idx += 1;
+		}
+	}
-		let total_kernel_segments = SegmentIdentifier::count_segments_required(
-			self.archive_header.kernel_mmr_size,
-			self.default_kernel_segment_height,
-		);
-		elems_added = 0;
-		if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
-			while (next_kernel_idx as usize) < total_kernel_segments {
-				if self.kernel_segment_cache.len() >= self.max_cached_segments {
-					break;
-				}
-				if elems_added == max_elements / 3 {
-					break;
-				}
-				let k_id = SegmentIdentifier {
-					height: self.default_kernel_segment_height,
-					idx: next_kernel_idx,
-				};
-				let (_first, last) =
-					k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
-				if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
-					return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
-					elems_added += 1;
-				}
-				next_kernel_idx += 1;
+	// Explicitly add segment identifier to request if not exists.
+	// NOTE: the guard must be negated — push only when the segment is NOT
+	// already queued; the unnegated form made every call a no-op or a dup.
+	let mut maybe_add_to_request = |seg_id: SegmentTypeIdentifier| {
+		if !return_vec.iter().any(|i| i == &seg_id) {
+			if return_vec.len() >= max_elements {
+				return_vec.pop();
+			}
+			return_vec.push(seg_id);
+		}
+	};
+
+	// Ensure we explicitly ask for the next output segment.
+ if let Some(next_output_idx) = self.next_required_output_segment_index() { + let seg_id = SegmentIdentifier { + height: self.default_output_segment_height, + idx: next_output_idx, + }; + if !self.has_output_segment_with_id(seg_id) { + let next_output_seg_id = SegmentTypeIdentifier::new(SegmentType::Output, seg_id); + maybe_add_to_request(next_output_seg_id); } } + + // Ensure we explicitly ask for the next rangeproof segment. + if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() { + let seg_id = SegmentIdentifier { + height: self.default_rangeproof_segment_height, + idx: next_rp_idx, + }; + if !self.has_rangeproof_segment_with_id(seg_id) { + let next_proof_seg_id = SegmentTypeIdentifier::new(SegmentType::RangeProof, seg_id); + maybe_add_to_request(next_proof_seg_id); + } + } + // Always ensure we explicitly ask for the very next kernel segment we are waiting on. // The regular round-robin above can get saturated with outputs and rangeproofs while // the desegmenter is blocked on a missing kernel, so we force this one in. @@ -601,14 +625,9 @@ impl Desegmenter { height: self.default_kernel_segment_height, idx: next_kernel_idx, }; - let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id); - if !self.has_kernel_segment_with_id(seg_id) - && !return_vec.iter().any(|x| x == &next_kernel_seg_id) - { - if return_vec.len() >= max_elements { - return_vec.pop(); - } - return_vec.push(next_kernel_seg_id); + if !self.has_kernel_segment_with_id(seg_id) { + let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id); + maybe_add_to_request(next_kernel_seg_id); } } if return_vec.is_empty() && self.bitmap_cache.is_some() { diff --git a/chain/src/types.rs b/chain/src/types.rs index 370f4a954..416b9770f 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -316,6 +316,16 @@ impl SyncState { removed_segments } + /// Drop all tracked PIBD requests, returning how many entries were removed. 
+	pub fn clear_pibd_requests(&self) -> usize {
+		let mut requests = self.requested_pibd_segments.write();
+		let cleared = requests.len();
+		if cleared > 0 {
+			requests.clear();
+		}
+		cleared
+	}
+
 	/// Check whether segment is in request list
 	pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool {
 		self.requested_pibd_segments
diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs
index a0433d25e..e61e2115b 100644
--- a/p2p/src/peer.rs
+++ b/p2p/src/peer.rs
@@ -13,6 +13,8 @@
 // limitations under the License.

 use crate::util::{Mutex, RwLock};
+use chrono::Duration;
+use lru_cache::LruCache;
 use std::fmt;
 use std::fs::File;
 use std::net::{Shutdown, TcpStream};
@@ -20,8 +22,6 @@ use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;

-use lru_cache::LruCache;
-
 use crate::chain;
 use crate::chain::txhashset::BitmapChunk;
 use crate::conn;
@@ -52,6 +52,7 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500;
 enum State {
 	Connected,
 	Banned,
+	// Temporarily blocked until the expiry instant; the u32 counts how many
+	// times this peer has been blocked so far (used to scale the duration).
+	Blocked(DateTime<Utc>, u32),
 }

 pub struct Peer {
@@ -193,6 +194,45 @@ impl Peer {
 		State::Banned == *self.state.read()
 	}

+	/// Whether this peer is currently blocked (a block is set and not expired).
+	pub fn is_blocked(&self) -> bool {
+		match *self.state.read() {
+			State::Blocked(expiry, _) => expiry > Utc::now(),
+			_ => false,
+		}
+	}
+
+	/// Set this peer status to blocked, with a duration that grows with the
+	/// number of times it has already been blocked (1m, 3m, then 10m).
+	pub fn set_blocked(&self) {
+		if self.is_blocked() {
+			return;
+		}
+		let times = {
+			match *self.state.read() {
+				State::Blocked(_, times) => times + 1,
+				_ => 1,
+			}
+		};
+		let duration = match times {
+			1 => 60,  // 1m
+			2 => 180, // 3m
+			_ => 600, // 10m
+		};
+		let expiry = Utc::now() + Duration::seconds(duration);
+		*self.state.write() = State::Blocked(expiry, times);
+		debug!(
+			"state_sync: block peer {} for {} times: {}",
+			self.info.addr, duration, times
+		);
+	}
+
+	/// Unblock blocked peer.
+	pub fn unblock(&self) {
+		if self.is_blocked() {
+			*self.state.write() = State::Connected;
+		}
+	}
+
 	/// Whether this peer is stuck on sync.
pub fn is_stuck(&self) -> (bool, Difficulty) {
 		let peer_live_info = self.info.live_info.read();
diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs
index 0f10a84ec..fdf10ace3 100644
--- a/p2p/src/peers.rs
+++ b/p2p/src/peers.rs
@@ -434,6 +434,51 @@ impl Peers {
 		}
 	}

+	/// Disconnect a peer without banning it.
+	pub fn disconnect_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> {
+		let mut peers = self.peers.try_write_for(LOCK_TIMEOUT).ok_or_else(|| {
+			error!("disconnect_peer: failed to get peers lock");
+			Error::PeerException
+		})?;
+		match peers.remove(&peer_addr) {
+			Some(peer) => {
+				warn!("disconnecting peer {} ({})", peer_addr, reason);
+				peer.stop();
+				Ok(())
+			}
+			None => Err(Error::PeerNotFound),
+		}
+	}
+
+	/// Temporary block a peer without banning it.
+	pub fn block_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> {
+		let peers = self.peers.try_read_for(LOCK_TIMEOUT).ok_or_else(|| {
+			error!("block_peer: failed to get peers lock");
+			Error::PeerException
+		})?;
+		match peers.get(&peer_addr) {
+			None => Err(Error::PeerNotFound),
+			Some(peer) => {
+				warn!("blocking peer {} ({})", peer_addr, reason);
+				peer.set_blocked();
+				Ok(())
+			}
+		}
+	}
+
+	/// Unblock blocked peers.
+	pub fn unblock_peers(&self) -> Result<(), Error> {
+		let peers = self.peers.try_read_for(LOCK_TIMEOUT).ok_or_else(|| {
+			error!("unblock_peers: failed to get peers lock");
+			Error::PeerException
+		})?;
+		// `map` is lazy and its result was discarded, so nothing ever ran;
+		// `for_each` actually drives the iterator and unblocks the peers.
+		peers
+			.values()
+			.filter(|peer| peer.is_blocked())
+			.for_each(|peer| peer.unblock());
+		Ok(())
+	}
+
 	/// We have enough outbound connected peers
 	pub fn enough_outbound_peers(&self) -> bool {
 		self.iter().outbound().connected().count()
@@ -794,6 +839,13 @@ impl<I: Iterator<Item = Arc<Peer>>> PeersIter<I> {
 		}
 	}

+	/// Filter non-blocked peers.
+	pub fn non_blocked(self) -> PeersIter<impl Iterator<Item = Arc<Peer>>> {
+		PeersIter {
+			iter: self.iter.filter(|p| !p.is_blocked()),
+		}
+	}
+
 	/// Filter peers with the provided difficulty comparison fn.
/// /// with_difficulty(|x| x > diff) @@ -819,6 +871,16 @@ impl>> PeersIter { } } + /// Custom filter. + pub fn with_filter( + self, + f: impl Fn(&Arc) -> bool, + ) -> PeersIter>> { + PeersIter { + iter: self.iter.filter(move |p| f(p)), + } + } + pub fn by_addr(&mut self, addr: PeerAddr) -> Option> { self.iter.find(|p| p.info.addr == addr) } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index ecc496f1d..b806f1c92 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -194,7 +194,8 @@ fn monitor_peers(peers: Arc, config: p2p::P2PConfig, tx: mpsc::Sende return; } - if !peers.enough_outbound_peers() { + let enough_outbound = peers.enough_outbound_peers(); + if !enough_outbound { // loop over connected peers that can provide peer lists // ask them for their list of peers let mut connected_peers: Vec = vec![]; @@ -239,7 +240,8 @@ fn monitor_peers(peers: Arc, config: p2p::P2PConfig, tx: mpsc::Sende .iter() .filter(|p| { peers.get_connected_peer(p.addr).is_none() - && Utc::now().timestamp() - p.last_attempt >= max_attempt_delay + && (!enough_outbound + || Utc::now().timestamp() - p.last_attempt >= max_attempt_delay) }) .choose_multiple(&mut thread_rng(), max_peer_attempts / 2) { @@ -268,7 +270,9 @@ fn monitor_peers(peers: Arc, config: p2p::P2PConfig, tx: mpsc::Sende // check min 32 (max 128, if there are no healthy and unknown) random defunct peers no more often than 1 hour per peer. 
for dp in defuncts .iter() - .filter(|p| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay) + .filter(|p| { + !enough_outbound || Utc::now().timestamp() - p.last_attempt >= max_attempt_delay + }) .choose_multiple(&mut thread_rng(), max_peer_attempts - new_peers.len()) { new_peers.push(&dp.addr); diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index aaee2e6a0..07faaf8bd 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -14,6 +14,7 @@ use chrono::prelude::{DateTime, Utc}; use chrono::Duration; +use grin_p2p::PeerAddr; use std::sync::Arc; use crate::chain::{self, pibd_params, SyncState, SyncStatus}; @@ -256,6 +257,46 @@ impl StateSync { .sync_state .remove_stale_pibd_requests(pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS); + if !stale_segments.is_empty() { + for (seg_id, peer_addr) in stale_segments.iter() { + if let Some(peer_addr) = peer_addr { + let _ = self + .peers + .block_peer(PeerAddr(*peer_addr), "PIBD segment timeout"); + debug!( + "state_sync: peer {} moved to PIBD retry exclusion list for segment {:?}", + peer_addr, seg_id + ); + let is_outbound = { + self.peers + .iter() + .outbound() + .by_addr(PeerAddr(peer_addr.clone())) + .is_some() + }; + if is_outbound { + debug!("state_sync: disconnecting peer {}", peer_addr); + if let Err(e) = self + .peers + .disconnect_peer(PeerAddr(*peer_addr), "PIBD segment timeout") + { + debug!( + "state_sync: failed to disconnect timed-out peer {}: {:?}", + peer_addr, e + ); + } + } else { + debug!("state_sync: peer {} is not outbound or not connected, do not disconnect", peer_addr); + } + } else { + debug!( + "state_sync: PIBD request {:?} timed out without a recorded peer", + seg_id + ); + } + } + } + // Apply segments... TODO: figure out how this should be called, might // need to be a separate thread. 
if let Some(mut de) = desegmenter.try_write() { @@ -318,11 +359,25 @@ impl StateSync { .connected() }; + // Get peers with reasonable height for pibd. + let height_slack = pibd_params::PIBD_PEER_HEIGHT_SLACK_BLOCKS; + let max_pibd_height = peers_iter_pibd() + .into_iter() + .map(|p| p.info.height()) + .max() + .unwrap_or(0); + + let available_pibd_peers = || { + peers_iter_pibd().with_filter(|p| { + p.info.height().saturating_add(height_slack) >= max_pibd_height + }) + }; + // If there are no suitable PIBD-Enabled peers, AND there hasn't been one for a minute, // abort PIBD and fall back to txhashset download // Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled // peer having the max difficulty - if peers_iter_pibd().count() == 0 { + if available_pibd_peers().count() == 0 { if let None = self.earliest_zero_pibd_peer_time { self.set_earliest_zero_pibd_peer_time(Some(Utc::now())); } @@ -339,72 +394,95 @@ impl StateSync { self.set_pibd_aborted(); return false; } - } else { - self.set_earliest_zero_pibd_peer_time(None) + let cleared = self.sync_state.clear_pibd_requests(); + if cleared > 0 { + warn!( + "state_sync: cleared {} pending PIBD requests because no PIBD-enabled peers are currently available", + cleared + ); + } + continue; } + self.set_earliest_zero_pibd_peer_time(None); + // Choose a random "most work" peer, excluding peer from stale segment and preferring outbound if at all possible. let excluded_peer = stale_segments .iter() .find(|(stale_id, _)| stale_id == seg_id) .and_then(|(_, addr)| *addr); - let peer = peers_iter_pibd() + let peer = available_pibd_peers() .outbound() + .non_blocked() .exclude(excluded_peer) .choose_random() .or_else(|| { - peers_iter_pibd() + available_pibd_peers() .inbound() + .non_blocked() .exclude(excluded_peer) .choose_random() + .or_else(|| { + // Select from blocked if we have no peers (could be network issue). 
+ available_pibd_peers() + .exclude(excluded_peer) + .choose_random() + }) }); trace!("Chosen peer is {:?}", peer); - if let Some(p) = peer { - // add to list of segments that are being tracked - self.sync_state.add_pibd_segment(seg_id, p.info.addr.0); - let res = match seg_id.segment_type { - SegmentType::Bitmap => p.send_bitmap_segment_request( - archive_header.hash(), - seg_id.identifier.clone(), - ), - SegmentType::Output => p.send_output_segment_request( - archive_header.hash(), - seg_id.identifier.clone(), - ), - SegmentType::RangeProof => p.send_rangeproof_segment_request( - archive_header.hash(), - seg_id.identifier.clone(), - ), - SegmentType::Kernel => p.send_kernel_segment_request( - archive_header.hash(), - seg_id.identifier.clone(), - ), - }; - if let Err(e) = res { + let p = match peer { + Some(p) => p, + None => { + debug!( + "state_sync: no eligible PIBD peers available for request {:?}", + seg_id + ); + continue; + } + }; + + // add to list of segments that are being tracked + self.sync_state.add_pibd_segment(seg_id, p.info.addr.0); + + let res = match seg_id.segment_type { + SegmentType::Bitmap => { + p.send_bitmap_segment_request(archive_header.hash(), seg_id.identifier.clone()) + } + SegmentType::Output => { + p.send_output_segment_request(archive_header.hash(), seg_id.identifier.clone()) + } + SegmentType::RangeProof => p.send_rangeproof_segment_request( + archive_header.hash(), + seg_id.identifier.clone(), + ), + SegmentType::Kernel => { + p.send_kernel_segment_request(archive_header.hash(), seg_id.identifier.clone()) + } + }; + if let Err(e) = res { + info!( + "Error sending request to peer at {}, reason: {:?}", + p.info.addr, e + ); + self.sync_state.remove_pibd_segment(seg_id); + } else if let Some(prev_peer) = excluded_peer { + if p.info.addr.0 != prev_peer { info!( - "Error sending request to peer at {}, reason: {:?}", - p.info.addr, e + "state_sync: retrying segment {:?} with new peer {} (previously {})", + seg_id, p.info.addr, prev_peer 
); - self.sync_state.remove_pibd_segment(seg_id); - } else if let Some(prev_peer) = excluded_peer { - if p.info.addr.0 != prev_peer { - info!( - "state_sync: retrying segment {:?} with new peer {} (previously {})", - seg_id, p.info.addr, prev_peer - ); - } else { - debug!( - "state_sync: requested segment {:?} from peer {}", - seg_id, p.info.addr - ); - } } else { debug!( "state_sync: requested segment {:?} from peer {}", seg_id, p.info.addr ); } + } else { + debug!( + "state_sync: requested segment {:?} from peer {}", + seg_id, p.info.addr + ); } } false @@ -495,6 +573,7 @@ impl StateSync { } fn state_sync_reset(&mut self) { + let _ = self.peers.unblock_peers(); self.prev_state_sync = None; self.state_sync_peer = None; }