Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/integrations/datafusion/src/system_tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@ use crate::error::to_datafusion_error;

mod options;
mod schemas;
mod snapshots;

type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;

const TABLES: &[(&str, Builder)] = &[("options", options::build), ("schemas", schemas::build)];
const TABLES: &[(&str, Builder)] = &[
("options", options::build),
("schemas", schemas::build),
("snapshots", snapshots::build),
];

/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
///
Expand Down
160 changes: 160 additions & 0 deletions crates/integrations/datafusion/src/system_tables/snapshots.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Mirrors Java [SnapshotsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java).

use std::any::Any;
use std::sync::{Arc, OnceLock};

use async_trait::async_trait;
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::catalog::Session;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use paimon::table::{SnapshotManager, Table};

use crate::error::to_datafusion_error;

/// Construct the `$snapshots` system-table provider for the given base table.
pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
    let provider = SnapshotsTable { table };
    Ok(Arc::new(provider))
}

fn snapshots_schema() -> SchemaRef {
static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
SCHEMA
.get_or_init(|| {
Arc::new(Schema::new(vec![
Field::new("snapshot_id", DataType::Int64, false),
Field::new("schema_id", DataType::Int64, false),
Field::new("commit_user", DataType::Utf8, false),
Field::new("commit_identifier", DataType::Int64, false),
Field::new("commit_kind", DataType::Utf8, false),
Field::new(
"commit_time",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("base_manifest_list", DataType::Utf8, false),
Field::new("delta_manifest_list", DataType::Utf8, false),
Field::new("changelog_manifest_list", DataType::Utf8, true),
Field::new("total_record_count", DataType::Int64, true),
Field::new("delta_record_count", DataType::Int64, true),
Field::new("changelog_record_count", DataType::Int64, true),
Field::new("watermark", DataType::Int64, true),
Field::new("next_row_id", DataType::Int64, true),
]))
})
.clone()
}

/// `TableProvider` serving the `$snapshots` system table: one row per
/// snapshot file found in the base table's snapshot directory.
#[derive(Debug)]
struct SnapshotsTable {
    /// Base table whose snapshot directory is listed on every scan.
    table: Table,
}

#[async_trait]
impl TableProvider for SnapshotsTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// All `$snapshots` tables share the same cached, fixed schema.
    fn schema(&self) -> SchemaRef {
        snapshots_schema()
    }

    fn table_type(&self) -> TableType {
        TableType::View
    }

    /// Materialize the snapshot listing into a single in-memory record batch.
    ///
    /// Filters and limit are accepted but not applied here; only the
    /// projection is forwarded to the memory source.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Read snapshots straight from the base table's storage location.
        let sm = SnapshotManager::new(
            self.table.file_io().clone(),
            self.table.location().to_string(),
        );
        let snapshots = sm.list_all().await.map_err(to_datafusion_error)?;

        // One column vector per schema field, preallocated to the row count.
        let n = snapshots.len();
        let mut snapshot_ids = Vec::with_capacity(n);
        let mut schema_ids = Vec::with_capacity(n);
        let mut commit_users: Vec<String> = Vec::with_capacity(n);
        let mut commit_identifiers = Vec::with_capacity(n);
        let mut commit_kinds: Vec<String> = Vec::with_capacity(n);
        let mut commit_times = Vec::with_capacity(n);
        let mut base_manifest_lists: Vec<String> = Vec::with_capacity(n);
        let mut delta_manifest_lists: Vec<String> = Vec::with_capacity(n);
        let mut changelog_manifest_lists: Vec<Option<String>> = Vec::with_capacity(n);
        let mut total_record_counts: Vec<Option<i64>> = Vec::with_capacity(n);
        let mut delta_record_counts: Vec<Option<i64>> = Vec::with_capacity(n);
        let mut changelog_record_counts: Vec<Option<i64>> = Vec::with_capacity(n);
        let mut watermarks: Vec<Option<i64>> = Vec::with_capacity(n);
        let mut next_row_ids: Vec<Option<i64>> = Vec::with_capacity(n);

        for snap in &snapshots {
            snapshot_ids.push(snap.id());
            schema_ids.push(snap.schema_id());
            commit_users.push(snap.commit_user().to_string());
            commit_identifiers.push(snap.commit_identifier());
            commit_kinds.push(snap.commit_kind().to_string());
            // NOTE(review): assumes time_millis() fits in i64 — confirm its
            // return type.
            commit_times.push(snap.time_millis() as i64);
            base_manifest_lists.push(snap.base_manifest_list().to_string());
            delta_manifest_lists.push(snap.delta_manifest_list().to_string());
            changelog_manifest_lists.push(snap.changelog_manifest_list().map(str::to_string));
            total_record_counts.push(snap.total_record_count());
            delta_record_counts.push(snap.delta_record_count());
            changelog_record_counts.push(snap.changelog_record_count());
            watermarks.push(snap.watermark());
            next_row_ids.push(snap.next_row_id());
        }

        let schema = snapshots_schema();
        // Array order must match the field order in snapshots_schema().
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(snapshot_ids)),
                Arc::new(Int64Array::from(schema_ids)),
                Arc::new(StringArray::from(commit_users)),
                Arc::new(Int64Array::from(commit_identifiers)),
                Arc::new(StringArray::from(commit_kinds)),
                Arc::new(TimestampMillisecondArray::from(commit_times)),
                Arc::new(StringArray::from(base_manifest_lists)),
                Arc::new(StringArray::from(delta_manifest_lists)),
                Arc::new(StringArray::from(changelog_manifest_lists)),
                Arc::new(Int64Array::from(total_record_counts)),
                Arc::new(Int64Array::from(delta_record_counts)),
                Arc::new(Int64Array::from(changelog_record_counts)),
                Arc::new(Int64Array::from(watermarks)),
                Arc::new(Int64Array::from(next_row_ids)),
            ],
        )?;

        // Memory source handles projection pushdown over the single batch.
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![batch]],
            schema,
            projection.cloned(),
        )?)
    }
}
80 changes: 80 additions & 0 deletions crates/integrations/datafusion/tests/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,83 @@ async fn test_missing_base_table_for_system_table_errors() {
"expected error to mention both base table and system name, got: {msg}"
);
}

