Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 62 additions & 63 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,15 @@ pub(super) mod tests {
fetch_task_handle.await.unwrap();
}

#[tokio::test]
async fn test_fetch_task_signals_eof() {
/// Spawns a fetch task against a closed shard whose queue holds `num_records` records and whose
/// replication position is `replication_position_inclusive`, then asserts that fetching from
/// `from_position_exclusive` immediately signals EOF at `expected_eof_position`.
async fn check_fetch_task_signals_eof(
num_records: usize,
replication_position_inclusive: Position,
from_position_exclusive: Position,
expected_eof_position: Position,
) {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
Expand All @@ -868,26 +875,25 @@ pub(super) mod tests {
.await
.unwrap();

mrecordlog_guard
.as_mut()
.unwrap()
.append_records(
&queue_id,
None,
std::iter::once(MRecord::new_doc("test-doc-foo").encode()),
)
.await
.unwrap();
if num_records > 0 {
let records = (0..num_records).map(|_| MRecord::new_doc("test-doc-foo").encode());
mrecordlog_guard
.as_mut()
.unwrap()
.append_records(&queue_id, None, records)
.await
.unwrap();
}
drop(mrecordlog_guard);

let open_fetch_stream_request = OpenFetchStreamRequest {
client_id: client_id.clone(),
client_id,
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(shard_id.clone()),
from_position_exclusive: Some(Position::offset(0u64)),
from_position_exclusive: Some(from_position_exclusive.clone()),
};
let shard_status = (ShardState::Closed, Position::offset(0u64));
let shard_status = (ShardState::Closed, replication_position_inclusive);
let (_shard_status_tx, shard_status_rx) = watch::channel(shard_status);

let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
Expand All @@ -906,63 +912,56 @@ pub(super) mod tests {
assert_eq!(fetch_eof.index_uid(), &index_uid);
assert_eq!(fetch_eof.source_id, source_id);
assert_eq!(fetch_eof.shard_id(), shard_id);
assert_eq!(fetch_eof.eof_position, Some(Position::eof(0u64).as_eof()));
assert_eq!(
fetch_eof.eof_position,
Some(expected_eof_position),
"unexpected EOF position when fetching from `{from_position_exclusive:?}`"
);

fetch_task_handle.await.unwrap();
}

#[tokio::test]
async fn test_fetch_task_signals_eof_at_beginning() {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
)));
let client_id = "test-client".to_string();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let source_id = "test-source".to_string();
let shard_id = ShardId::from(1);
let queue_id = queue_id(&index_uid, &source_id, &shard_id);

let open_fetch_stream_request = OpenFetchStreamRequest {
client_id: client_id.clone(),
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(shard_id.clone()),
from_position_exclusive: Some(Position::Beginning),
};
let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
1024,
);
let mut mrecordlog_guard = mrecordlog.write().await;

mrecordlog_guard
.as_mut()
.unwrap()
.create_queue(&queue_id)
.await
.unwrap();
drop(mrecordlog_guard);
async fn test_fetch_task_signals_eof() {
check_fetch_task_signals_eof(
0,
Position::Beginning,
Position::Beginning,
Position::Beginning.as_eof(),
)
.await;

let shard_status = (ShardState::Closed, Position::Beginning);
shard_status_tx.send(shard_status).unwrap();
check_fetch_task_signals_eof(
0,
Position::Beginning,
Position::offset(0u64),
Position::eof(0u64),
)
.await;

let fetch_message = timeout(Duration::from_millis(100), fetch_stream.next())
.await
.unwrap()
.unwrap()
.unwrap();
let fetch_eof = into_fetch_eof(fetch_message);
check_fetch_task_signals_eof(
0,
Position::Beginning,
Position::offset(42u64),
Position::eof(42u64),
)
.await;

assert_eq!(fetch_eof.index_uid(), &index_uid);
assert_eq!(fetch_eof.source_id, source_id);
assert_eq!(fetch_eof.shard_id(), shard_id);
assert_eq!(fetch_eof.eof_position, Some(Position::Beginning.as_eof()));
check_fetch_task_signals_eof(
1,
Position::offset(0u64),
Position::offset(0u64),
Position::eof(0u64),
)
.await;

fetch_task_handle.await.unwrap();
check_fetch_task_signals_eof(
1,
Position::offset(0u64),
Position::offset(42u64),
Position::eof(42u64),
)
.await;
}

