Skip to content
8 changes: 6 additions & 2 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
pub use crate::scheduler::BeaconProcessorQueueLengths;
use crate::scheduler::work_queue::WorkQueues;
use crate::work_reprocessing_queue::{
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipEnvelope,
ReprocessQueueMessage,
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, QueuedGossipDataColumn,
QueuedGossipEnvelope, ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
Expand Down Expand Up @@ -304,6 +304,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
work: Work::ColumnReconstruction(process_fn),
}
}
ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockAttestation { process_fn },
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm what?!

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, funny enough it still works but yeah fixed

},
}
}
}
Expand Down
175 changes: 175 additions & 0 deletions beacon_node/beacon_processor/src/scheduler/work_reprocessing_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ const MAXIMUM_QUEUED_ENVELOPES: usize = 16;
/// How many attestations we keep before new ones get dropped.
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;

/// How many columns we keep before new ones get dropped.
const MAXIMUM_QUEUED_DATA_COLUMNS: usize = 256;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I picked 256 so we could cache up to two slots worth of columns in the super node case.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is around 24mb of storage. I concur.


/// How many light client updates we keep before new ones get dropped.
const MAXIMUM_QUEUED_LIGHT_CLIENT_UPDATES: usize = 128;

Expand Down Expand Up @@ -123,6 +126,8 @@ pub enum ReprocessQueueMessage {
UnknownLightClientOptimisticUpdate(QueuedLightClientUpdate),
/// A new backfill batch that needs to be scheduled for processing.
BackfillSync(QueuedBackfillBatch),
/// A gossip data column that references an unknown block.
UnknownBlockDataColumn(QueuedGossipDataColumn),
/// A delayed column reconstruction that needs checking
DelayColumnReconstruction(QueuedColumnReconstruction),
}
Expand All @@ -138,6 +143,7 @@ pub enum ReadyWork {
LightClientUpdate(QueuedLightClientUpdate),
BackfillSync(QueuedBackfillBatch),
ColumnReconstruction(QueuedColumnReconstruction),
DataColumn(QueuedGossipDataColumn),
}

/// An Attestation for which the corresponding block was not seen while processing, queued for
Expand Down Expand Up @@ -200,6 +206,12 @@ pub struct QueuedColumnReconstruction {
pub process_fn: AsyncFn,
}

/// A gossip data column that references an unknown block, queued for later reprocessing.
pub struct QueuedGossipDataColumn {
pub beacon_block_root: Hash256,
pub process_fn: BlockingFn,
}

impl<E: EthSpec> TryFrom<WorkEvent<E>> for QueuedBackfillBatch {
type Error = WorkEvent<E>;

Expand Down Expand Up @@ -240,6 +252,8 @@ enum InboundEvent {
ReadyBackfillSync(QueuedBackfillBatch),
/// A column reconstruction that was queued is ready for processing.
ReadyColumnReconstruction(QueuedColumnReconstruction),
/// A gossip data column that timed out waiting for its block.
ReadyDataColumn(usize),
/// A message sent to the `ReprocessQueue`
Msg(ReprocessQueueMessage),
}
Expand All @@ -264,6 +278,7 @@ struct ReprocessQueue<S> {
lc_updates_delay_queue: DelayQueue<QueuedLightClientUpdateId>,
/// Queue to manage scheduled column reconstructions.
column_reconstructions_delay_queue: DelayQueue<QueuedColumnReconstruction>,
data_columns_delay_queue: DelayQueue<usize>,

/* Queued items */
/// Queued blocks.
Expand All @@ -284,10 +299,15 @@ struct ReprocessQueue<S> {
queued_column_reconstructions: HashMap<Hash256, Option<DelayKey>>,
/// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>,
/// Queued gossip data columns awaiting their block.
queued_gossip_data_columns: FnvHashMap<usize, (QueuedGossipDataColumn, DelayKey)>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why we need to store individual indices separately.
The logic could be similar to the envelope where we store all columns for a given root upto the queue size. If a root is imported, then release everything under it at once. The timer can be for a block_root level instead of per column.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

/// Data columns per block root.
awaiting_data_columns_per_root: HashMap<Hash256, Vec<usize>>,

/* Aux */
/// Next attestation id, used for both aggregated and unaggregated attestations
next_attestation: usize,
next_data_column: usize,
next_lc_update: usize,
early_block_debounce: TimeLatch,
envelope_delay_debounce: TimeLatch,
Expand Down Expand Up @@ -387,6 +407,13 @@ impl<S: SlotClock> Stream for ReprocessQueue<S> {
Poll::Ready(None) | Poll::Pending => (),
}

match self.data_columns_delay_queue.poll_expired(cx) {
Poll::Ready(Some(col_id)) => {
return Poll::Ready(Some(InboundEvent::ReadyDataColumn(col_id.into_inner())));
}
Poll::Ready(None) | Poll::Pending => (),
}

if let Some(next_backfill_batch_event) = self.next_backfill_batch_event.as_mut() {
match next_backfill_batch_event.as_mut().poll(cx) {
Poll::Ready(_) => {
Expand Down Expand Up @@ -455,6 +482,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
attestations_delay_queue: DelayQueue::new(),
lc_updates_delay_queue: DelayQueue::new(),
column_reconstructions_delay_queue: DelayQueue::new(),
data_columns_delay_queue: DelayQueue::new(),
queued_gossip_block_roots: HashSet::new(),
awaiting_envelopes_per_root: HashMap::new(),
queued_lc_updates: FnvHashMap::default(),
Expand All @@ -464,7 +492,10 @@ impl<S: SlotClock> ReprocessQueue<S> {
awaiting_lc_updates_per_parent_root: HashMap::new(),
queued_backfill_batches: Vec::new(),
queued_column_reconstructions: HashMap::new(),
queued_gossip_data_columns: FnvHashMap::default(),
awaiting_data_columns_per_root: HashMap::new(),
next_attestation: 0,
next_data_column: 0,
next_lc_update: 0,
early_block_debounce: TimeLatch::default(),
envelope_delay_debounce: TimeLatch::default(),
Expand Down Expand Up @@ -688,6 +719,29 @@ impl<S: SlotClock> ReprocessQueue<S> {

self.next_attestation += 1;
}
InboundEvent::Msg(UnknownBlockDataColumn(queued_data_column)) => {
if self.queued_gossip_data_columns.len() >= MAXIMUM_QUEUED_DATA_COLUMNS {
return;
}

let col_id = self.next_data_column;

let delay_key = self
.data_columns_delay_queue
.insert(col_id, QUEUED_ATTESTATION_DELAY);

// Register this column for the corresponding block root.
self.awaiting_data_columns_per_root
.entry(queued_data_column.beacon_block_root)
.or_default()
.push(col_id);

// Store the column and its info.
self.queued_gossip_data_columns
.insert(col_id, (queued_data_column, delay_key));

self.next_data_column += 1;
}
InboundEvent::Msg(UnknownLightClientOptimisticUpdate(
queued_light_client_optimistic_update,
)) => {
Expand Down Expand Up @@ -800,6 +854,24 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}

// Unqueue the data columns we have for this root, if any.
if let Some(queued_ids) = self.awaiting_data_columns_per_root.remove(&block_root) {
for col_id in queued_ids {
if let Some((data_column, delay_key)) =
self.queued_gossip_data_columns.remove(&col_id)
{
self.data_columns_delay_queue.remove(&delay_key);
if self
.ready_work_tx
.try_send(ReadyWork::DataColumn(data_column))
.is_err()
{
error!(?block_root, "Failed to send data column for reprocessing");
}
}
}
}
}
InboundEvent::Msg(NewLightClientOptimisticUpdate { parent_root }) => {
// Unqueue the light client optimistic updates we have for this root, if any.
Expand Down Expand Up @@ -1053,6 +1125,31 @@ impl<S: SlotClock> ReprocessQueue<S> {
);
}
}
InboundEvent::ReadyDataColumn(col_id) => {
if let Some((data_column, _)) = self.queued_gossip_data_columns.remove(&col_id) {
// Clean up the per-root index.
let root = data_column.beacon_block_root;
if let Entry::Occupied(mut entry) =
self.awaiting_data_columns_per_root.entry(root)
{
let ids = entry.get_mut();
ids.retain(|&id| id != col_id);
if ids.is_empty() {
entry.remove_entry();
}
}
if self
.ready_work_tx
.try_send(ReadyWork::DataColumn(data_column))
.is_err()
{
error!(
hint = "system may be overloaded",
"Ignored expired gossip data column"
);
}
}
}
}

metrics::set_gauge_vec(
Expand Down Expand Up @@ -1625,4 +1722,82 @@ mod tests {
.contains_key(&overflow_root)
);
}

/// Tests that a queued gossip data column is released when its block is imported.
#[tokio::test]
async fn data_column_released_on_block_imported() {
create_test_tracing_subscriber();

let config = BeaconProcessorConfig::default();
let (ready_work_tx, mut ready_work_rx) =
mpsc::channel::<ReadyWork>(config.max_scheduled_work_queue_len);
let (_, reprocess_work_rx) =
mpsc::channel::<ReprocessQueueMessage>(config.max_scheduled_work_queue_len);
let slot_clock = Arc::new(testing_slot_clock(12));
let mut queue = ReprocessQueue::new(ready_work_tx, reprocess_work_rx, slot_clock);

tokio::time::pause();

let beacon_block_root = Hash256::repeat_byte(0xbb);

let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(msg));

assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
assert_eq!(queue.queued_gossip_data_columns.len(), 1);
assert_eq!(queue.data_columns_delay_queue.len(), 1);

// Simulate block import.
queue.handle_message(InboundEvent::Msg(ReprocessQueueMessage::BlockImported {
block_root: beacon_block_root,
parent_root: Hash256::repeat_byte(0x00),
}));

// Internal state should be cleaned up.
assert!(queue.awaiting_data_columns_per_root.is_empty());
assert!(queue.queued_gossip_data_columns.is_empty());
assert_eq!(queue.data_columns_delay_queue.len(), 0);

// The column should have been sent to the ready_work channel.
let ready = ready_work_rx.try_recv().expect("column should be ready");
assert!(matches!(ready, ReadyWork::DataColumn(_)));
}

/// Tests that an expired gossip data column is pruned cleanly from all internal state.
#[tokio::test]
async fn prune_awaiting_data_columns_per_root() {
create_test_tracing_subscriber();

let mut queue = test_queue();

tokio::time::pause();

let beacon_block_root = Hash256::repeat_byte(0xcd);

let msg = ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
beacon_block_root,
process_fn: Box::new(|| {}),
});
queue.handle_message(InboundEvent::Msg(msg));

assert_eq!(queue.awaiting_data_columns_per_root.len(), 1);
assert!(
queue
.awaiting_data_columns_per_root
.contains_key(&beacon_block_root)
);

// Advance time past the delay so the entry expires.
advance_time(&queue.slot_clock, 2 * QUEUED_ATTESTATION_DELAY).await;
let ready_msg = queue.next().await.unwrap();
assert!(matches!(ready_msg, InboundEvent::ReadyDataColumn(_)));
queue.handle_message(ready_msg);

// All internal state should be cleaned up.
assert!(queue.awaiting_data_columns_per_root.is_empty());
assert!(queue.queued_gossip_data_columns.is_empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction;
use beacon_processor::{
DuplicateCache, GossipAggregatePackage, GossipAttestationBatch,
work_reprocessing_queue::{
QueuedAggregate, QueuedGossipBlock, QueuedGossipEnvelope, QueuedLightClientUpdate,
QueuedUnaggregate, ReprocessQueueMessage,
QueuedAggregate, QueuedGossipBlock, QueuedGossipDataColumn, QueuedGossipEnvelope,
QueuedLightClientUpdate, QueuedUnaggregate, ReprocessQueueMessage,
},
};

Expand Down Expand Up @@ -728,19 +728,44 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
..
} => {
debug!(
action = "ignoring",
action = "queuing for reprocessing",
%unknown_block_root,
"Unknown block root for column"
);
// TODO(gloas): wire this into proper lookup sync. Sending
// `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that
// mixes column processing with the attestation lookup path and is not
// the right primitive for Gloas column lookups.
self.propagate_validation_result(
message_id,
message_id.clone(),
peer_id,
MessageAcceptance::Ignore,
);

// Queue the column for reprocessing when the block arrives.
let processor = self.clone();
let reprocess_msg =
ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen in a loop no?
Attacker sends random columns with random block roots -> We send it to reprocess queue -> timer expires -> try again -> send to reprocess queue again.

We might need a allow_reprocess kind of logic that we have for attestations here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

beacon_block_root: unknown_block_root,
process_fn: Box::new(move || {
let _ = processor.send_gossip_data_column_sidecar(
message_id,
peer_id,
subnet_id,
column_sidecar,
seen_duration,
);
}),
});
if self
.beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(reprocess_msg),
})
.is_err()
{
debug!(
%unknown_block_root,
"Failed to queue data column for reprocessing"
);
}
}
GossipDataColumnError::InvalidVariant
| GossipDataColumnError::PubkeyCacheTimeout
Expand Down
Loading