/// End-to-end check of the `$snapshots` system table: schema, row count,
/// ordering, and commit-kind contents.
#[tokio::test]
async fn test_snapshots_system_table() {
    let (ctx, catalog, _tmp) = create_context().await;
    let query = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$snapshots");
    let batches = run_sql(&ctx, &query).await;

    assert!(!batches.is_empty(), "$snapshots should return ≥1 batch");

    // Column names and types must appear in declaration order.
    let expected_columns = [
        ("snapshot_id", DataType::Int64),
        ("schema_id", DataType::Int64),
        ("commit_user", DataType::Utf8),
        ("commit_identifier", DataType::Int64),
        ("commit_kind", DataType::Utf8),
        (
            "commit_time",
            DataType::Timestamp(TimeUnit::Millisecond, None),
        ),
        ("base_manifest_list", DataType::Utf8),
        ("delta_manifest_list", DataType::Utf8),
        ("changelog_manifest_list", DataType::Utf8),
        ("total_record_count", DataType::Int64),
        ("delta_record_count", DataType::Int64),
        ("changelog_record_count", DataType::Int64),
        ("watermark", DataType::Int64),
        ("next_row_id", DataType::Int64),
    ];
    let result_schema = batches[0].schema();
    for (i, (name, dtype)) in expected_columns.iter().enumerate() {
        let field = result_schema.field(i);
        assert_eq!(field.name(), name, "column {i} name");
        assert_eq!(field.data_type(), dtype, "column {i} type");
    }

    // Row count must match the snapshot directory listing.
    let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
    let table = catalog
        .get_table(&identifier)
        .await
        .expect("fixture table should load");
    let manager =
        paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string());
    let listed = manager.list_all().await.expect("list_all should succeed");
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(
        total_rows,
        listed.len(),
        "$snapshots rows should match list_all() length"
    );

    // snapshot_id column must be ascending.
    let mut ids: Vec<i64> = Vec::new();
    for batch in &batches {
        let id_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("snapshot_id is Int64");
        ids.extend((0..batch.num_rows()).map(|row| id_col.value(row)));
    }
    let mut in_order = ids.clone();
    in_order.sort_unstable();
    assert_eq!(ids, in_order, "snapshot_id should be ascending");

    // commit_kind column must contain a known variant.
    let last_batch = batches.last().unwrap();
    let kind_col = last_batch
        .column(4)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("commit_kind is Utf8");
    let kind = kind_col.value(last_batch.num_rows() - 1);
    assert!(
        ["APPEND", "COMPACT", "OVERWRITE", "ANALYZE"].contains(&kind),
        "unexpected commit_kind: {kind}"
    );
}
11 changes: 11 additions & 0 deletions crates/paimon/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ pub enum CommitKind {
ANALYZE,
}

impl std::fmt::Display for CommitKind {
    /// Renders the variant as its uppercase name, matching the Java enum's
    /// string form used in the `$snapshots` system table.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::APPEND => "APPEND",
            Self::COMPACT => "COMPACT",
            Self::OVERWRITE => "OVERWRITE",
            Self::ANALYZE => "ANALYZE",
        };
        f.write_str(name)
    }
}

/// Snapshot for paimon.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/Snapshot.java#L68>.
Expand Down
76 changes: 54 additions & 22 deletions crates/paimon/src/table/snapshot_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java).
use crate::io::FileIO;
use crate::spec::Snapshot;
use futures::future::try_join_all;
use std::str;

