Skip to content

Gloas data column reprocess queue#9339

Open
eserilev wants to merge 13 commits into
sigp:unstablefrom
eserilev:data-column-reprocess-queue
Open

Gloas data column reprocess queue#9339
eserilev wants to merge 13 commits into
sigp:unstablefrom
eserilev:data-column-reprocess-queue

Conversation

@eserilev
Copy link
Copy Markdown
Member

Issue Addressed

When debugging ePBS with columns, we noticed that columns arriving before their block dont pass gossip verification checks and are dropped. This PR ensures that columns arriving before the block are sent to the reprocess queue. Once their block arrives, they are reprocessed.

This isn't an issue pre-gloas because we don't make block root checks for fulu data columns. This allows us to gossip verify the column and send it to the DA cache before the block arrives.

I think we also need to handle this edge case for partial data columns. Theres an existing TODO for that already.

@eserilev eserilev requested a review from jxs as a code owner May 22, 2026 12:44
@eserilev eserilev added ready-for-review The code is ready for review gloas labels May 22, 2026
const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;

/// How many columns we keep before new ones get dropped.
const MAXIMUM_QUEUED_DATA_COLUMNS: usize = 256;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I picked 256 so we could cache up to two slots worth of columns in the super node case.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is around 24mb of storage. I concur.

Comment thread beacon_node/beacon_processor/src/lib.rs Outdated
}
ReadyWork::DataColumn(QueuedGossipDataColumn { process_fn, .. }) => Self {
drop_during_sync: true,
work: Work::UnknownBlockAttestation { process_fn },
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm what?!

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, funny enough it still works but yeah fixed

const MAXIMUM_QUEUED_ATTESTATIONS: usize = 16_384;

/// How many columns we keep before new ones get dropped.
const MAXIMUM_QUEUED_DATA_COLUMNS: usize = 256;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is around 24mb of storage. I concur.

// Queue the column for reprocessing when the block arrives.
let processor = self.clone();
let reprocess_msg =
ReprocessQueueMessage::UnknownBlockDataColumn(QueuedGossipDataColumn {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen in a loop no?
Attacker sends random columns with random block roots -> We send it to reprocess queue -> timer expires -> try again -> send to reprocess queue again.

We might need a allow_reprocess kind of logic that we have for attestations here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// Queued backfill batches
queued_backfill_batches: Vec<QueuedBackfillBatch>,
/// Queued gossip data columns awaiting their block.
queued_gossip_data_columns: FnvHashMap<usize, (QueuedGossipDataColumn, DelayKey)>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why we need to store individual indices separately.
The logic could be similar to the envelope where we store all columns for a given root upto the queue size. If a root is imported, then release everything under it at once. The timer can be for a block_root level instead of per column.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@eserilev
Copy link
Copy Markdown
Member Author

sorry there was a bit of slop in this PR. was scrambling trying to fix things before the devnet forked to gloas and missed some claude nonsense.

Comment on lines -563 to -569
if let Some(oldest_root) =
self.awaiting_envelopes_per_root.keys().next().copied()
&& let Some((_envelope, delay_key)) =
self.awaiting_envelopes_per_root.remove(&oldest_root)
{
self.envelope_delay_queue.remove(&delay_key);
}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we were just arbitrarily dropping a random envelope here which didn't seem right

@mergify
Copy link
Copy Markdown

mergify Bot commented May 24, 2026

Some required checks have failed. Could you please take a look @eserilev? 🙏

@mergify mergify Bot added waiting-on-author The reviewer has suggested changes and awaits thier implementation. and removed ready-for-review The code is ready for review labels May 24, 2026
@eserilev eserilev added ready-for-review The code is ready for review and removed waiting-on-author The reviewer has suggested changes and awaits thier implementation. labels May 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

gloas ready-for-review The code is ready for review

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants