diff --git a/external_segment_test.go b/external_segment_test.go
index f7feb2b..6a0597d 100644
--- a/external_segment_test.go
+++ b/external_segment_test.go
@@ -399,6 +399,10 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
 			t.Fatalf("Expected 2 initial documents, got %d", initialCount)
 		}
 
+		// Create a fresh copy of segment data for this test
+		segmentBytesCopy := make([]byte, len(segmentBytes))
+		copy(segmentBytesCopy, segmentBytes)
+
 		// Now stream the duplicate segment
 		receiver3, err := writer3.EnableExternalSegments()
 		if err != nil {
@@ -410,7 +414,7 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		err = receiver3.WriteChunk(segmentBytes)
+		err = receiver3.WriteChunk(segmentBytesCopy)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -445,6 +449,30 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
 		if receiver3.Status() != index.StreamIntroduced {
 			t.Errorf("Expected segment to be introduced when deduplication is disabled, got status: %v", receiver3.Status())
 		}
+
+		// Final verification: query for the specific docIDs to ensure they exist
+		docIDs := []string{"duplicate_doc_1", "duplicate_doc_2"}
+		for _, docID := range docIDs {
+			idQuery := NewTermQuery(docID)
+			idQuery.SetField("_id")
+
+			req := NewTopNSearch(1, idQuery)
+			dmi, err := reader3Final.Search(context.Background(), req)
+			if err != nil {
+				t.Fatalf("Error searching for docID %s: %v", docID, err)
+			}
+
+			next, err := dmi.Next()
+			if err != nil {
+				t.Fatalf("Error getting next result for docID %s: %v", docID, err)
+			}
+
+			if next == nil {
+				t.Errorf("Expected to find document with ID %s, but search returned no results", docID)
+			} else {
+				t.Logf("Successfully verified document with ID %s exists", docID)
+			}
+		}
 	})
 
 	// Step 3b: Test with deduplication ENABLED (should reject duplicates)
@@ -495,6 +523,10 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
 			t.Fatalf("Expected 2 initial documents, got %d", initialCount)
 		}
 
+		// Create a fresh copy of segment data for this test
+		segmentBytesCopy := make([]byte, len(segmentBytes))
+		copy(segmentBytesCopy, segmentBytes)
+
 		// Now stream the duplicate segment
 		receiver4, err := writer4.EnableExternalSegments()
 		if err != nil {
@@ -506,7 +538,7 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		err = receiver4.WriteChunk(segmentBytes)
+		err = receiver4.WriteChunk(segmentBytesCopy)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -551,6 +583,303 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
 		}
 
 		t.Logf("Deduplication test passed - document count unchanged indicating duplicates were filtered out")