const SNAPSHOT_DIR: &str = "snapshot";
Expand Down Expand Up @@ -90,26 +91,20 @@ impl SnapshotManager {
id_str.trim().parse().ok()
}

/// List snapshot files and find the id using the given reducer (min or max).
async fn find_by_list_files(&self, reducer: fn(i64, i64) -> i64) -> crate::Result<Option<i64>> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why modify this?

let snapshot_dir = self.snapshot_dir();
let statuses = self.file_io.list_status(&snapshot_dir).await?;
let mut result: Option<i64> = None;
for status in statuses {
if status.is_dir {
continue;
}
let name = status.path.rsplit('/').next().unwrap_or(&status.path);
if let Some(id_str) = name.strip_prefix(SNAPSHOT_PREFIX) {
if let Ok(id) = id_str.parse::<i64>() {
result = Some(match result {
Some(r) => reducer(r, id),
None => id,
});
}
}
}
Ok(result)
/// Scan the snapshot directory and collect every `snapshot-{id}` file id,
/// returned sorted ascending.
async fn collect_snapshot_ids(&self) -> crate::Result<Vec<i64>> {
    let statuses = self.file_io.list_status(&self.snapshot_dir()).await?;
    let mut ids = Vec::new();
    for status in statuses {
        // Directories cannot be snapshot files.
        if status.is_dir {
            continue;
        }
        // Only the final path segment carries the `snapshot-{id}` name.
        let name = status.path.rsplit('/').next().unwrap_or(&status.path);
        if let Some(id_str) = name.strip_prefix(SNAPSHOT_PREFIX) {
            // Non-numeric suffixes (e.g. hint files) are silently skipped.
            if let Ok(id) = id_str.parse::<i64>() {
                ids.push(id);
            }
        }
    }
    ids.sort_unstable();
    Ok(ids)
}

/// Get the latest snapshot id.
Expand All @@ -129,7 +124,7 @@ impl SnapshotManager {
}
}
}
self.find_by_list_files(i64::max).await
Ok(self.collect_snapshot_ids().await?.into_iter().max())
}

/// Get the earliest snapshot id.
Expand All @@ -147,7 +142,18 @@ impl SnapshotManager {
return Ok(Some(hint_id));
}
}
self.find_by_list_files(i64::min).await
Ok(self.collect_snapshot_ids().await?.into_iter().min())
}

/// List all snapshot ids sorted ascending.
///
/// Returns an empty vector when the snapshot directory contains no
/// `snapshot-{id}` files.
pub async fn list_all_ids(&self) -> crate::Result<Vec<i64>> {
    self.collect_snapshot_ids().await
}

/// List all snapshots sorted by id ascending.
///
/// Snapshot files are fetched concurrently via `try_join_all`; the first
/// read error aborts the whole listing.
pub async fn list_all(&self) -> crate::Result<Vec<Snapshot>> {
    let reads: Vec<_> = self
        .list_all_ids()
        .await?
        .into_iter()
        .map(|id| self.get_snapshot(id))
        .collect();
    try_join_all(reads).await
}

/// Get a snapshot by id.
Expand Down Expand Up @@ -366,4 +372,30 @@ mod tests {
let hint = sm.read_hint(&sm.latest_hint_path()).await;
assert_eq!(hint, Some(42));
}

/// A table with no snapshot files yields an empty id listing.
#[tokio::test]
async fn test_list_all_ids_empty() {
    let (_, sm) = setup("memory:/test_list_empty").await;
    let ids = sm.list_all_ids().await.unwrap();
    assert!(ids.is_empty());
}

/// Ids committed out of order still come back sorted ascending.
#[tokio::test]
async fn test_list_all_ids_sorted() {
    let (_, sm) = setup("memory:/test_list_sorted").await;
    sm.commit_snapshot(&test_snapshot(3)).await.unwrap();
    sm.commit_snapshot(&test_snapshot(1)).await.unwrap();
    sm.commit_snapshot(&test_snapshot(2)).await.unwrap();
    assert_eq!(sm.list_all_ids().await.unwrap(), vec![1, 2, 3]);
}

/// `list_all` returns fully loaded snapshots in ascending id order even when
/// they were committed out of order.
#[tokio::test]
async fn test_list_all_loads_in_order() {
    let (_, sm) = setup("memory:/test_list_all").await;
    for id in [2, 1, 3] {
        sm.commit_snapshot(&test_snapshot(id)).await.unwrap();
    }
    let loaded = sm.list_all().await.unwrap();
    let ids: Vec<i64> = loaded.iter().map(|snap| snap.id()).collect();
    assert_eq!(ids, vec![1, 2, 3]);
}
}
Loading