Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions config/src/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,28 +141,29 @@ fn comments() -> HashMap<String, String> {
);

retval.insert(
"relay_secs".to_string(),
"epoch_secs".to_string(),
"
#dandelion relay time (choose new relay peer every n secs)
#dandelion epoch duration
"
.to_string(),
);

retval.insert(
"embargo_secs".to_string(),
"aggregation_secs".to_string(),
"
#fluff and broadcast after embargo expires if tx not seen on network
#dandelion aggregation period in secs
"
.to_string(),
);

retval.insert(
"patience_secs".to_string(),
"embargo_secs".to_string(),
"
#run dandelion stem/fluff processing every n secs (stem tx aggregation in this window)
#fluff and broadcast after embargo expires if tx not seen on network
"
.to_string(),
);

retval.insert(
"stem_probability".to_string(),
"
Expand Down
7 changes: 7 additions & 0 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use crate::util::{Mutex, RwLock};
use std::fmt;
use std::fs::File;
use std::net::{Shutdown, TcpStream};
use std::sync::Arc;
Expand Down Expand Up @@ -54,6 +55,12 @@ pub struct Peer {
connection: Option<Mutex<conn::Tracker>>,
}

impl fmt::Debug for Peer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Peer({:?})", &self.info)
}
}

impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {
Expand Down
55 changes: 0 additions & 55 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub struct Peers {
pub adapter: Arc<dyn ChainAdapter>,
store: PeerStore,
peers: RwLock<HashMap<PeerAddr, Arc<Peer>>>,
dandelion_relay: RwLock<Option<(i64, Arc<Peer>)>>,
config: P2PConfig,
}

Expand All @@ -48,7 +47,6 @@ impl Peers {
store,
config,
peers: RwLock::new(HashMap::new()),
dandelion_relay: RwLock::new(None),
}
}

Expand Down Expand Up @@ -87,39 +85,6 @@ impl Peers {
self.save_peer(&peer_data)
}

// Update the dandelion relay
pub fn update_dandelion_relay(&self) {
let peers = self.outgoing_connected_peers();

let peer = &self
.config
.dandelion_peer
.and_then(|ip| peers.iter().find(|x| x.info.addr == ip))
.or(thread_rng().choose(&peers));

match peer {
Some(peer) => self.set_dandelion_relay(peer),
None => debug!("Could not update dandelion relay"),
}
}

fn set_dandelion_relay(&self, peer: &Arc<Peer>) {
// Clear the map and add new relay
let dandelion_relay = &self.dandelion_relay;
dandelion_relay
.write()
.replace((Utc::now().timestamp(), peer.clone()));
debug!(
"Successfully updated Dandelion relay to: {}",
peer.info.addr
);
}

// Get the dandelion relay
pub fn get_dandelion_relay(&self) -> Option<(i64, Arc<Peer>)> {
self.dandelion_relay.read().clone()
}

pub fn is_known(&self, addr: PeerAddr) -> bool {
self.peers.read().contains_key(&addr)
}
Expand Down Expand Up @@ -335,26 +300,6 @@ impl Peers {
);
}

/// Relays the provided stem transaction to our single stem peer.
pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
self.get_dandelion_relay()
.or_else(|| {
debug!("No dandelion relay, updating.");
self.update_dandelion_relay();
self.get_dandelion_relay()
})
// If still return an error, let the caller handle this as they see fit.
// The caller will "fluff" at this point as the stem phase is finished.
.ok_or(Error::NoDandelionRelay)
.map(|(_, relay)| {
if relay.is_connected() {
if let Err(e) = relay.send_stem_transaction(tx) {
debug!("Error sending stem transaction to peer relay: {:?}", e);
}
}
})
}

/// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our
/// peers. We may be connected to PEER_MAX_COUNT peers so we only
/// want to broadcast to a random subset of peers.
Expand Down
3 changes: 2 additions & 1 deletion pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ mod pool;
pub mod transaction_pool;
pub mod types;