+
+		// Final verification: query for the specific docIDs to ensure they exist
+		// When deduplication is enabled, we should still find the original documents
+		docIDs := []string{"duplicate_doc_1", "duplicate_doc_2"}
+		for _, docID := range docIDs {
+			idQuery := NewTermQuery(docID)
+			idQuery.SetField("_id")
+
+			req := NewTopNSearch(1, idQuery)
+			dmi, err := reader4Final.Search(context.Background(), req)
+			if err != nil {
+				t.Fatalf("Error searching for docID %s: %v", docID, err)
+			}
+
+			next, err := dmi.Next()
+			if err != nil {
+				t.Fatalf("Error getting next result for docID %s: %v", docID, err)
+			}
+
+			if next == nil {
+				t.Errorf("Expected to find document with ID %s, but search returned no results", docID)
+			} else {
+				t.Logf("Successfully verified document with ID %s exists", docID)
+			}
+		}
+	})
+
+	// Step 3c: Test with deduplication ENABLED but NO duplicates (should accept unique documents)
+	t.Run("DeduplicationEnabledNoDuplicates", func(t *testing.T) {
+		tempDir5 := filepath.Join(os.TempDir(), "bluge_test_dup_dest_enabled_no_dups")
+		tempSegDir5 := filepath.Join(tempDir5, "temp_segments")
+		os.RemoveAll(tempDir5)
+		defer os.RemoveAll(tempDir5)
+
+		config5 := DefaultConfig(tempDir5).WithExternalSegments(tempSegDir5, true) // Deduplication enabled
+		writer5, err := OpenWriter(config5)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer writer5.Close()
+
+		// Add existing documents with DIFFERENT IDs (no duplicates)
+		existingDoc1 := NewDocument("existing_doc_1").
+			AddField(NewKeywordField("type", "existing").StoreValue()).
+			AddField(NewTextField("content", "existing content 1"))
+
+		existingDoc2 := NewDocument("existing_doc_2").
+			AddField(NewKeywordField("type", "existing").StoreValue()).
+			AddField(NewTextField("content", "existing content 2"))
+
+		// Insert documents using batch
+		batch := index.NewBatch()
+		batch.Insert(existingDoc1)
+		batch.Insert(existingDoc2)
+		err = writer5.Batch(batch)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Verify we have 2 documents initially
+		reader5, err := writer5.Reader()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		initialCount, err := reader5.Count()
+		if err != nil {
+			t.Fatal(err)
+		}
+		reader5.Close()
+
+		if initialCount != 2 {
+			t.Fatalf("Expected 2 initial documents, got %d", initialCount)
+		}
+
+		// Create a fresh segment for this test case to avoid data corruption
+		tempDirSource := filepath.Join(os.TempDir(), "bluge_test_fresh_source")
+		tempSegDirSource := filepath.Join(tempDirSource, "temp_segments")
+		os.RemoveAll(tempDirSource)
+		defer os.RemoveAll(tempDirSource)
+
+		configSource := DefaultConfig(tempDirSource).WithExternalSegments(tempSegDirSource, true)
+		writerSource, err := OpenWriter(configSource)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Create fresh documents with UNIQUE IDs (different from existing and source)
+		freshDoc1 := NewDocument("unique_doc_1").
+			AddField(NewKeywordField("type", "fresh").StoreValue()).
+			AddField(NewTextField("content", "fresh content 1"))
+
+		freshDoc2 := NewDocument("unique_doc_2").
+			AddField(NewKeywordField("type", "fresh").StoreValue()).
+			AddField(NewTextField("content", "fresh content 2"))
+
+		// Insert documents using batch
+		freshBatch := index.NewBatch()
+		freshBatch.Insert(freshDoc1)
+		freshBatch.Insert(freshDoc2)
+		err = writerSource.Batch(freshBatch)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Close writerSource to persist the segment
+		err = writerSource.Close()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// Extract segment data from the fresh writer
+		dirSource := configSource.indexConfig.DirectoryFunc()
+		err = dirSource.Setup(true) // read-only
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		freshSegmentFiles, err := dirSource.List(index.ItemKindSegment)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if len(freshSegmentFiles) == 0 {
+			t.Fatal("No segment files found in fresh source directory")
+		}
+
+		// Use the most recent segment file
+		freshSegmentID := freshSegmentFiles[len(freshSegmentFiles)-1]
+		t.Logf("Found fresh segment file with ID: %d", freshSegmentID)
+
+		freshSegmentData, freshCloser, err := dirSource.Load(index.ItemKindSegment, freshSegmentID)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer func() {
+			if freshCloser != nil {
+				freshCloser.Close()
+			}
+		}()
+
+		freshSegmentBytes, err := freshSegmentData.Read(0, freshSegmentData.Len())
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if len(freshSegmentBytes) == 0 {
+			t.Fatal("Fresh segment data is empty")
+		}
+
+		t.Logf("Read fresh segment data of %d bytes", len(freshSegmentBytes))
+
+		// Now stream the fresh segment with unique documents (different docIDs)
+		receiver5, err := writer5.EnableExternalSegments()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		err = receiver5.StartSegment()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		err = receiver5.WriteChunk(freshSegmentBytes)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		err = receiver5.CompleteSegment()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if receiver5.Status() != index.StreamIntroduced {
+			t.Fatal("Segment not introduced:", receiver5.Status())
+		}
+
+		// Verify the segment was accepted (document count should increase)
+		reader5Final, err := writer5.Reader()
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer reader5Final.Close()
+
+		finalCount, err := reader5Final.Count()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		t.Logf("Document count after transfer: %d (was %d)", finalCount, initialCount)
+
+		// Document count should increase since all documents are unique
+		expectedCount := initialCount + 2 // 2 unique documents from the segment
+		if finalCount != expectedCount {
+			t.Errorf("Expected document count to be %d when deduplication is enabled but no duplicates exist, got %d (was %d)", expectedCount, finalCount, initialCount)
+		}
+
+		if receiver5.Status() != index.StreamIntroduced {
+			t.Errorf("Expected segment to be introduced when deduplication is enabled but no duplicates exist, got status: %v", receiver5.Status())
+		}
+
+		// Verify specific document IDs are present
+		expectedDocIDs := []string{"existing_doc_1", "existing_doc_2", "unique_doc_1", "unique_doc_2"}
+		foundDocIDs := make(map[string]bool)
+
+		// Search for each expected document ID
+		for _, docID := range expectedDocIDs {
+			query := NewTermQuery(docID).SetField("_id")
+			searchRequest := NewTopNSearch(10, query)
+			dmi, err := reader5Final.Search(context.Background(), searchRequest)
+			if err != nil {
+				t.Fatalf("Error searching for document %s: %v", docID, err)
+			}
+
+			next, err := dmi.Next()
+			if err != nil {
+				t.Fatalf("Error getting next result for document %s: %v", docID, err)
+			}
+
+			if next != nil {
+				foundDocIDs[docID] = true
+				t.Logf("Found document with ID: %s", docID)
+			} else {
+				t.Errorf("Expected document with ID %s to be present, but it was not found", docID)
+			}
+		}
+
+		// Verify all expected documents were found
+		if len(foundDocIDs) != len(expectedDocIDs) {
+			t.Errorf("Expected to find %d documents, but only found %d", len(expectedDocIDs), len(foundDocIDs))
+		}
+
+		// Verify document content for the fresh documents
+		query1 := NewTermQuery("unique_doc_1").SetField("_id")
+		searchRequest1 := NewTopNSearch(1, query1)
+		dmi1, err := reader5Final.Search(context.Background(), searchRequest1)
+		if err != nil {
+			t.Fatalf("Error searching for unique_doc_1: %v", err)
+		}
+
+		next1, err := dmi1.Next()
+		if err != nil {
+			t.Fatalf("Error getting next result for unique_doc_1: %v", err)
+		}
+
+		if next1 != nil {
+			// Check content field
+			err = next1.VisitStoredFields(func(field string, value []byte) bool {
+				if field == "content" && string(value) != "fresh content 1" {
+					t.Errorf("Expected content 'fresh content 1' for unique_doc_1, got '%s'", string(value))
+				}
+				if field == "type" && string(value) != "fresh" {
+					t.Errorf("Expected type 'fresh' for unique_doc_1, got '%s'", string(value))
+				}
+				return true
+			})
+			if err != nil {
+				t.Errorf("Error accessing stored fields for unique_doc_1: %v", err)
+			}
+		} else {
+			t.Error("Expected unique_doc_1 to be found")
+		}
+
+		query2 := NewTermQuery("unique_doc_2").SetField("_id")
+		searchRequest2 := NewTopNSearch(1, query2)
+		dmi2, err := reader5Final.Search(context.Background(), searchRequest2)
+		if err != nil {
+			t.Fatalf("Error searching for unique_doc_2: %v", err)
+		}
+
+		next2, err := dmi2.Next()
+		if err != nil {
+			t.Fatalf("Error getting next result for unique_doc_2: %v", err)
+		}
+
+		if next2 != nil {
+			// Check content field
+			err = next2.VisitStoredFields(func(field string, value []byte) bool {
+				if field == "content" && string(value) != "fresh content 2" {
+					t.Errorf("Expected content 'fresh content 2' for unique_doc_2, got '%s'", string(value))
+				}
+				if field == "type" && string(value) != "fresh" {
+					t.Errorf("Expected type 'fresh' for unique_doc_2, got '%s'", string(value))
+				}
+				return true
+			})
+			if err != nil {
+				t.Errorf("Error accessing stored fields for unique_doc_2: %v", err)
+			}
+		} else {
+			t.Error("Expected unique_doc_2 to be found")
+		}
+
+		t.Logf("Test completed successfully - document count and docid verification passed")
+		t.Logf("Expected %d documents, found %d documents", expectedCount, finalCount)
+		t.Logf("All expected document IDs verified: %v", expectedDocIDs)
+		t.Logf("Deduplication enabled with no duplicates - all unique documents were accepted")
 	})
 
 	t.Log("Duplicate document handling test completed successfully")
