Skip to content
23 changes: 21 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
pub use crate::scheduler::BeaconProcessorQueueLengths;
use crate::scheduler::work_queue::WorkQueues;
use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope,
ReprocessQueueMessage,
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipDataColumn,
QueuedGossipEnvelope, ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
Expand Down Expand Up @@ -304,6 +304,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
work: Work::ColumnReconstruction(process_fn),
}
}
ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockDataColumn { process_fn },
},
}
}
}
Expand Down Expand Up @@ -369,6 +373,9 @@ pub enum Work<E: EthSpec> {
UnknownBlockAttestation {
process_fn: BlockingFn,
},
UnknownBlockDataColumn {
process_fn: BlockingFn,
},
GossipAttestationBatch {
attestations: GossipAttestationBatch,
process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
Expand Down Expand Up @@ -465,6 +472,7 @@ pub enum WorkType {
GossipAttestation,
GossipAttestationToConvert,
UnknownBlockAttestation,
UnknownBlockDataColumn,
GossipAttestationBatch,
GossipAggregate,
UnknownBlockAggregate,
Expand Down Expand Up @@ -572,6 +580,7 @@ impl<E: EthSpec> Work<E> {
Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
Work::UnknownBlockDataColumn { .. } => WorkType::UnknownBlockDataColumn,
Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
Work::UnknownLightClientOptimisticUpdate { .. } => {
WorkType::UnknownLightClientOptimisticUpdate
Expand Down Expand Up @@ -986,6 +995,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
} else if let Some(item) = work_queues.unknown_block_aggregate_queue.pop() {
Some(item)
} else if let Some(item) = work_queues.unknown_block_attestation_queue.pop()
{
Some(item)
} else if let Some(item) = work_queues.unknown_block_data_column_queue.pop()
{
Some(item)
// Check execution payload bids. Most proposers will request bids directly from builders
Expand Down Expand Up @@ -1246,6 +1258,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownBlockAttestation { .. } => {
work_queues.unknown_block_attestation_queue.push(work)
}
Work::UnknownBlockDataColumn { .. } => work_queues
.unknown_block_data_column_queue
.push(work, work_id),
Work::UnknownBlockAggregate { .. } => {
work_queues.unknown_block_aggregate_queue.push(work)
}
Expand Down Expand Up @@ -1296,6 +1311,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::UnknownBlockAttestation => {
work_queues.unknown_block_attestation_queue.len()
}
WorkType::UnknownBlockDataColumn => {
work_queues.unknown_block_data_column_queue.len()
}
WorkType::GossipAttestationBatch => 0, // No queue
WorkType::GossipAggregate => work_queues.aggregate_queue.len(),
WorkType::UnknownBlockAggregate => {
Expand Down Expand Up @@ -1513,6 +1531,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
}),
Work::UnknownBlockAttestation { process_fn }
| Work::UnknownBlockAggregate { process_fn }
| Work::UnknownBlockDataColumn { process_fn }
| Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => {
task_spawner.spawn_blocking(process_fn)
}
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/beacon_processor/src/scheduler/work_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct BeaconProcessorQueueLengths {
attestation_queue: usize,
unknown_block_aggregate_queue: usize,
unknown_block_attestation_queue: usize,
unknown_block_data_column_queue: usize,
sync_message_queue: usize,
sync_contribution_queue: usize,
gossip_voluntary_exit_queue: usize,
Expand Down Expand Up @@ -175,6 +176,8 @@ impl BeaconProcessorQueueLengths {
Ok(Self {
aggregate_queue: 4096,
unknown_block_aggregate_queue: 1024,
// Capacity for two slot's worth of data columns for a supernode.
unknown_block_data_column_queue: 256,
// Capacity for a full slot's worth of attestations if subscribed to all subnets
attestation_queue: std::cmp::max(
active_validator_count / slots_per_epoch,
Expand Down Expand Up @@ -247,6 +250,7 @@ pub struct WorkQueues<E: EthSpec> {
pub attestation_debounce: TimeLatch,
pub unknown_block_aggregate_queue: LifoQueue<Work<E>>,
pub unknown_block_attestation_queue: LifoQueue<Work<E>>,
pub unknown_block_data_column_queue: FifoQueue<Work<E>>,
pub sync_message_queue: LifoQueue<Work<E>>,
pub sync_contribution_queue: LifoQueue<Work<E>>,
pub gossip_voluntary_exit_queue: FifoQueue<Work<E>>,
Expand Down Expand Up @@ -305,6 +309,8 @@ impl<E: EthSpec> WorkQueues<E> {
LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
let unknown_block_attestation_queue =
LifoQueue::new(queue_lengths.unknown_block_attestation_queue);
let unknown_block_data_column_queue =
FifoQueue::new(queue_lengths.unknown_block_data_column_queue);

let sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
let sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);
Expand Down Expand Up @@ -387,6 +393,7 @@ impl<E: EthSpec> WorkQueues<E> {
attestation_debounce,
unknown_block_aggregate_queue,
unknown_block_attestation_queue,
unknown_block_data_column_queue,
sync_message_queue,
sync_contribution_queue,
gossip_voluntary_exit_queue,
Expand Down
Loading
Loading