From 5e6c7026730cb6295e48c2e258a9a2c5d980c47f Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Tue, 9 Jun 2026 13:45:11 -0400 Subject: [PATCH 1/2] Delete shards only when notified via gossip --- .../quickwit-ingest/src/ingest_v2/fetch.rs | 125 +++++++++--------- .../quickwit-ingest/src/ingest_v2/ingester.rs | 48 +++++-- .../src/ingest_v2/mrecordlog_utils.rs | 13 +- .../quickwit-ingest/src/ingest_v2/state.rs | 93 +++++++------ 4 files changed, 150 insertions(+), 129 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 20d78b646b2..35f71b240bc 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -847,8 +847,15 @@ pub(super) mod tests { fetch_task_handle.await.unwrap(); } - #[tokio::test] - async fn test_fetch_task_signals_eof() { + /// Spawns a fetch task against a closed shard whose queue holds `num_records` records and whose + /// replication position is `replication_position_inclusive`, then asserts that fetching from + /// `from_position_exclusive` immediately signals EOF at `expected_eof_position`. + async fn check_fetch_task_signals_eof( + num_records: usize, + replication_position_inclusive: Position, + from_position_exclusive: Position, + expected_eof_position: Position, + ) { let tempdir = tempfile::tempdir().unwrap(); let mrecordlog = Arc::new(RwLock::new(Some( MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), @@ -868,26 +875,25 @@ pub(super) mod tests { .await .unwrap(); - mrecordlog_guard - .as_mut() - .unwrap() - .append_records( - &queue_id, - None, - std::iter::once(MRecord::new_doc("test-doc-foo").encode()), - ) - .await - .unwrap(); + if num_records > 0 { + let records = (0..num_records).map(|_| MRecord::new_doc("test-doc-foo").encode()); + mrecordlog_guard + .as_mut() + .unwrap() + .append_records(&queue_id, None, records) + .await + .unwrap(); + } drop(mrecordlog_guard); let open_fetch_stream_request = OpenFetchStreamRequest { - client_id: client_id.clone(), + client_id, index_uid: Some(index_uid.clone()), source_id: source_id.clone(), shard_id: Some(shard_id.clone()), - from_position_exclusive: Some(Position::offset(0u64)), + from_position_exclusive: Some(from_position_exclusive.clone()), }; - let shard_status = (ShardState::Closed, Position::offset(0u64)); + let shard_status = (ShardState::Closed, replication_position_inclusive); let (_shard_status_tx, shard_status_rx) = watch::channel(shard_status); let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn( @@ -906,63 +912,56 @@ pub(super) mod tests { assert_eq!(fetch_eof.index_uid(), &index_uid); assert_eq!(fetch_eof.source_id, source_id); assert_eq!(fetch_eof.shard_id(), shard_id); - assert_eq!(fetch_eof.eof_position, Some(Position::eof(0u64).as_eof())); + assert_eq!( + fetch_eof.eof_position, + Some(expected_eof_position), + "unexpected EOF position when fetching from `{from_position_exclusive:?}`" + ); fetch_task_handle.await.unwrap(); } #[tokio::test] - async fn test_fetch_task_signals_eof_at_beginning() { - let tempdir = tempfile::tempdir().unwrap(); - let mrecordlog = Arc::new(RwLock::new(Some( - MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), - ))); - let client_id = "test-client".to_string(); - let index_uid: IndexUid = IndexUid::for_test("test-index", 0); - let source_id = "test-source".to_string(); - let shard_id = ShardId::from(1); - let queue_id = queue_id(&index_uid, &source_id, &shard_id); - - let open_fetch_stream_request = OpenFetchStreamRequest { - client_id: client_id.clone(), - index_uid: Some(index_uid.clone()), - source_id: source_id.clone(), - shard_id: Some(shard_id.clone()), - from_position_exclusive: Some(Position::Beginning), - }; - let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default()); - let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn( - open_fetch_stream_request, - mrecordlog.clone(), - shard_status_rx, - 1024, - ); - let mut mrecordlog_guard = mrecordlog.write().await; - - mrecordlog_guard - .as_mut() - .unwrap() - .create_queue(&queue_id) - .await - .unwrap(); - drop(mrecordlog_guard); + async fn test_fetch_task_signals_eof() { + check_fetch_task_signals_eof( + 0, + Position::Beginning, + Position::Beginning, + Position::Beginning.as_eof(), + ) + .await; - let shard_status = (ShardState::Closed, Position::Beginning); - shard_status_tx.send(shard_status).unwrap(); + check_fetch_task_signals_eof( + 0, + Position::Beginning, + Position::offset(0u64), + Position::eof(0u64), + ) + .await; - let fetch_message = timeout(Duration::from_millis(100), fetch_stream.next()) - .await - .unwrap() - .unwrap() - .unwrap(); - let fetch_eof = into_fetch_eof(fetch_message); + check_fetch_task_signals_eof( + 0, + Position::Beginning, + Position::offset(42u64), + Position::eof(42u64), + ) + .await; - assert_eq!(fetch_eof.index_uid(), &index_uid); - assert_eq!(fetch_eof.source_id, source_id); - assert_eq!(fetch_eof.shard_id(), shard_id); - assert_eq!(fetch_eof.eof_position, Some(Position::Beginning.as_eof())); + check_fetch_task_signals_eof( + 1, + Position::offset(0u64), + Position::offset(0u64), + Position::eof(0u64), + ) + .await; - fetch_task_handle.await.unwrap(); + check_fetch_task_signals_eof( + 1, + Position::offset(0u64), + Position::offset(42u64), + Position::eof(42u64), + ) + .await; } #[tokio::test] diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index b8ba3b36c1e..b6c9568cb5c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -1024,13 +1024,17 @@ impl Ingester { let queue_id = subrequest.queue_id(); let truncate_up_to_position_inclusive = subrequest.truncate_up_to_position_inclusive(); - if truncate_up_to_position_inclusive.is_eof() { - state_guard.delete_shard(&queue_id, "indexer RPC").await; - } else { - state_guard - .truncate_shard(&queue_id, truncate_up_to_position_inclusive, "indexer RPC") - .await; - } + // We deliberately do NOT delete the shard when the indexer truncates up to EOF over + // this gRPC path. Shard deletion is driven solely by the `ShardPositionsUpdate` gossip + // event (see the `EventSubscriber` impl below), which is the same + // signal the control plane uses to delete the shard from the metastore and its model. + // + // Handling shard deletion through that single, shared signal keeps the ingester and + // control plane views consistent: the ingester never removes a shard the + // control plane does not also remove. + state_guard + .truncate_shard(&queue_id, truncate_up_to_position_inclusive, "indexer RPC") + .await; } let wal_usage = state_guard.mrecordlog.resource_usage(); report_wal_usage(wal_usage); @@ -1545,7 +1549,14 @@ mod tests { .await; let state_guard = ingester.state.lock_fully("test").await.unwrap(); - assert_eq!(state_guard.shards.len(), 1); + assert_eq!(state_guard.shards.len(), 3); + + let solo_shard_01 = state_guard.shards.get(&queue_id_01).unwrap(); + solo_shard_01.assert_is_solo(); + solo_shard_01.assert_is_closed(); + solo_shard_01.assert_replication_position(Position::Beginning); + solo_shard_01.assert_truncation_position(Position::Beginning); + assert!(solo_shard_01.is_advertisable); let solo_shard_02 = state_guard.shards.get(&queue_id_02).unwrap(); solo_shard_02.assert_is_solo(); @@ -1554,6 +1565,13 @@ mod tests { solo_shard_02.assert_truncation_position(Position::offset(0u64)); assert!(solo_shard_02.is_advertisable); + let solo_shard_03 = state_guard.shards.get(&queue_id_03).unwrap(); + solo_shard_03.assert_is_solo(); + solo_shard_03.assert_is_closed(); + solo_shard_03.assert_replication_position(Position::Beginning); + solo_shard_03.assert_truncation_position(Position::Beginning); + assert!(solo_shard_03.is_advertisable); + state_guard .mrecordlog .assert_records_eq(&queue_id_02, .., &[(1, [0, 0], "test-doc-bar")]); @@ -3260,16 +3278,24 @@ mod tests { .unwrap(); let state_guard = ingester.state.lock_fully("test").await.unwrap(); - - assert_eq!(state_guard.shards.len(), 1); - assert_eq!(state_guard.doc_mappers.len(), 1); + assert_eq!(state_guard.shards.len(), 2); + assert_eq!(state_guard.doc_mappers.len(), 2); assert!(state_guard.shards.contains_key(&queue_id_01)); + assert!(state_guard.shards.contains_key(&queue_id_02)); assert!(state_guard.doc_mappers.contains_key(&doc_mapping_uid_01)); + assert!(state_guard.doc_mappers.contains_key(&doc_mapping_uid_02)); + + let solo_shard_02 = state_guard.shards.get(&queue_id_02).unwrap(); + solo_shard_02.assert_truncation_position(Position::eof(0u64)); state_guard .mrecordlog .assert_records_eq(&queue_id_01, .., &[(1, [0, 0], "test-doc-bar")]); + + state_guard + .mrecordlog + .assert_records_eq(&queue_id_02, .., &[]); } #[tokio::test] diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs index 8ce2dac0312..012b0bc9c4d 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs @@ -19,7 +19,7 @@ use std::ops::RangeInclusive; use bytesize::ByteSize; #[cfg(feature = "failpoints")] use fail::fail_point; -use mrecordlog::error::{AppendError, DeleteQueueError}; +use mrecordlog::error::AppendError; use quickwit_proto::ingest::DocBatchV2; use quickwit_proto::types::{Position, QueueId}; use tracing::instrument; @@ -150,17 +150,6 @@ pub(super) fn check_enough_capacity( Ok(()) } -/// Deletes a queue from the WAL. Returns without error if the queue does not exist. -pub async fn force_delete_queue( - mrecordlog: &mut MultiRecordLogAsync, - queue_id: &QueueId, -) -> io::Result<()> { - match mrecordlog.delete_queue(queue_id).await { - Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => Ok(()), - Err(DeleteQueueError::IoError(error)) => Err(error), - } -} - /// Returns the first and last position of the records currently stored in the queue. Returns `None` /// if the queue does not exist or is empty. pub(super) fn queue_position_range( diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index eea03a29e0b..0e19ec0916b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -39,7 +39,7 @@ use super::models::IngesterShard; use super::rate_meter::RateMeter; use super::replication::{ReplicationStreamTaskHandle, ReplicationTaskHandle}; use super::wal_capacity_tracker::WalCapacityTracker; -use crate::ingest_v2::mrecordlog_utils::{force_delete_queue, queue_position_range}; +use crate::ingest_v2::mrecordlog_utils::queue_position_range; use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{FollowerId, LeaderId, OpenShardCounts}; @@ -209,8 +209,7 @@ impl IngesterState { } /// Initializes the internal state of the ingester. It loads the local WAL, then lists all its - /// queues. Empty queues are deleted, while non-empty queues are recovered. However, the - /// corresponding shards are closed and become read-only. + /// queues. Every queue is recovered as a closed shard, including empty ones. pub async fn init(&self, wal_dir_path: &Path, rate_limiter_settings: RateLimiterSettings) { // Acquire locks in the same order as `lock_fully` (mrecordlog first, then inner) to // prevent ABBA deadlocks with the broadcast capacity task. @@ -230,7 +229,7 @@ impl IngesterState { ) .await; - let mut mrecordlog = match open_result { + let mrecordlog = match open_result { Ok(mrecordlog) => { info!( "opened WAL successfully in {}", @@ -254,52 +253,60 @@ impl IngesterState { } let now = Instant::now(); let mut num_closed_shards = 0; - let mut num_deleted_shards = 0; for queue_id in queue_ids { - if let Some(position_range) = queue_position_range(&mrecordlog, &queue_id) { - let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else { - // `split_queue_id` already logs an error. - continue; - }; - // The queue is not empty: recover it. - let replication_position_inclusive = Position::offset(*position_range.end()); - let truncation_position_inclusive = if *position_range.start() == 0 { - Position::Beginning - } else { - Position::offset(*position_range.start() - 1) + let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else { + // `split_queue_id` already logs an error. + continue; + }; + // We recover every shard found in the WAL as a closed shard, including empty ones. + // + // We used to delete empty shards here, but that silently diverged from the control + // plane, which kept advertising the shard as available even though it no longer + // existed on the ingester (resulting in "no shards available" errors). Instead, we + // recover an empty shard as a closed shard positioned at the beginning. An indexer + // will drain it, immediately reach EOF (there is nothing to read), and the resulting + // EOF gossip will delete the shard from the ingester, the control plane, and the + // metastore. + let (replication_position_inclusive, truncation_position_inclusive) = + match queue_position_range(&mrecordlog, &queue_id) { + // The queue is not empty. + Some(position_range) => { + let replication_position_inclusive = + Position::offset(*position_range.end()); + let truncation_position_inclusive = if *position_range.start() == 0 { + Position::Beginning + } else { + Position::offset(*position_range.start() - 1) + }; + ( + replication_position_inclusive, + truncation_position_inclusive, + ) + } + // The queue is empty. + None => (Position::Beginning, Position::Beginning), }; - let rate_limiter = RateLimiter::from_settings(rate_limiter_settings); - let rate_meter = RateMeter::default(); - // We want to advertise the shard as read-only right away. - let solo_shard = - IngesterShard::new_solo(index_uid.clone(), source_id.clone(), shard_id.clone()) - .with_state(ShardState::Closed) - .with_replication_position_inclusive(replication_position_inclusive) - .with_truncation_position_inclusive(truncation_position_inclusive) - .with_rate_limiter(rate_limiter) - .with_rate_meter(rate_meter) - .with_last_write(now) - .advertisable() // We want to advertise the shard as read-only right away. - .build(); - inner_guard.shards.insert(queue_id.clone(), solo_shard); - - num_closed_shards += 1; - } else { - // The queue is empty: delete it. - if let Err(io_error) = force_delete_queue(&mut mrecordlog, &queue_id).await { - error!("failed to delete shard `{queue_id}`: {io_error}"); - continue; - } - num_deleted_shards += 1; - } + let rate_limiter = RateLimiter::from_settings(rate_limiter_settings); + let rate_meter = RateMeter::default(); + + let solo_shard = + IngesterShard::new_solo(index_uid.clone(), source_id.clone(), shard_id.clone()) + .with_state(ShardState::Closed) + .with_replication_position_inclusive(replication_position_inclusive) + .with_truncation_position_inclusive(truncation_position_inclusive) + .with_rate_limiter(rate_limiter) + .with_rate_meter(rate_meter) + .with_last_write(now) + .advertisable() // We want to advertise the shard as read-only right away. + .build(); + inner_guard.shards.insert(queue_id.clone(), solo_shard); + + num_closed_shards += 1; } if num_closed_shards > 0 { info!("recovered and closed {num_closed_shards} shard(s)"); } - if num_deleted_shards > 0 { - info!("deleted {num_deleted_shards} empty shard(s)"); - } mrecordlog_guard.replace(mrecordlog); inner_guard.set_status(IngesterStatus::Ready).await; } From 3ccfff9f5082077da16f08bf1d7537b0083b2b55 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Thu, 11 Jun 2026 18:35:06 -0400 Subject: [PATCH 2/2] Use queues summary to recover shards --- .../src/ingest_v2/mrecordlog_utils.rs | 50 ------- .../quickwit-ingest/src/ingest_v2/state.rs | 131 +++++++++++++----- .../quickwit-ingest/src/mrecordlog_async.rs | 4 - 3 files changed, 98 insertions(+), 87 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs index 012b0bc9c4d..c4852e99379 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs @@ -14,7 +14,6 @@ use std::io; use std::iter::once; -use std::ops::RangeInclusive; use bytesize::ByteSize; #[cfg(feature = "failpoints")] @@ -150,26 +149,6 @@ pub(super) fn check_enough_capacity( Ok(()) } -/// Returns the first and last position of the records currently stored in the queue. Returns `None` -/// if the queue does not exist or is empty. -pub(super) fn queue_position_range( - mrecordlog: &MultiRecordLogAsync, - queue_id: &QueueId, -) -> Option> { - let first_position = mrecordlog - .range(queue_id, ..) - .ok()? - .next() - .map(|record| record.position)?; - - let last_position = mrecordlog - .last_record(queue_id) - .ok()? - .map(|record| record.position)?; - - Some(first_position..=last_position) -} - #[cfg(test)] mod tests { use super::*; @@ -256,33 +235,4 @@ mod tests { check_enough_capacity(&mrecordlog, ByteSize::mb(256), ByteSize(12), ByteSize(12)).unwrap(); } - - #[tokio::test] - async fn test_append_queue_position_range() { - let tempdir = tempfile::tempdir().unwrap(); - let mut mrecordlog = MultiRecordLogAsync::open(tempdir.path()).await.unwrap(); - - assert!(queue_position_range(&mrecordlog, &"queue-not-found".to_string()).is_none()); - - mrecordlog.create_queue("test-queue").await.unwrap(); - assert!(queue_position_range(&mrecordlog, &"test-queue".to_string()).is_none()); - - mrecordlog - .append_records("test-queue", None, std::iter::once(&b"test-doc-foo"[..])) - .await - .unwrap(); - let position_range = queue_position_range(&mrecordlog, &"test-queue".to_string()).unwrap(); - assert_eq!(position_range, 0..=0); - - mrecordlog - .append_records("test-queue", None, std::iter::once(&b"test-doc-bar"[..])) - .await - .unwrap(); - let position_range = queue_position_range(&mrecordlog, &"test-queue".to_string()).unwrap(); - assert_eq!(position_range, 0..=1); - - mrecordlog.truncate("test-queue", 0).await.unwrap(); - let position_range = queue_position_range(&mrecordlog, &"test-queue".to_string()).unwrap(); - assert_eq!(position_range, 1..=1); - } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/state.rs b/quickwit/quickwit-ingest/src/ingest_v2/state.rs index 0e19ec0916b..0dd5e4183af 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/state.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/state.rs @@ -39,7 +39,6 @@ use super::models::IngesterShard; use super::rate_meter::RateMeter; use super::replication::{ReplicationStreamTaskHandle, ReplicationTaskHandle}; use super::wal_capacity_tracker::WalCapacityTracker; -use crate::ingest_v2::mrecordlog_utils::queue_position_range; use crate::mrecordlog_async::MultiRecordLogAsync; use crate::{FollowerId, LeaderId, OpenShardCounts}; @@ -243,18 +242,15 @@ impl IngesterState { return; } }; - let queue_ids: Vec = mrecordlog - .list_queues() - .map(|queue_id| queue_id.to_string()) - .collect(); + let queues_summary = mrecordlog.summary(); - if !queue_ids.is_empty() { - info!("recovering {} shard(s)", queue_ids.len()); + if !queues_summary.queues.is_empty() { + info!("recovering {} shard(s)", queues_summary.queues.len()); } let now = Instant::now(); let mut num_closed_shards = 0; - for queue_id in queue_ids { + for (queue_id, queue_summary) in queues_summary.queues { let Some((index_uid, source_id, shard_id)) = split_queue_id(&queue_id) else { // `split_queue_id` already logs an error. continue; @@ -269,23 +265,20 @@ impl IngesterState { // EOF gossip will delete the shard from the ingester, the control plane, and the // metastore. let (replication_position_inclusive, truncation_position_inclusive) = - match queue_position_range(&mrecordlog, &queue_id) { - // The queue is not empty. - Some(position_range) => { - let replication_position_inclusive = - Position::offset(*position_range.end()); - let truncation_position_inclusive = if *position_range.start() == 0 { - Position::Beginning - } else { - Position::offset(*position_range.start() - 1) - }; - ( - replication_position_inclusive, - truncation_position_inclusive, - ) - } - // The queue is empty. - None => (Position::Beginning, Position::Beginning), + if let Some(end) = queue_summary.end { + let replication_position_inclusive = Position::offset(end); + let truncation_position_inclusive = if queue_summary.start == 0 { + Position::Beginning + } else { + Position::offset(queue_summary.start - 1) + }; + ( + replication_position_inclusive, + truncation_position_inclusive, + ) + } else { + // The queue was created but never written to. + (Position::Beginning, Position::Beginning) }; let rate_limiter = RateLimiter::from_settings(rate_limiter_settings); let rate_meter = RateMeter::default(); @@ -658,7 +651,7 @@ mod tests { use bytesize::ByteSize; use quickwit_cluster::{ChannelTransport, create_cluster_for_test}; use quickwit_config::service::QuickwitService; - use quickwit_proto::types::{NodeId, ShardId, SourceId}; + use quickwit_proto::types::{NodeId, ShardId, SourceId, queue_id}; use tokio::time::timeout; use super::*; @@ -711,23 +704,95 @@ mod tests { #[tokio::test] async fn test_ingester_state_init() { - let cluster = test_cluster().await; - let mut state = IngesterState::create(cluster, ByteSize::mb(256), ByteSize::mb(256)).await; + let index_uid = IndexUid::for_test("test-index", 0); + let source_id = SourceId::from("test-source"); + + // Queue with live records, partially truncated. + let queue_id_01 = queue_id(&index_uid, &source_id, &ShardId::from(1)); + // Queue written to and then fully truncated: empty, but it remembers its position. + let queue_id_02 = queue_id(&index_uid, &source_id, &ShardId::from(2)); + // Queue created but never written to. + let queue_id_03 = queue_id(&index_uid, &source_id, &ShardId::from(3)); + let temp_dir = tempfile::tempdir().unwrap(); + // Populate a WAL then close it, so `init` reopens it from disk. + { + let mut mrecordlog = MultiRecordLogAsync::open(temp_dir.path()).await.unwrap(); + + mrecordlog.create_queue(&queue_id_01).await.unwrap(); + mrecordlog + .append_records( + &queue_id_01, + None, + [ + &b"test-doc-foo"[..], + &b"test-doc-bar"[..], + &b"test-doc-qux"[..], + ] + .into_iter(), + ) + .await + .unwrap(); + // Records 0..=2 remain; truncate record 0 so `start` advances to 1. + mrecordlog.truncate(&queue_id_01, 0).await.unwrap(); + + mrecordlog.create_queue(&queue_id_02).await.unwrap(); + mrecordlog + .append_records( + &queue_id_02, + None, + [&b"test-doc-foo"[..], &b"test-doc-bar"[..]].into_iter(), + ) + .await + .unwrap(); + // Truncate everything: the queue is now empty but remembers position 1. + mrecordlog.truncate(&queue_id_02, 1).await.unwrap(); + + mrecordlog.create_queue(&queue_id_03).await.unwrap(); + } + let cluster = test_cluster().await; + let mut state = IngesterState::create(cluster, ByteSize::mb(256), ByteSize::mb(256)).await; state .init(temp_dir.path(), RateLimiterSettings::default()) .await; - timeout(Duration::from_millis(100), state.wait_for_ready()) .await .unwrap(); - state.lock_partially("test").await.unwrap(); + let state_guard = state.lock_fully("test").await.unwrap(); + assert_eq!(state_guard.status(), IngesterStatus::Ready); + assert_eq!(*state_guard.status_tx.borrow(), IngesterStatus::Ready); + + // Non-empty queue: recovers at its last position, truncated up to the first kept record. + let shard_01 = state_guard.shards.get(&queue_id_01).unwrap(); + assert_eq!(shard_01.shard_state, ShardState::Closed); + assert_eq!( + shard_01.replication_position_inclusive, + Position::offset(2u64) + ); + assert_eq!( + shard_01.truncation_position_inclusive, + Position::offset(0u64) + ); + + // Fully truncated queue: recovers at its last position rather than the beginning. + let shard_02 = state_guard.shards.get(&queue_id_02).unwrap(); + assert_eq!(shard_02.shard_state, ShardState::Closed); + assert_eq!( + shard_02.replication_position_inclusive, + Position::offset(1u64) + ); + assert_eq!( + shard_02.truncation_position_inclusive, + Position::offset(1u64) + ); - let locked_state = state.lock_fully("test").await.unwrap(); - assert_eq!(locked_state.status(), IngesterStatus::Ready); - assert_eq!(*locked_state.status_tx.borrow(), IngesterStatus::Ready); + // Never-written queue: recovers at the beginning. + let shard_03 = state_guard.shards.get(&queue_id_03).unwrap(); + assert_eq!(shard_03.shard_state, ShardState::Closed); + assert_eq!(shard_03.replication_position_inclusive, Position::Beginning); + assert_eq!(shard_03.truncation_position_inclusive, Position::Beginning); } fn insert_shard_with_used_capacity( diff --git a/quickwit/quickwit-ingest/src/mrecordlog_async.rs b/quickwit/quickwit-ingest/src/mrecordlog_async.rs index 25f48128127..4fc0f8aea04 100644 --- a/quickwit/quickwit-ingest/src/mrecordlog_async.rs +++ b/quickwit/quickwit-ingest/src/mrecordlog_async.rs @@ -184,10 +184,6 @@ impl MultiRecordLogAsync { self.mrecordlog_ref().list_queues() } - pub fn last_record(&self, queue: &str) -> Result>, MissingQueue> { - self.mrecordlog_ref().last_record(queue) - } - pub fn resource_usage(&self) -> ResourceUsage { self.mrecordlog_ref().resource_usage() }