Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions chain/src/pibd_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 20;
/// will always be requested first)
pub const SEGMENT_REQUEST_COUNT: usize = 15;

/// How many blocks behind the tip a PIBD peer may be and still be considered usable.
pub const PIBD_PEER_HEIGHT_SLACK_BLOCKS: u64 = 2;

/// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds
/// give up and revert back to the txhashset.zip download method
pub const TXHASHSET_ZIP_FALLBACK_TIME_SECS: i64 = 60;
207 changes: 113 additions & 94 deletions chain/src/txhashset/desegmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,102 +497,126 @@ impl Desegmenter {
}
}
}
} else {
// We have all required bitmap segments and have recreated our local
// bitmap, now continue with other segments, evenly spreading requests
// among MMRs
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
// Bitmap is not finished yet, continue at next iteration when it will be ready.
return return_vec;
}

// We have all required bitmap segments and have recreated our local
// bitmap, now continue with other segments, evenly spreading requests
// among MMRs
let local_output_mmr_size;
let local_kernel_mmr_size;
let local_rangeproof_mmr_size;
{
let txhashset = self.txhashset.read();
local_output_mmr_size = txhashset.output_mmr_size();
local_kernel_mmr_size = txhashset.kernel_mmr_size();
local_rangeproof_mmr_size = txhashset.rangeproof_mmr_size();
}
let total_output_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);
let mut elems_added = 0;
if let Some(mut next_output_idx) = self.next_required_output_segment_index() {
while (next_output_idx as usize) < total_output_segments {
if elems_added == max_elements / 3 {
break;
}
let output_id = SegmentIdentifier {
height: self.default_output_segment_height,
idx: next_output_idx,
};
let (_first, last) =
output_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
elems_added += 1;
}
next_output_idx += 1;
}
let total_output_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);
let mut elems_added = 0;
if let Some(mut next_output_idx) = self.next_required_output_segment_index() {
while (next_output_idx as usize) < total_output_segments {
if self.output_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let output_id = SegmentIdentifier {
height: self.default_output_segment_height,
idx: next_output_idx,
};
let (_first, last) =
output_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_output_mmr_size && !self.has_output_segment_with_id(output_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Output, output_id));
elems_added += 1;
}
next_output_idx += 1;
}

let total_rangeproof_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_rangeproof_segment_height,
);
elems_added = 0;
if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() {
while (next_rp_idx as usize) < total_rangeproof_segments {
if elems_added == max_elements / 3 {
break;
}
let rp_id = SegmentIdentifier {
height: self.default_rangeproof_segment_height,
idx: next_rp_idx,
};
let (_first, last) = rp_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_rangeproof_mmr_size && !self.has_rangeproof_segment_with_id(rp_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
elems_added += 1;
}
next_rp_idx += 1;
}
}

let total_rangeproof_segments = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_rangeproof_segment_height,
);
elems_added = 0;
if let Some(mut next_rp_idx) = self.next_required_rangeproof_segment_index() {
while (next_rp_idx as usize) < total_rangeproof_segments {
if self.rangeproof_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let rp_id = SegmentIdentifier {
height: self.default_rangeproof_segment_height,
idx: next_rp_idx,
};
let (_first, last) =
rp_id.segment_pos_range(self.archive_header.output_mmr_size);
if last > local_rangeproof_mmr_size
&& !self.has_rangeproof_segment_with_id(rp_id)
{
return_vec.push(SegmentTypeIdentifier::new(SegmentType::RangeProof, rp_id));
elems_added += 1;
}
next_rp_idx += 1;
let total_kernel_segments = SegmentIdentifier::count_segments_required(
self.archive_header.kernel_mmr_size,
self.default_kernel_segment_height,
);
elems_added = 0;
if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
while (next_kernel_idx as usize) < total_kernel_segments {
if elems_added == max_elements / 3 {
break;
}
let k_id = SegmentIdentifier {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let (_first, last) = k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
elems_added += 1;
}
next_kernel_idx += 1;
}
}

let total_kernel_segments = SegmentIdentifier::count_segments_required(
self.archive_header.kernel_mmr_size,
self.default_kernel_segment_height,
);
elems_added = 0;
if let Some(mut next_kernel_idx) = self.next_required_kernel_segment_index() {
while (next_kernel_idx as usize) < total_kernel_segments {
if self.kernel_segment_cache.len() >= self.max_cached_segments {
break;
}
if elems_added == max_elements / 3 {
break;
}
let k_id = SegmentIdentifier {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let (_first, last) =
k_id.segment_pos_range(self.archive_header.kernel_mmr_size);
if last > local_kernel_mmr_size && !self.has_kernel_segment_with_id(k_id) {
return_vec.push(SegmentTypeIdentifier::new(SegmentType::Kernel, k_id));
elems_added += 1;
}
next_kernel_idx += 1;
// Explicitly add a segment identifier to the request list if it is not
// already present. When the list is already at capacity, evict the most
// recently pushed entry to make room for this higher-priority segment.
let mut maybe_add_to_request = |seg_id: SegmentTypeIdentifier| {
	// Only push when the identifier is NOT already queued. The previous
	// condition was missing the negation, so it duplicated entries that
	// were already present and silently dropped the segments that were
	// actually missing.
	if !return_vec.iter().any(|i| i == &seg_id) {
		if return_vec.len() >= max_elements {
			return_vec.pop();
		}
		return_vec.push(seg_id);
	}
};

// Ensure we explicitly ask for the next output segment.
if let Some(next_output_idx) = self.next_required_output_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_output_segment_height,
idx: next_output_idx,
};
if !self.has_output_segment_with_id(seg_id) {
let next_output_seg_id = SegmentTypeIdentifier::new(SegmentType::Output, seg_id);
maybe_add_to_request(next_output_seg_id);
}
}

// Ensure we explicitly ask for the next rangeproof segment.
if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_rangeproof_segment_height,
idx: next_rp_idx,
};
if !self.has_rangeproof_segment_with_id(seg_id) {
let next_proof_seg_id = SegmentTypeIdentifier::new(SegmentType::RangeProof, seg_id);
maybe_add_to_request(next_proof_seg_id);
}
}

