diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs index 65a2176d..96b038b3 100644 --- a/crates/integrations/datafusion/src/system_tables/mod.rs +++ b/crates/integrations/datafusion/src/system_tables/mod.rs @@ -31,10 +31,15 @@ use crate::error::to_datafusion_error; mod options; mod schemas; +mod snapshots; type Builder = fn(Table) -> DFResult>; -const TABLES: &[(&str, Builder)] = &[("options", options::build), ("schemas", schemas::build)]; +const TABLES: &[(&str, Builder)] = &[ + ("options", options::build), + ("schemas", schemas::build), + ("snapshots", snapshots::build), +]; /// Parse a Paimon object name into `(base_table, optional system_table_name)`. /// diff --git a/crates/integrations/datafusion/src/system_tables/snapshots.rs b/crates/integrations/datafusion/src/system_tables/snapshots.rs new file mode 100644 index 00000000..6d0ae0b5 --- /dev/null +++ b/crates/integrations/datafusion/src/system_tables/snapshots.rs @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Mirrors Java [SnapshotsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java). + +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use async_trait::async_trait; +use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use datafusion::catalog::Session; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result as DFResult; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use paimon::table::{SnapshotManager, Table}; + +use crate::error::to_datafusion_error; + +pub(super) fn build(table: Table) -> DFResult> { + Ok(Arc::new(SnapshotsTable { table })) +} + +fn snapshots_schema() -> SchemaRef { + static SCHEMA: OnceLock = OnceLock::new(); + SCHEMA + .get_or_init(|| { + Arc::new(Schema::new(vec![ + Field::new("snapshot_id", DataType::Int64, false), + Field::new("schema_id", DataType::Int64, false), + Field::new("commit_user", DataType::Utf8, false), + Field::new("commit_identifier", DataType::Int64, false), + Field::new("commit_kind", DataType::Utf8, false), + Field::new( + "commit_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("base_manifest_list", DataType::Utf8, false), + Field::new("delta_manifest_list", DataType::Utf8, false), + Field::new("changelog_manifest_list", DataType::Utf8, true), + Field::new("total_record_count", DataType::Int64, true), + Field::new("delta_record_count", DataType::Int64, true), + Field::new("changelog_record_count", DataType::Int64, true), + Field::new("watermark", DataType::Int64, true), + Field::new("next_row_id", DataType::Int64, true), + ])) + }) + .clone() +} + +#[derive(Debug)] +struct SnapshotsTable { + table: Table, +} + +#[async_trait] +impl TableProvider for SnapshotsTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + snapshots_schema() + } + + fn table_type(&self) -> TableType { + TableType::View + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + let sm = SnapshotManager::new( + self.table.file_io().clone(), + self.table.location().to_string(), + ); + let snapshots = sm.list_all().await.map_err(to_datafusion_error)?; + + let n = snapshots.len(); + let mut snapshot_ids = Vec::with_capacity(n); + let mut schema_ids = Vec::with_capacity(n); + let mut commit_users: Vec = Vec::with_capacity(n); + let mut commit_identifiers = Vec::with_capacity(n); + let mut commit_kinds: Vec = Vec::with_capacity(n); + let mut commit_times = Vec::with_capacity(n); + let mut base_manifest_lists: Vec = Vec::with_capacity(n); + let mut delta_manifest_lists: Vec = Vec::with_capacity(n); + let mut changelog_manifest_lists: Vec> = Vec::with_capacity(n); + let mut total_record_counts: Vec> = Vec::with_capacity(n); + let mut delta_record_counts: Vec> = Vec::with_capacity(n); + let mut changelog_record_counts: Vec> = Vec::with_capacity(n); + let mut watermarks: Vec> = Vec::with_capacity(n); + let mut next_row_ids: Vec> = Vec::with_capacity(n); + + for snap in &snapshots { + snapshot_ids.push(snap.id()); + schema_ids.push(snap.schema_id()); + commit_users.push(snap.commit_user().to_string()); + commit_identifiers.push(snap.commit_identifier()); + commit_kinds.push(snap.commit_kind().to_string()); + commit_times.push(snap.time_millis() as i64); + base_manifest_lists.push(snap.base_manifest_list().to_string()); + delta_manifest_lists.push(snap.delta_manifest_list().to_string()); + changelog_manifest_lists.push(snap.changelog_manifest_list().map(str::to_string)); + total_record_counts.push(snap.total_record_count()); + delta_record_counts.push(snap.delta_record_count()); + changelog_record_counts.push(snap.changelog_record_count()); + watermarks.push(snap.watermark()); + next_row_ids.push(snap.next_row_id()); + } + + let schema = snapshots_schema(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(snapshot_ids)), + Arc::new(Int64Array::from(schema_ids)), + Arc::new(StringArray::from(commit_users)), + Arc::new(Int64Array::from(commit_identifiers)), + Arc::new(StringArray::from(commit_kinds)), + Arc::new(TimestampMillisecondArray::from(commit_times)), + Arc::new(StringArray::from(base_manifest_lists)), + Arc::new(StringArray::from(delta_manifest_lists)), + Arc::new(StringArray::from(changelog_manifest_lists)), + Arc::new(Int64Array::from(total_record_counts)), + Arc::new(Int64Array::from(delta_record_counts)), + Arc::new(Int64Array::from(changelog_record_counts)), + Arc::new(Int64Array::from(watermarks)), + Arc::new(Int64Array::from(next_row_ids)), + ], + )?; + + Ok(MemorySourceConfig::try_new_exec( + &[vec![batch]], + schema, + projection.cloned(), + )?) + } +} diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs index 734a81ed..f9c3378c 100644 --- a/crates/integrations/datafusion/tests/system_tables.rs +++ b/crates/integrations/datafusion/tests/system_tables.rs @@ -251,3 +251,83 @@ async fn test_missing_base_table_for_system_table_errors() { "expected error to mention both base table and system name, got: {msg}" ); } + +#[tokio::test] +async fn test_snapshots_system_table() { + let (ctx, catalog, _tmp) = create_context().await; + let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$snapshots"); + let batches = run_sql(&ctx, &sql).await; + + assert!(!batches.is_empty(), "$snapshots should return ≥1 batch"); + + let arrow_schema = batches[0].schema(); + let expected_columns = [ + ("snapshot_id", DataType::Int64), + ("schema_id", DataType::Int64), + ("commit_user", DataType::Utf8), + ("commit_identifier", DataType::Int64), + ("commit_kind", DataType::Utf8), + ( + "commit_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + ), + ("base_manifest_list", DataType::Utf8), + ("delta_manifest_list", DataType::Utf8), + ("changelog_manifest_list", DataType::Utf8), + ("total_record_count", DataType::Int64), + ("delta_record_count", DataType::Int64), + ("changelog_record_count", DataType::Int64), + ("watermark", DataType::Int64), + ("next_row_id", DataType::Int64), + ]; + for (i, (name, dtype)) in expected_columns.iter().enumerate() { + let field = arrow_schema.field(i); + assert_eq!(field.name(), name, "column {i} name"); + assert_eq!(field.data_type(), dtype, "column {i} type"); + } + + // Row count must match the snapshot directory listing. + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let table = catalog + .get_table(&identifier) + .await + .expect("fixture table should load"); + let sm = + paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string()); + let all = sm.list_all().await.expect("list_all should succeed"); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!( + total_rows, + all.len(), + "$snapshots rows should match list_all() length" + ); + + // snapshot_id column must be ascending. + let mut ids: Vec = Vec::new(); + for batch in &batches { + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("snapshot_id is Int64"); + for i in 0..batch.num_rows() { + ids.push(col.value(i)); + } + } + let mut sorted = ids.clone(); + sorted.sort_unstable(); + assert_eq!(ids, sorted, "snapshot_id should be ascending"); + + // commit_kind column must contain a known variant. + let last_batch = batches.last().unwrap(); + let kind_col = last_batch + .column(4) + .as_any() + .downcast_ref::() + .expect("commit_kind is Utf8"); + let kind = kind_col.value(last_batch.num_rows() - 1); + assert!( + ["APPEND", "COMPACT", "OVERWRITE", "ANALYZE"].contains(&kind), + "unexpected commit_kind: {kind}" + ); +} diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index 013211b6..ef6664c5 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -35,6 +35,17 @@ pub enum CommitKind { ANALYZE, } +impl std::fmt::Display for CommitKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::APPEND => write!(f, "APPEND"), + Self::COMPACT => write!(f, "COMPACT"), + Self::OVERWRITE => write!(f, "OVERWRITE"), + Self::ANALYZE => write!(f, "ANALYZE"), + } + } +} + /// Snapshot for paimon. /// /// Impl Reference: . diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index abff248e..1e8e1735 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -20,6 +20,7 @@ //! Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java). use crate::io::FileIO; use crate::spec::Snapshot; +use futures::future::try_join_all; use std::str; const SNAPSHOT_DIR: &str = "snapshot"; @@ -150,6 +151,37 @@ impl SnapshotManager { self.find_by_list_files(i64::min).await } + /// List all snapshot ids sorted ascending. Returns an empty vector when + /// the snapshot directory does not exist. + pub async fn list_all_ids(&self) -> crate::Result> { + let snapshot_dir = self.snapshot_dir(); + let statuses = match self.file_io.list_status(&snapshot_dir).await { + Ok(s) => s, + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + return Ok(Vec::new()); + } + Err(e) => return Err(e), + }; + let mut ids: Vec = statuses + .into_iter() + .filter(|s| !s.is_dir) + .filter_map(|s| { + let name = s.path.rsplit('/').next().unwrap_or(&s.path); + name.strip_prefix(SNAPSHOT_PREFIX)?.parse::().ok() + }) + .collect(); + ids.sort_unstable(); + Ok(ids) + } + + /// List all snapshots sorted by id ascending. + pub async fn list_all(&self) -> crate::Result> { + let ids = self.list_all_ids().await?; + try_join_all(ids.into_iter().map(|id| self.get_snapshot(id))).await + } + /// Get a snapshot by id. pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result { let snapshot_path = self.snapshot_path(snapshot_id); @@ -366,4 +398,38 @@ mod tests { let hint = sm.read_hint(&sm.latest_hint_path()).await; assert_eq!(hint, Some(42)); } + + #[tokio::test] + async fn test_list_all_ids_empty() { + let (_, sm) = setup("memory:/test_list_empty").await; + assert!(sm.list_all_ids().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn test_list_all_ids_missing_dir_returns_empty() { + let file_io = test_file_io(); + let sm = SnapshotManager::new(file_io, "memory:/test_list_missing".to_string()); + assert!(sm.list_all_ids().await.unwrap().is_empty()); + assert!(sm.list_all().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn test_list_all_ids_sorted() { + let (_, sm) = setup("memory:/test_list_sorted").await; + for id in [3, 1, 2] { + sm.commit_snapshot(&test_snapshot(id)).await.unwrap(); + } + assert_eq!(sm.list_all_ids().await.unwrap(), vec![1, 2, 3]); + } + + #[tokio::test] + async fn test_list_all_loads_in_order() { + let (_, sm) = setup("memory:/test_list_all").await; + for id in [2, 1, 3] { + sm.commit_snapshot(&test_snapshot(id)).await.unwrap(); + } + let snaps = sm.list_all().await.unwrap(); + let ids: Vec = snaps.iter().map(|s| s.id()).collect(); + assert_eq!(ids, vec![1, 2, 3]); + } }