Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 62 additions & 63 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,15 @@ pub(super) mod tests {
fetch_task_handle.await.unwrap();
}

#[tokio::test]
async fn test_fetch_task_signals_eof() {
/// Spawns a fetch task against a closed shard whose queue holds `num_records` records and whose
/// replication position is `replication_position_inclusive`, then asserts that fetching from
/// `from_position_exclusive` immediately signals EOF at `expected_eof_position`.
async fn check_fetch_task_signals_eof(
num_records: usize,
replication_position_inclusive: Position,
from_position_exclusive: Position,
expected_eof_position: Position,
) {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
Expand All @@ -868,26 +875,25 @@ pub(super) mod tests {
.await
.unwrap();

mrecordlog_guard
.as_mut()
.unwrap()
.append_records(
&queue_id,
None,
std::iter::once(MRecord::new_doc("test-doc-foo").encode()),
)
.await
.unwrap();
if num_records > 0 {
let records = (0..num_records).map(|_| MRecord::new_doc("test-doc-foo").encode());
mrecordlog_guard
.as_mut()
.unwrap()
.append_records(&queue_id, None, records)
.await
.unwrap();
}
drop(mrecordlog_guard);

let open_fetch_stream_request = OpenFetchStreamRequest {
client_id: client_id.clone(),
client_id,
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(shard_id.clone()),
from_position_exclusive: Some(Position::offset(0u64)),
from_position_exclusive: Some(from_position_exclusive.clone()),
};
let shard_status = (ShardState::Closed, Position::offset(0u64));
let shard_status = (ShardState::Closed, replication_position_inclusive);
let (_shard_status_tx, shard_status_rx) = watch::channel(shard_status);

let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
Expand All @@ -906,63 +912,56 @@ pub(super) mod tests {
assert_eq!(fetch_eof.index_uid(), &index_uid);
assert_eq!(fetch_eof.source_id, source_id);
assert_eq!(fetch_eof.shard_id(), shard_id);
assert_eq!(fetch_eof.eof_position, Some(Position::eof(0u64).as_eof()));
assert_eq!(
fetch_eof.eof_position,
Some(expected_eof_position),
"unexpected EOF position when fetching from `{from_position_exclusive:?}`"
);

fetch_task_handle.await.unwrap();
}

#[tokio::test]
async fn test_fetch_task_signals_eof_at_beginning() {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = Arc::new(RwLock::new(Some(
MultiRecordLogAsync::open(tempdir.path()).await.unwrap(),
)));
let client_id = "test-client".to_string();
let index_uid: IndexUid = IndexUid::for_test("test-index", 0);
let source_id = "test-source".to_string();
let shard_id = ShardId::from(1);
let queue_id = queue_id(&index_uid, &source_id, &shard_id);

let open_fetch_stream_request = OpenFetchStreamRequest {
client_id: client_id.clone(),
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
shard_id: Some(shard_id.clone()),
from_position_exclusive: Some(Position::Beginning),
};
let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default());
let (mut fetch_stream, fetch_task_handle) = FetchStreamTask::spawn(
open_fetch_stream_request,
mrecordlog.clone(),
shard_status_rx,
1024,
);
let mut mrecordlog_guard = mrecordlog.write().await;

mrecordlog_guard
.as_mut()
.unwrap()
.create_queue(&queue_id)
.await
.unwrap();
drop(mrecordlog_guard);
async fn test_fetch_task_signals_eof() {
check_fetch_task_signals_eof(
0,
Position::Beginning,
Position::Beginning,
Position::Beginning.as_eof(),
)
.await;

let shard_status = (ShardState::Closed, Position::Beginning);
shard_status_tx.send(shard_status).unwrap();
check_fetch_task_signals_eof(
0,
Position::Beginning,
Position::offset(0u64),
Position::eof(0u64),
)
.await;

let fetch_message = timeout(Duration::from_millis(100), fetch_stream.next())
.await
.unwrap()
.unwrap()
.unwrap();
let fetch_eof = into_fetch_eof(fetch_message);
check_fetch_task_signals_eof(
0,
Position::Beginning,
Position::offset(42u64),
Position::eof(42u64),
)
.await;

assert_eq!(fetch_eof.index_uid(), &index_uid);
assert_eq!(fetch_eof.source_id, source_id);
assert_eq!(fetch_eof.shard_id(), shard_id);
assert_eq!(fetch_eof.eof_position, Some(Position::Beginning.as_eof()));
check_fetch_task_signals_eof(
1,
Position::offset(0u64),
Position::offset(0u64),
Position::eof(0u64),
)
.await;

fetch_task_handle.await.unwrap();
check_fetch_task_signals_eof(
1,
Position::offset(0u64),
Position::offset(42u64),
Position::eof(42u64),
)
.await;
}

#[tokio::test]
Expand Down
48 changes: 37 additions & 11 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,13 +1024,17 @@ impl Ingester {
let queue_id = subrequest.queue_id();
let truncate_up_to_position_inclusive = subrequest.truncate_up_to_position_inclusive();

if truncate_up_to_position_inclusive.is_eof() {
state_guard.delete_shard(&queue_id, "indexer RPC").await;
} else {
state_guard
.truncate_shard(&queue_id, truncate_up_to_position_inclusive, "indexer RPC")
.await;
}
// We deliberately do NOT delete the shard when the indexer truncates up to EOF over
// this gRPC path. Shard deletion is driven solely by the `ShardPositionsUpdate` gossip
// event (see the `EventSubscriber<ShardPositionsUpdate>` impl below), which is the same
// signal the control plane uses to delete the shard from the metastore and its model.
//
// Handling shard deletion through that single, shared signal keeps the ingester and
// control plane views consistent: the ingester never removes a shard the
// control plane does not also remove.
state_guard
.truncate_shard(&queue_id, truncate_up_to_position_inclusive, "indexer RPC")
.await;
}
let wal_usage = state_guard.mrecordlog.resource_usage();
report_wal_usage(wal_usage);
Expand Down Expand Up @@ -1545,7 +1549,14 @@ mod tests {
.await;

let state_guard = ingester.state.lock_fully("test").await.unwrap();
assert_eq!(state_guard.shards.len(), 1);
assert_eq!(state_guard.shards.len(), 3);

let solo_shard_01 = state_guard.shards.get(&queue_id_01).unwrap();
solo_shard_01.assert_is_solo();
solo_shard_01.assert_is_closed();
solo_shard_01.assert_replication_position(Position::offset(0u64));
solo_shard_01.assert_truncation_position(Position::offset(0u64));
assert!(solo_shard_01.is_advertisable);

let solo_shard_02 = state_guard.shards.get(&queue_id_02).unwrap();
solo_shard_02.assert_is_solo();
Expand All @@ -1554,6 +1565,13 @@ mod tests {
solo_shard_02.assert_truncation_position(Position::offset(0u64));
assert!(solo_shard_02.is_advertisable);

let solo_shard_03 = state_guard.shards.get(&queue_id_03).unwrap();
solo_shard_03.assert_is_solo();
solo_shard_03.assert_is_closed();
solo_shard_03.assert_replication_position(Position::Beginning);
solo_shard_03.assert_truncation_position(Position::Beginning);
assert!(solo_shard_03.is_advertisable);

state_guard
.mrecordlog
.assert_records_eq(&queue_id_02, .., &[(1, [0, 0], "test-doc-bar")]);
Expand Down Expand Up @@ -3260,16 +3278,24 @@ mod tests {
.unwrap();

let state_guard = ingester.state.lock_fully("test").await.unwrap();

assert_eq!(state_guard.shards.len(), 1);
assert_eq!(state_guard.doc_mappers.len(), 1);
assert_eq!(state_guard.shards.len(), 2);
assert_eq!(state_guard.doc_mappers.len(), 2);

assert!(state_guard.shards.contains_key(&queue_id_01));
assert!(state_guard.shards.contains_key(&queue_id_02));
assert!(state_guard.doc_mappers.contains_key(&doc_mapping_uid_01));
assert!(state_guard.doc_mappers.contains_key(&doc_mapping_uid_02));

let solo_shard_02 = state_guard.shards.get(&queue_id_02).unwrap();
solo_shard_02.assert_truncation_position(Position::eof(0u64));

state_guard
.mrecordlog
.assert_records_eq(&queue_id_01, .., &[(1, [0, 0], "test-doc-bar")]);

state_guard
.mrecordlog
.assert_records_eq(&queue_id_02, .., &[]);
}

#[tokio::test]
Expand Down
63 changes: 1 addition & 62 deletions quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

use std::io;
use std::iter::once;
use std::ops::RangeInclusive;

use bytesize::ByteSize;
#[cfg(feature = "failpoints")]
use fail::fail_point;
use mrecordlog::error::{AppendError, DeleteQueueError};
use mrecordlog::error::AppendError;
use quickwit_proto::ingest::DocBatchV2;
use quickwit_proto::types::{Position, QueueId};
use tracing::instrument;
Expand Down Expand Up @@ -150,37 +149,6 @@ pub(super) fn check_enough_capacity(
Ok(())
}

/// Deletes a queue from the WAL. Returns without error if the queue does not exist.
pub async fn force_delete_queue(
mrecordlog: &mut MultiRecordLogAsync,
queue_id: &QueueId,
) -> io::Result<()> {
match mrecordlog.delete_queue(queue_id).await {
Ok(_) | Err(DeleteQueueError::MissingQueue(_)) => Ok(()),
Err(DeleteQueueError::IoError(error)) => Err(error),
}
}

/// Returns the first and last position of the records currently stored in the queue. Returns `None`
/// if the queue does not exist or is empty.
pub(super) fn queue_position_range(
mrecordlog: &MultiRecordLogAsync,
queue_id: &QueueId,
) -> Option<RangeInclusive<u64>> {
let first_position = mrecordlog
.range(queue_id, ..)
.ok()?
.next()
.map(|record| record.position)?;

let last_position = mrecordlog
.last_record(queue_id)
.ok()?
.map(|record| record.position)?;

Some(first_position..=last_position)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -267,33 +235,4 @@ mod tests {

check_enough_capacity(&mrecordlog, ByteSize::mb(256), ByteSize(12), ByteSize(12)).unwrap();
}

#[tokio::test]
async fn test_append_queue_position_range() {
let tempdir = tempfile::tempdir().unwrap();
let mut mrecordlog = MultiRecordLogAsync::open(tempdir.path()).await.unwrap();

assert!(queue_position_range(&mrecordlog, &"queue-not-found".to_string()).is_none());

mrecordlog.create_queue("test-queue").await.unwrap();
assert!(queue_position_range(&mrecordlog, &"test-queue".to_string()).is_none());

mrecordlog
.append_records("test-queue", None, std::iter::once(&b"test-doc-foo"[..]))
.await
.unwrap();
let position_range = queue_position_range(&mrecordlog, &"test-queue".to_string()).unwrap();
assert_eq!(position_range, 0..=0);

mrecordlog
.append_records("test-queue", None, std::iter::once(&b"test-doc-bar"[..]))
.await
.unwrap();
let position_range = queue_position_range(&mrecordlog, &"test-queue".to_string()).unwrap();
assert_eq!(position_range, 0..=1);

mrecordlog.truncate("test-queue", 0).await.unwrap();
let position_range = queue_position_range(&mrecordlog, &"test-queue".to_string()).unwrap();
assert_eq!(position_range, 1..=1);
}
}
Loading
Loading