// Always ensure we explicitly ask for the very next kernel segment we are waiting on.
// The regular round-robin above can get saturated with outputs and rangeproofs while
// the desegmenter is blocked on a missing kernel, so we force this one in.
Expand All @@ -601,14 +625,9 @@ impl Desegmenter {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
if !self.has_kernel_segment_with_id(seg_id)
&& !return_vec.iter().any(|x| x == &next_kernel_seg_id)
{
if return_vec.len() >= max_elements {
return_vec.pop();
}
return_vec.push(next_kernel_seg_id);
if !self.has_kernel_segment_with_id(seg_id) {
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
maybe_add_to_request(next_kernel_seg_id);
}
}
if return_vec.is_empty() && self.bitmap_cache.is_some() {
Expand Down
10 changes: 10 additions & 0 deletions chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,16 @@ impl SyncState {
removed_segments
}

/// Remove every tracked PIBD segment request, reporting how many
/// entries were dropped.
pub fn clear_pibd_requests(&self) -> usize {
	let mut tracked = self.requested_pibd_segments.write();
	let count = tracked.len();
	// Skip the clear entirely when there is nothing tracked.
	if count != 0 {
		tracked.clear();
	}
	count
}

/// Check whether segment is in request list
pub fn contains_pibd_segment(&self, id: &SegmentTypeIdentifier) -> bool {
self.requested_pibd_segments
Expand Down
44 changes: 42 additions & 2 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
// limitations under the License.

use crate::util::{Mutex, RwLock};
use chrono::Duration;
use lru_cache::LruCache;
use std::fmt;
use std::fs::File;
use std::net::{Shutdown, TcpStream};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use lru_cache::LruCache;

use crate::chain;
use crate::chain::txhashset::BitmapChunk;
use crate::conn;
Expand Down Expand Up @@ -52,6 +52,7 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500;
enum State {
Connected,
Banned,
Blocked(DateTime<Utc>, u32),
}

pub struct Peer {
Expand Down Expand Up @@ -193,6 +194,45 @@ impl Peer {
State::Banned == *self.state.read()
}

/// Whether this peer is currently under a still-active (unexpired) block.
pub fn is_blocked(&self) -> bool {
	// A `Blocked` state only counts while its expiry lies in the future;
	// an expired block behaves like any other non-blocked state.
	matches!(*self.state.read(), State::Blocked(expiry, _) if expiry > Utc::now())
}

/// Mark this peer as blocked, with a penalty that escalates on repeat
/// offenses (1 minute, then 3 minutes, then 10 minutes thereafter).
/// No-op if an unexpired block is already in place.
pub fn set_blocked(&self) {
	// Hold the write lock for the entire read-modify-write so a
	// concurrent caller cannot interleave between inspecting the old
	// state and storing the new one (the previous version read and
	// wrote under separate lock acquisitions).
	let mut state = self.state.write();
	if let State::Blocked(expiry, _) = *state {
		// An active block is left untouched.
		if expiry > Utc::now() {
			return;
		}
	}
	// An expired block still carries its offense counter; anything else
	// starts at the first offense.
	let times = match *state {
		State::Blocked(_, times) => times + 1,
		_ => 1,
	};
	let duration = match times {
		1 => 60,  // 1m
		2 => 180, // 3m
		_ => 600, // 10m
	};
	let expiry = Utc::now() + Duration::seconds(duration);
	*state = State::Blocked(expiry, times);
	// Previous message read "block peer X for 60 times: 2", presenting
	// the duration as a count; make the units and offense number explicit.
	debug!(
		"state_sync: blocked peer {} for {}s (offense #{})",
		self.info.addr, duration, times
	);
}

/// Lift an active block on this peer, returning it to `Connected`.
/// Peers that were never blocked, or whose block has already expired,
/// are left untouched.
pub fn unblock(&self) {
	// `is_blocked` already performs the expiry check, so only an
	// unexpired block gets reset here.
	let currently_blocked = self.is_blocked();
	if currently_blocked {
		*self.state.write() = State::Connected;
	}
}

/// Whether this peer is stuck on sync.
pub fn is_stuck(&self) -> (bool, Difficulty) {
let peer_live_info = self.info.live_info.read();
Expand Down
Loading
Loading