diff --git a/index/external_segment.go b/index/external_segment.go
index e509078..854929e 100644
--- a/index/external_segment.go
+++ b/index/external_segment.go
@@ -241,18 +241,19 @@ func (esr *ExternalSegmentReceiver) introduceSegment() error {
 		esr.status = StreamFailed
 		return fmt.Errorf("deduplication failed: %w", err)
 	}
+	if segWrapper != originalSegWrapper {
+		// Close the original segment wrapper to release file handles
+		// before attempting to remove the file
+		_ = originalSegWrapper.Close()
 
-	// Close the original segment wrapper to release file handles
-	// before attempting to remove the file
-	_ = originalSegWrapper.Close()
-
-	// Try to remove the empty segment file, but don't fail if it's temporarily unavailable
-	err = esr.writer.directory.Remove(ItemKindSegment, esr.segmentID)
-	if err != nil {
-		// Log the error but don't fail the operation
-		// The file will be cleaned up later by the deletion policy
-		if esr.writer.config.AsyncError != nil {
-			esr.writer.config.AsyncError(fmt.Errorf("failed to remove empty segment (will retry later): %w", err))
+		// Try to remove the empty segment file, but don't fail if it's temporarily unavailable
+		err = esr.writer.directory.Remove(ItemKindSegment, esr.segmentID)
+		if err != nil {
+			// Log the error but don't fail the operation
+			// The file will be cleaned up later by the deletion policy
+			if esr.writer.config.AsyncError != nil {
+				esr.writer.config.AsyncError(fmt.Errorf("failed to remove empty segment (will retry later): %w", err))
+			}
 		}
 	}
 	if segWrapper == nil {