pub use crate::pool::Pool;
pub use crate::transaction_pool::TransactionPool;
pub use crate::types::{
BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntryState, PoolError, TxSource,
BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource,
};
31 changes: 2 additions & 29 deletions pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use self::core::core::{
Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting,
};
use self::util::RwLock;
use crate::types::{BlockChain, PoolEntry, PoolEntryState, PoolError};
use crate::types::{BlockChain, PoolEntry, PoolError};
use grin_core as core;
use grin_util as util;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -167,33 +167,6 @@ impl Pool {
Ok(Some(tx))
}

pub fn select_valid_transactions(
&self,
txs: Vec<Transaction>,
extra_tx: Option<Transaction>,
header: &BlockHeader,
) -> Result<Vec<Transaction>, PoolError> {
let valid_txs = self.validate_raw_txs(txs, extra_tx, header, Weighting::NoLimit)?;
Ok(valid_txs)
}

pub fn get_transactions_in_state(&self, state: PoolEntryState) -> Vec<Transaction> {
self.entries
.iter()
.filter(|x| x.state == state)
.map(|x| x.tx.clone())
.collect::<Vec<_>>()
}

// Transition the specified pool entries to the new state.
pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) {
for x in &mut self.entries {
if txs.contains(&x.tx) {
x.state = state;
}
}
}

// Aggregate this new tx with all existing txs in the pool.
// If we can validate the aggregated tx against the current chain state
// then we can safely add the tx to the pool.
Expand Down Expand Up @@ -267,7 +240,7 @@ impl Pool {
Ok(new_sums)
}

fn validate_raw_txs(
pub fn validate_raw_txs(
&self,
txs: Vec<Transaction>,
extra_tx: Option<Transaction>,
Expand Down
45 changes: 21 additions & 24 deletions pool/src/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use self::core::core::verifier_cache::VerifierCache;
use self::core::core::{transaction, Block, BlockHeader, Transaction, Weighting};
use self::util::RwLock;
use crate::pool::Pool;
use crate::types::{
BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource,
};
use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource};
use chrono::prelude::*;
use grin_core as core;
use grin_util as util;
Expand Down Expand Up @@ -79,10 +77,7 @@ impl TransactionPool {
fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
// Add tx to stempool (passing in all txs from txpool to validate against).
self.stempool
.add_to_pool(entry, self.txpool.all_transactions(), header)?;

// Note: we do not notify the adapter here,
// we let the dandelion monitor handle this.
.add_to_pool(entry.clone(), self.txpool.all_transactions(), header)?;
Comment thread
antiochp marked this conversation as resolved.
Outdated
Ok(())
}

Expand Down Expand Up @@ -124,8 +119,6 @@ impl TransactionPool {
let txpool_tx = self.txpool.all_transactions_aggregate()?;
self.stempool.reconcile(txpool_tx, header)?;
}

self.adapter.tx_accepted(&entry.tx);
Ok(())
}

Expand Down Expand Up @@ -159,28 +152,32 @@ impl TransactionPool {
self.blockchain.verify_coinbase_maturity(&tx)?;

let entry = PoolEntry {
state: PoolEntryState::Fresh,
src,
tx_at: Utc::now(),
tx,
};

// If we are in "stem" mode then check if this is a new tx or if we have seen it before.
// If new tx - add it to our stempool.
// If we have seen any of the kernels before then fallback to fluff,
// adding directly to txpool.
if stem
&& self
.stempool
.find_matching_transactions(entry.tx.kernels())
.is_empty()
{
self.add_to_stempool(entry, header)?;
return Ok(());
// If this is a stem tx then attempt to stem.
// Any problems fallback to fluff later.
// If not stem then we are fluff.
let fluff = if stem {
Comment thread
antiochp marked this conversation as resolved.
Outdated
// Attempt to add to stempool, notifying adapter to relay to outbound peer.
// Fall back to fluffing (via txpool) if stempool interaction fails.
self.add_to_stempool(entry.clone(), header)
.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
.is_err()
Comment thread
quentinlesceller marked this conversation as resolved.
} else {
true
};

// Fluff (broadcast) the tx if tagged as fluff or if
// stemming failed for any reason.
if fluff {
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry.clone());
self.adapter.tx_accepted(&entry.tx);
}

self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry);
Ok(())
}

Expand Down
Loading