Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 25 additions & 22 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use {
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{
clock::{DEFAULT_TICKS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET},
packet::{Packet, PacketFlags},
packet::PacketFlags,
},
solana_streamer::streamer::{
self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats,
solana_streamer::{
packet::TxPacketBatch,
streamer::{self, StreamerReceiveStats, TxPacketBatchReceiver, TxPacketBatchSender},
},
solana_tpu_client::tpu_connection_cache::DEFAULT_TPU_ENABLE_UDP,
std::{
Expand All @@ -38,7 +39,7 @@ impl FetchStage {
exit: &Arc<AtomicBool>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
) -> (Self, PacketBatchReceiver, PacketBatchReceiver) {
) -> (Self, TxPacketBatchReceiver, TxPacketBatchReceiver) {
let (sender, receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let (forward_sender, forward_receiver) = unbounded();
Expand Down Expand Up @@ -68,10 +69,10 @@ impl FetchStage {
tpu_forwards_sockets: Vec<UdpSocket>,
tpu_vote_sockets: Vec<UdpSocket>,
exit: &Arc<AtomicBool>,
sender: &PacketBatchSender,
vote_sender: &PacketBatchSender,
forward_sender: &PacketBatchSender,
forward_receiver: PacketBatchReceiver,
sender: &TxPacketBatchSender,
vote_sender: &TxPacketBatchSender,
forward_sender: &TxPacketBatchSender,
forward_receiver: TxPacketBatchReceiver,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
Expand All @@ -97,20 +98,22 @@ impl FetchStage {
}

fn handle_forwarded_packets(
recvr: &PacketBatchReceiver,
sendr: &PacketBatchSender,
recvr: &TxPacketBatchReceiver,
sendr: &TxPacketBatchSender,
poh_recorder: &Arc<RwLock<PohRecorder>>,
) -> Result<()> {
let mark_forwarded = |packet: &mut Packet| {
packet.meta_mut().flags |= PacketFlags::FORWARDED;
};
fn mark_batch_forwared(batch: &mut TxPacketBatch) {
batch.iter_mut().for_each(|packet| {
packet.meta_mut().flags |= PacketFlags::FORWARDED;
});
}

let mut packet_batch = recvr.recv()?;
let mut num_packets = packet_batch.len();
packet_batch.iter_mut().for_each(mark_forwarded);
mark_batch_forwared(&mut packet_batch);
let mut packet_batches = vec![packet_batch];
while let Ok(mut packet_batch) = recvr.try_recv() {
packet_batch.iter_mut().for_each(mark_forwarded);
mark_batch_forwared(&mut packet_batch);
num_packets += packet_batch.len();
packet_batches.push(packet_batch);
// Read at most 1K transactions in a loop
Expand Down Expand Up @@ -144,10 +147,10 @@ impl FetchStage {
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
sender: &PacketBatchSender,
vote_sender: &PacketBatchSender,
forward_sender: &PacketBatchSender,
forward_receiver: PacketBatchReceiver,
sender: &TxPacketBatchSender,
vote_sender: &TxPacketBatchSender,
forward_sender: &TxPacketBatchSender,
forward_receiver: TxPacketBatchReceiver,
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
Expand All @@ -161,7 +164,7 @@ impl FetchStage {
tpu_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
streamer::receiver2(
socket,
exit.clone(),
sender.clone(),
Expand All @@ -182,7 +185,7 @@ impl FetchStage {
tpu_forwards_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
streamer::receiver2(
socket,
exit.clone(),
forward_sender.clone(),
Expand All @@ -202,7 +205,7 @@ impl FetchStage {
let tpu_vote_threads: Vec<_> = tpu_vote_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
streamer::receiver2(
socket,
exit.clone(),
vote_sender.clone(),
Expand Down
12 changes: 6 additions & 6 deletions core/src/find_packet_sender_stake_stage.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::tx_packet_batch::TxPacketBatch,
solana_sdk::timing::timestamp,
solana_streamer::streamer::{self, StakedNodes, StreamerError},
solana_streamer::streamer::{self, StakedNodes, StreamerError, TxPacketBatchReceiver},
std::{
collections::HashMap,
net::IpAddr,
Expand All @@ -17,8 +17,8 @@ use {
// 50ms/(200ns/packet) = 250k packets
const MAX_FINDPACKETSENDERSTAKE_BATCH: usize = 250_000;

pub type FindPacketSenderStakeSender = Sender<Vec<PacketBatch>>;
pub type FindPacketSenderStakeReceiver = Receiver<Vec<PacketBatch>>;
pub type FindPacketSenderStakeSender = Sender<Vec<TxPacketBatch>>;
pub type FindPacketSenderStakeReceiver = Receiver<Vec<TxPacketBatch>>;

#[derive(Debug, Default)]
struct FindPacketSenderStakeStats {
Expand Down Expand Up @@ -77,7 +77,7 @@ pub struct FindPacketSenderStakeStage {

impl FindPacketSenderStakeStage {
pub fn new(
packet_receiver: streamer::PacketBatchReceiver,
packet_receiver: TxPacketBatchReceiver,
sender: FindPacketSenderStakeSender,
staked_nodes: Arc<RwLock<StakedNodes>>,
name: &'static str,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl FindPacketSenderStakeStage {
Self { thread_hdl }
}

fn apply_sender_stakes(batches: &mut [PacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
fn apply_sender_stakes(batches: &mut [TxPacketBatch], ip_to_stake: &HashMap<IpAddr, u64>) {
batches
.iter_mut()
.flat_map(|batch| batch.iter_mut())
Expand Down
47 changes: 34 additions & 13 deletions core/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,26 @@
//!

pub use solana_perf::sigverify::{
count_packets_in_batches, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset,
ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset,
};
use {
<<<<<<< HEAD
crate::{
banking_trace::{BankingPacketBatch, BankingPacketSender},
sigverify_stage::{SigVerifier, SigVerifyServiceError},
},
=======
crate::{banking_stage::BankingPacketBatch, sigverify_stage::SigVerifyServiceError},
crossbeam_channel::Sender,
<<<<<<< HEAD
>>>>>>> e59cd309c9 (Rip out SigVerifier trait)
solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify},
=======
solana_perf::{
cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify,
tx_packet_batch::TxPacketViewMut,
},
>>>>>>> 4ba1998ccb (Plumbing TxPacketBatch through Tx packet path)
solana_sdk::{packet::Packet, saturating_add_assign},
};

Expand Down Expand Up @@ -55,21 +67,33 @@ impl SigverifyTracerPacketStats {
}

pub struct TransactionSigVerifier {
<<<<<<< HEAD
packet_sender: BankingPacketSender,
=======
packet_sender: Sender<BankingPacketBatch>,
>>>>>>> e59cd309c9 (Rip out SigVerifier trait)
tracer_packet_stats: SigverifyTracerPacketStats,
recycler: Recycler<TxOffset>,
recycler_out: Recycler<PinnedVec<u8>>,
reject_non_vote: bool,
}

impl TransactionSigVerifier {
<<<<<<< HEAD
pub fn new_reject_non_vote(packet_sender: BankingPacketSender) -> Self {
=======
pub fn new_reject_non_vote(packet_sender: Sender<BankingPacketBatch>) -> Self {
>>>>>>> e59cd309c9 (Rip out SigVerifier trait)
let mut new_self = Self::new(packet_sender);
new_self.reject_non_vote = true;
new_self
}

<<<<<<< HEAD
pub fn new(packet_sender: BankingPacketSender) -> Self {
=======
pub fn new(packet_sender: Sender<BankingPacketBatch>) -> Self {
>>>>>>> e59cd309c9 (Rip out SigVerifier trait)
init();
Self {
packet_sender,
Expand All @@ -79,19 +103,16 @@ impl TransactionSigVerifier {
reject_non_vote: false,
}
}
}

impl SigVerifier for TransactionSigVerifier {
type SendType = BankingPacketBatch;

#[inline(always)]
fn process_received_packet(
pub fn process_received_packet(
&mut self,
packet: &mut Packet,
packet: &TxPacketViewMut,
removed_before_sigverify_stage: bool,
is_dup: bool,
) {
sigverify::check_for_tracer_packet(packet);
// TODO: re-enable tracer check
//sigverify::check_for_tracer_packet(packet);
if packet.meta().is_tracer_packet() {
if removed_before_sigverify_stage {
self.tracer_packet_stats
Expand All @@ -107,24 +128,24 @@ impl SigVerifier for TransactionSigVerifier {
}

#[inline(always)]
fn process_excess_packet(&mut self, packet: &Packet) {
pub fn process_excess_packet(&mut self, packet: &TxPacketViewMut) {
if packet.meta().is_tracer_packet() {
self.tracer_packet_stats.total_excess_tracer_packets += 1;
}
}

#[inline(always)]
fn process_passed_sigverify_packet(&mut self, packet: &Packet) {
pub fn process_passed_sigverify_packet(&mut self, packet: &Packet) {
if packet.meta().is_tracer_packet() {
self.tracer_packet_stats
.total_tracker_packets_passed_sigverify += 1;
}
}

fn send_packets(
pub fn send_packets(
&mut self,
packet_batches: Vec<PacketBatch>,
) -> Result<(), SigVerifyServiceError<Self::SendType>> {
) -> Result<(), SigVerifyServiceError<BankingPacketBatch>> {
let tracer_packet_stats_to_send = std::mem::take(&mut self.tracer_packet_stats);
self.packet_sender.send(BankingPacketBatch::new((
packet_batches,
Expand All @@ -133,7 +154,7 @@ impl SigVerifier for TransactionSigVerifier {
Ok(())
}

fn verify_batches(
pub fn verify_batches(
&self,
mut batches: Vec<PacketBatch>,
valid_packets: usize,
Expand Down
Loading