diff --git a/Cargo.lock b/Cargo.lock index 248a9e8e8..0d581d015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4397,17 +4397,22 @@ dependencies = [ name = "moonlink_service" version = "0.0.1" dependencies = [ + "arrow", + "arrow-array", "arrow-ipc", "arrow-schema", "axum 0.8.4", + "bytes", "clap", "moonlink", "moonlink_backend", "moonlink_metadata_store", "moonlink_rpc", + "parquet", "reqwest", "serde", "serde_json", + "tempfile", "thiserror 2.0.14", "tokio", "tower-http", diff --git a/src/moonlink/src/lib.rs b/src/moonlink/src/lib.rs index 0a99417d1..0f5889410 100644 --- a/src/moonlink/src/lib.rs +++ b/src/moonlink/src/lib.rs @@ -26,7 +26,7 @@ pub use table_notify::TableEvent; pub use union_read::{ReadState, ReadStateFilepathRemap, ReadStateManager}; #[cfg(any(test, feature = "test-utils"))] -pub use union_read::decode_read_state_for_testing; +pub use union_read::{decode_read_state_for_testing, decode_serialized_read_state_for_testing}; #[cfg(feature = "bench")] pub use storage::GlobalIndex; diff --git a/src/moonlink/src/union_read.rs b/src/moonlink/src/union_read.rs index 831bc3f30..8c7844764 100644 --- a/src/moonlink/src/union_read.rs +++ b/src/moonlink/src/union_read.rs @@ -7,4 +7,4 @@ pub use read_state::ReadStateFilepathRemap; pub use read_state_manager::ReadStateManager; #[cfg(any(test, feature = "test-utils"))] -pub use read_state::decode_read_state_for_testing; +pub use read_state::{decode_read_state_for_testing, decode_serialized_read_state_for_testing}; diff --git a/src/moonlink/src/union_read/read_state.rs b/src/moonlink/src/union_read/read_state.rs index 1c2dca3c6..dc81737fc 100644 --- a/src/moonlink/src/union_read/read_state.rs +++ b/src/moonlink/src/union_read/read_state.rs @@ -139,6 +139,24 @@ pub fn decode_read_state_for_testing( ) } +#[cfg(any(test, feature = "test-utils"))] +#[allow(clippy::type_complexity)] +pub fn decode_serialized_read_state_for_testing( + data: Vec, +) -> ( + Vec, /*data_file_paths*/ + Vec, /*puffin_file_paths*/ + Vec, + Vec<(u32, u32)>, +) { + let read_state = ReadState { + data, + associated_files: vec![], + cache_handles: vec![], + }; + decode_read_state_for_testing(&read_state) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/moonlink_service/Cargo.toml b/src/moonlink_service/Cargo.toml index 39a2352d1..9cff58ec1 100644 --- a/src/moonlink_service/Cargo.toml +++ b/src/moonlink_service/Cargo.toml @@ -25,6 +25,15 @@ tokio = { workspace = true } tower-http = { version = "0.6", features = ["cors", "trace"] } tracing = { workspace = true } +[dev-dependencies] +reqwest = { version = "0.12", features = ["json"] } +arrow-array = { workspace = true } +bytes = "1" +tokio = { workspace = true } +parquet = { workspace = true } +arrow = { workspace = true } +tempfile = { workspace = true } + [[example]] name = "rest_api_demo" required-features = ["rest_api_demo"] diff --git a/src/moonlink_service/src/lib.rs b/src/moonlink_service/src/lib.rs index a282fb981..5ceba3eac 100644 --- a/src/moonlink_service/src/lib.rs +++ b/src/moonlink_service/src/lib.rs @@ -162,3 +162,6 @@ pub async fn start_with_config(config: ServiceConfig) -> Result<()> { info!("Moonlink service shut down complete"); Ok(()) } + +#[cfg(test)] +mod moonlink_standalone_test; diff --git a/src/moonlink_service/src/moonlink_standalone_test.rs b/src/moonlink_service/src/moonlink_standalone_test.rs new file mode 100644 index 000000000..2399923e6 --- /dev/null +++ b/src/moonlink_service/src/moonlink_standalone_test.rs @@ -0,0 +1,161 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use moonlink::decode_serialized_read_state_for_testing; +use serde_json::json; +use bytes::Bytes; +use tokio::net::TcpStream; +use arrow_array::{Int32Array, RecordBatch, StringArray}; +use arrow::datatypes::Schema as ArrowSchema; +use arrow::datatypes::{DataType, Field}; +use tempfile::TempDir; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use std::io::Cursor; +use reqwest; + +use moonlink_rpc::{scan_table_begin, scan_table_end}; +use crate::{start_with_config, ServiceConfig, READINESS_PROBE_PORT}; + +/// Local moonlink REST API IP/port address. +const REST_ADDR: &str = "http://54.245.134.191:3030"; +/// Local moonlink server IP/port address. +const MOONLINK_ADDR: &str = "54.245.134.191:3031"; +/// Test database name. +const DATABASE: &str = "pg_mooncake"; +/// Test table name. +const TABLE: &str = "public.test-table"; + +/// Send request to readiness endpoint. +async fn test_readiness_probe() { + let url = format!("http://54.245.134.191:{}/ready", READINESS_PROBE_PORT); + loop { + if let Ok(resp) = reqwest::get(&url).await { + if resp.status() == reqwest::StatusCode::OK { + return; + } + } + println!("not ready, waiting..."); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } +} + +/// Util function to load all record batches inside of the given [`path`]. +pub async fn read_all_batches(url: &str) -> Vec { + let resp = reqwest::get(url).await.unwrap(); + assert!(resp.status().is_success()); + + let data: Bytes = resp.bytes().await.unwrap(); + let mut reader = ParquetRecordBatchReaderBuilder::try_new(data).unwrap() + .build().unwrap(); + + // Collect all record batches. + let batches = reader + .into_iter() + .map(|b| b.unwrap()) // handle Err properly in production + .collect(); + + batches +} + +/// Util function to create test arrow schema. +fn create_test_arrow_schema() -> Arc { + Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, /*nullable=*/false).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "0".to_string(), + )])), + Field::new("name", DataType::Utf8, /*nullable=*/false).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "1".to_string(), + )])), + Field::new("email", DataType::Utf8, /*nullable=*/true).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "2".to_string(), + )])), + Field::new("age", DataType::Int32, /*nullable=*/true).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "3".to_string(), + )])), + ])) +} + +#[tokio::test] +async fn test_moonlink_standalone() { + println!("before readiness probe"); + test_readiness_probe().await; + println!("after readiness probe"); + + // Create test table. + let client = reqwest::Client::new(); + let create_table_payload = json!({ + "mooncake_database": DATABASE, + "mooncake_table": TABLE, + "schema": [ + {"name": "id", "data_type": "int32", "nullable": false}, + {"name": "name", "data_type": "string", "nullable": false}, + {"name": "email", "data_type": "string", "nullable": true}, + {"name": "age", "data_type": "int32", "nullable": true} + ] + }); + let response = client + .post(format!("{REST_ADDR}/tables/{TABLE}")) + .header("content-type", "application/json") + .json(&create_table_payload) + .send() + .await.unwrap(); + println!("response = {:?}", response); + assert!(response.status().is_success()); + + // Ingest some data. + let insert_payload = json!({ + "operation": "insert", + "data": { + "id": 4, + "name": "Dad", + "email": "Dad@example.com", + "age": 40 + } + }); + let response = client + .post(format!("{REST_ADDR}/ingest/{TABLE}")) + .header("content-type", "application/json") + .json(&insert_payload) + .send() + .await.unwrap(); + println!("insert response = {:?}", response); + assert!(response.status().is_success()); + + // Scan table and get data file and puffin files back. + let mut moonlink_stream = TcpStream::connect(MOONLINK_ADDR).await.unwrap(); + let bytes = scan_table_begin( + &mut moonlink_stream, + DATABASE.to_string(), + TABLE.to_string(), + 0, + ).await.unwrap(); + let (data_file_paths, puffin_file_paths, puffin_deletion, positional_deletion) = decode_serialized_read_state_for_testing(bytes); + assert_eq!(data_file_paths.len(), 1); + println!("data file paths = {:?}", data_file_paths); + let record_batches = read_all_batches(&data_file_paths[0]).await; + let expected_arrow_batch = RecordBatch::try_new( + create_test_arrow_schema(), + vec![ + Arc::new(Int32Array::from(vec![1])), + Arc::new(StringArray::from(vec!["Alice Johnson".to_string()])), + Arc::new(StringArray::from(vec!["alice@example.com".to_string()])), + Arc::new(Int32Array::from(vec![30])), + ], + ) + .unwrap(); + assert_eq!(record_batches, vec![expected_arrow_batch]); + + assert!(puffin_file_paths.is_empty()); + assert!(puffin_deletion.is_empty()); + assert!(positional_deletion.is_empty()); + + scan_table_end( + &mut moonlink_stream, + DATABASE.to_string(), + TABLE.to_string(), + ).await.unwrap(); +}