feat: support V0 iceberg_tables schema for SqlCatalog #2380
Conversation
|
Ok @blackmwk thanks again for reviewing, I was mistaken and have fixed this PR to match iceberg-java much more closely. I also updated the PR title and description to be more accurate since this is more of a feature to support reading V0 tables in iceberg-rust. Thanks! |
| pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style"; | ||
| /// catalog schema version, setting to "V1" will migrate from V0 to V1 schema | ||
| pub const SQL_CATALOG_PROP_SCHEMA_VERSION: &str = "sql.schema-version"; | ||
|
|
There was a problem hiding this comment.
Seeing these together makes me think sql_schema_version yet iceberg-java has jdbc.schema-version. I do prefer consistency for something like sql.<kebab>.
| )) | ||
| .execute(&pool) | ||
| .await | ||
| .is_ok(); |
There was a problem hiding this comment.
What if the SELECT failed here but the schema is actually V1?
I assume in that case it would pass silently here and fail later when trying to update it. This would be confusing to users
With above said, I don't have a better idea to probe the schema type... at least we should check the error type? any thoughts?
There was a problem hiding this comment.
JDBC has a method to check meta table to inspect the column if exists. Is there anything similar in sqlx?
| let tbl_metadata_location_str = tbl_metadata_location.to_string(); | ||
| self.execute(&format!( | ||
| "INSERT INTO {CATALOG_TABLE_NAME} | ||
| ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE}) |
There was a problem hiding this comment.
Should we consider V0 schema here as well?
There was a problem hiding this comment.
In iceberg-java, it looks like we only support creating V0 table. So I think it's fine for us to leave it here.
But still quite curious about the case when TableCreation has V1 schema.
| let requested: SchemaVersion = config | ||
| .props | ||
| .get(SQL_CATALOG_PROP_SCHEMA_VERSION) | ||
| .and_then(|v| v.parse().ok()) |
There was a problem hiding this comment.
We should fail on parse error here to avoid silent fallback for misinputs
Similar to sql_bind_style parsing: https://github.com/apache/iceberg-rust/blob/main/crates/catalog/sql/src/catalog.rs#L183
| //! }; | ||
| //! | ||
| //! #[tokio::main] | ||
| //! #[tokio::main(flavor = "current_thread")] |
There was a problem hiding this comment.
why do we need to change this?
| pub const SQL_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; | ||
| /// catalog sql bind style | ||
| pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style"; | ||
| /// catalog schema version, setting to "V1" will migrate from V0 to V1 schema |
There was a problem hiding this comment.
Please make the comment more clear: this is expected catalog schema version. If it's set to v1 and it's actually v0, the catalog will do a migration.
| pub const SQL_CATALOG_PROP_BIND_STYLE: &str = "sql_bind_style"; | ||
| /// catalog schema version, setting to "V1" will migrate from V0 to V1 schema | ||
| pub const SQL_CATALOG_PROP_SCHEMA_VERSION: &str = "sql.schema-version"; | ||
|
|
| )) | ||
| .execute(&pool) | ||
| .await | ||
| .is_ok(); |
There was a problem hiding this comment.
JDBC has a method to check meta table to inspect the column if exists. Is there anything similar in sqlx?
|
|
||
| // Migrate the schema to V1 if the catalog table does not support views and the caller opted in. | ||
| let schema_version = if is_v1 { | ||
| tracing::debug!("{CATALOG_TABLE_NAME} already supports views"); |
There was a problem hiding this comment.
This is confusing, why not just print messages like detect v1 schema
| let requested: SchemaVersion = config | ||
| .props | ||
| .get(SQL_CATALOG_PROP_SCHEMA_VERSION) | ||
| .and_then(|v| v.parse().ok()) |
| .and_then(|v| v.parse().ok()) | ||
| .unwrap_or(SchemaVersion::V0); | ||
| if requested == SchemaVersion::V1 { | ||
| tracing::debug!("{CATALOG_TABLE_NAME} is being updated to support views"); |
There was a problem hiding this comment.
This should be a warning, and the message should be sth like It's going to be updated to v1
|
|
||
| fn record_type_filter(&self) -> &'static str { | ||
| match self.schema_version { | ||
| SchemaVersion::V1 => "AND (iceberg_type = 'TABLE' OR iceberg_type IS NULL)", |
There was a problem hiding this comment.
I prefer to put these sqls into the SchemaVersion enum, as following:
impl SchemaVersion {
fn alter_table_sql(&self) -> Result<String> {
match self {
V0 => ...
V1 => ...
}
}
}
Which issue does this PR close?
What changes are included in this PR?
This PR adds support for using a V0 SqlCatalog from other implementations like iceberg-python or iceberg-java, and it follows the iceberg-java behavior of checking an explicit
schema-versionproperty and migrating to V1 only-if the user requested this. I also added logging at the same level as iceberg-java.Probe to see if we have a V0 or V1 table, then add the iceberg_type column if it does not exist. More details in apache/iceberg-python#3263
Are these changes tested?