#[tokio::test]
Expand Down
48 changes: 37 additions & 11 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,13 +1024,17 @@ impl Ingester {
let queue_id = subrequest.queue_id();
let truncate_up_to_position_inclusive = subrequest.truncate_up_to_position_inclusive();

if truncate_up_to_position_inclusive.is_eof() {
state_guard.delete_shard(&queue_id, "indexer RPC").await;
} else {
state_guard
.truncate_shard(&queue_id, truncate_up_to_position_inclusive, "indexer RPC")
.await;
}
// We deliberately do NOT delete the shard when the indexer truncates up to EOF over
// this gRPC path. Shard deletion is driven solely by the `ShardPositionsUpdate` gossip
// event (see the `EventSubscriber<ShardPositionsUpdate>` impl below), which is the same
// signal the control plane uses to delete the shard from the metastore and its model.
//
// Handling shard deletion through that single, shared signal keeps the ingester and
// control plane views consistent: the ingester never removes a shard the
// control plane does not also remove.
state_guard
.truncate_shard(&queue_id, truncate_up_to_position_inclusive, "indexer RPC")
.await;
}
let wal_usage = state_guard.mrecordlog.resource_usage();
report_wal_usage(wal_usage);
Expand Down Expand Up @@ -1545,7 +1549,14 @@ mod tests {
.await;

let state_guard = ingester.state.lock_fully("test").await.unwrap();
assert_eq!(state_guard.shards.len(), 1);
assert_eq!(state_guard.shards.len(), 3);

let solo_shard_01 = state_guard.shards.get(&queue_id_01).unwrap();
solo_shard_01.assert_is_solo();
solo_shard_01.assert_is_closed();
solo_shard_01.assert_replication_position(Position::Beginning);
solo_shard_01.assert_truncation_position(Position::Beginning);
assert!(solo_shard_01.is_advertisable);

let solo_shard_02 = state_guard.shards.get(&queue_id_02).unwrap();
solo_shard_02.assert_is_solo();
Expand All @@ -1554,6 +1565,13 @@ mod tests {
solo_shard_02.assert_truncation_position(Position::offset(0u64));
assert!(solo_shard_02.is_advertisable);

let solo_shard_03 = state_guard.shards.get(&queue_id_03).unwrap();
solo_shard_03.assert_is_solo();
solo_shard_03.assert_is_closed();
solo_shard_03.assert_replication_position(Position::Beginning);
solo_shard_03.assert_truncation_position(Position::Beginning);
assert!(solo_shard_03.is_advertisable);

state_guard
.mrecordlog
.assert_records_eq(&queue_id_02, .., &[(1, [0, 0], "test-doc-bar")]);
Expand Down Expand Up @@ -3260,16 +3278,24 @@ mod tests {
.unwrap();

let state_guard = ingester.state.lock_fully("test").await.unwrap();

assert_eq!(state_guard.shards.len(), 1);
assert_eq!(state_guard.doc_mappers.len(), 1);
assert_eq!(state_guard.shards.len(), 2);
assert_eq!(state_guard.doc_mappers.len(), 2);

assert!(state_guard.shards.contains_key(&queue_id_01));
assert!(state_guard.shards.contains_key(&queue_id_02));
assert!(state_guard.doc_mappers.contains_key(&doc_mapping_uid_01));
assert!(state_guard.doc_mappers.contains_key(&doc_mapping_uid_02));

let solo_shard_02 = state_guard.shards.get(&queue_id_02).unwrap();
solo_shard_02.assert_truncation_position(Position::eof(0u64));

state_guard
.mrecordlog
.assert_records_eq(&queue_id_01, .., &[(1, [0, 0], "test-doc-bar")]);

state_guard
.mrecordlog
.assert_records_eq(&queue_id_02, .., &[]);
}

#[tokio::test]
Expand Down
13 changes: 1 addition & 12 deletions quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::ops::RangeInclusive;
use bytesize::ByteSize;
#[cfg(feature = "failpoints")]
use fail::fail_point;
use mrecordlog::error::{AppendError, DeleteQueueError};
use mrecordlog::error::AppendError;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::types::{Position, QueueId};
use tracing::instrument;
Expand Down Expand Up @@ -150,17 +150,6 @@ pub(super) fn check_enough_capacity(
Ok(())
}

/// Deletes a queue from the WAL. Returns without error if the queue does not exist.
pub async fn force_delete_queue(
mrecordlog: &mut MultiRecordLogAsync,
queue_id: &QueueId,
) -> io::Result<()> {
match mrecordlog.delete_queue(queue_id).await {
Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => Ok(()),
Err(DeleteQueueError::IoError(error)) => Err(error),
}
}

/// Returns the first and last position of the records currently stored in the queue. Returns `None`
/// if the queue does not exist or is empty.
pub(super) fn queue_position_range(
Expand Down
93 changes: 50 additions & 43 deletions quickwit/quickwit-ingest/src/ingest_v2/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use super::models::IngesterShard;
use super::rate_meter::RateMeter;
use super::replication::{ReplicationStreamTaskHandle, ReplicationTaskHandle};
use super::wal_capacity_tracker::WalCapacityTracker;
use crate::ingest_v2::mrecordlog_utils::{force_delete_queue, queue_position_range};
use crate::ingest_v2::mrecordlog_utils::queue_position_range;
use crate::mrecordlog_async::MultiRecordLogAsync;
use crate::{FollowerId, LeaderId, OpenShardCounts};

Expand Down Expand Up @@ -209,8 +209,7 @@ impl IngesterState {
}

/// Initializes the internal state of the ingester. It loads the local WAL, then lists all its
/// queues. Empty queues are deleted, while non-empty queues are recovered. However, the
/// corresponding shards are closed and become read-only.
/// queues. Every queue is recovered as a closed shard, including empty ones.
pub async fn init(&self, wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings) {
// Acquire locks in the same order as `lock_fully` (mrecordlog first, then inner) to
// prevent ABBA deadlocks with the broadcast capacity task.
Expand All @@ -230,7 +229,7 @@ impl IngesterState {
)
.await;

let mut mrecordlog = match open_result {
let mrecordlog = match open_result {
Ok(mrecordlog) => {
info!(
"opened WAL successfully in {}",
Expand All @@ -254,52 +253,60 @@ impl IngesterState {
}
let now = Instant::now();
let mut num_closed_shards = 0;
let mut num_deleted_shards = 0;

for queue_id in queue_ids {
if let Some(position_range) = queue_position_range(&mrecordlog, &queue_id) {
let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
// `split_queue_id` already logs an error.
continue;
};
// The queue is not empty: recover it.
let replication_position_inclusive = Position::offset(*position_range.end());
let truncation_position_inclusive = if *position_range.start() == 0 {
Position::Beginning
} else {
Position::offset(*position_range.start() - 1)
let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else {
// `split_queue_id` already logs an error.
continue;
};
// We recover every shard found in the WAL as a closed shard, including empty ones.
//
// We used to delete empty shards here, but that silently diverged from the control
// plane, which kept advertising the shard as available even though it no longer
// existed on the ingester (resulting in "no shards available" errors). Instead, we
// recover an empty shard as a closed shard positioned at the beginning. An indexer
// will drain it, immediately reach EOF (there is nothing to read), and the resulting
// EOF gossip will delete the shard from the ingester, the control plane, and the
// metastore.
let (replication_position_inclusive, truncation_position_inclusive) =
match queue_position_range(&mrecordlog, &queue_id) {
// The queue is not empty.
Some(position_range) => {
let replication_position_inclusive =
Position::offset(*position_range.end());
let truncation_position_inclusive = if *position_range.start() == 0 {
Position::Beginning
} else {
Position::offset(*position_range.start() - 1)
};
(
replication_position_inclusive,
truncation_position_inclusive,
)
}
// The queue is empty.
None => (Position::Beginning, Position::Beginning),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline: this isn't ideal. We'll explore if this is something we can get from the mrecordlog.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Now relying on the queues summary API.

};
let rate_limiter = RateLimiter::from_settings(rate_limiter_settings);
let rate_meter = RateMeter::default();
// We want to advertise the shard as read-only right away.
let solo_shard =
IngesterShard::new_solo(index_uid.clone(), source_id.clone(), shard_id.clone())
.with_state(ShardState::Closed)
.with_replication_position_inclusive(replication_position_inclusive)
.with_truncation_position_inclusive(truncation_position_inclusive)
.with_rate_limiter(rate_limiter)
.with_rate_meter(rate_meter)
.with_last_write(now)
.advertisable() // We want to advertise the shard as read-only right away.
.build();
inner_guard.shards.insert(queue_id.clone(), solo_shard);

num_closed_shards += 1;
} else {
// The queue is empty: delete it.
if let Err(io_error) = force_delete_queue(&mut mrecordlog, &queue_id).await {
error!("failed to delete shard `{queue_id}`: {io_error}");
continue;
}
num_deleted_shards += 1;
}
let rate_limiter = RateLimiter::from_settings(rate_limiter_settings);
let rate_meter = RateMeter::default();

let solo_shard =
IngesterShard::new_solo(index_uid.clone(), source_id.clone(), shard_id.clone())
.with_state(ShardState::Closed)
.with_replication_position_inclusive(replication_position_inclusive)
.with_truncation_position_inclusive(truncation_position_inclusive)
.with_rate_limiter(rate_limiter)
.with_rate_meter(rate_meter)
.with_last_write(now)
.advertisable() // We want to advertise the shard as read-only right away.
.build();
inner_guard.shards.insert(queue_id.clone(), solo_shard);

num_closed_shards += 1;
}
if num_closed_shards > 0 {
info!("recovered and closed {num_closed_shards} shard(s)");
}
if num_deleted_shards > 0 {
info!("deleted {num_deleted_shards} empty shard(s)");
}
mrecordlog_guard.replace(mrecordlog);
inner_guard.set_status(IngesterStatus::Ready).await;
}
Expand Down
Loading