Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions crates/iceberg/src/spec/manifest/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,6 @@ impl ManifestWriter {
/// Add a delete manifest entry. This method will update following status of the entry:
/// - Update the entry status to `Deleted`
/// - Set the snapshot id to the current snapshot id
///
/// # TODO
/// Remove this allow later
#[allow(dead_code)]
pub(crate) fn add_delete_entry(&mut self, mut entry: ManifestEntry) -> Result<()> {
self.check_data_file(&entry.data_file)?;
entry.status = ManifestStatus::Deleted;
Expand Down Expand Up @@ -345,7 +341,7 @@ impl ManifestWriter {
Ok(())
}

/// Add an file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
/// Add a file as existing manifest entry. The original data and file sequence numbers, snapshot ID,
/// which were assigned at commit, must be preserved when adding an existing entry.
pub fn add_existing_file(
&mut self,
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub const MAIN_BRANCH: &str = "main";

/// Reference to [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be Hash now?

#[serde(rename_all = "lowercase")]
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
pub enum Operation {
Expand Down
16 changes: 8 additions & 8 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ impl FastAppendAction {
#[async_trait]
impl TransactionAction for FastAppendAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
let snapshot_producer = SnapshotProducer::new(
table,
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
self.key_metadata.clone(),
self.snapshot_properties.clone(),
self.added_data_files.clone(),
);
let snapshot_producer = SnapshotProducer::builder()
.with_table(table)
.with_commit_uuid(self.commit_uuid.unwrap_or_else(Uuid::now_v7))
.with_key_metadata(self.key_metadata.clone())
.with_snapshot_properties(self.snapshot_properties.clone())
.with_added_data_files(self.added_data_files.clone())
.build();

Comment on lines -87 to 94
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice change

// validate added files
snapshot_producer.validate_added_data_files()?;
Expand Down Expand Up @@ -122,7 +122,7 @@ impl SnapshotProduceOperation for FastAppendOperation {

async fn existing_manifest(
&self,
snapshot_produce: &SnapshotProducer<'_>,
snapshot_produce: &mut SnapshotProducer<'_>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be mut?

) -> Result<Vec<ManifestFile>> {
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
return Ok(vec![]);
Expand Down
156 changes: 119 additions & 37 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ops::RangeFrom;
use std::ops::{Deref, RangeFrom};

use typed_builder::TypedBuilder;
use uuid::Uuid;

use crate::error::Result;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot,
SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary,
TableProperties, update_snapshot_summaries,
DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType,
ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder,
Operation, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct,
StructType, Summary, TableProperties, update_snapshot_summaries,
};
use crate::table::Table;
use crate::transaction::ActionCommit;
Expand Down Expand Up @@ -67,7 +68,6 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
fn operation(&self) -> Operation;

/// Returns manifest entries that should be marked as deleted in the new snapshot.
#[allow(unused)]
fn delete_entries(
&self,
snapshot_produce: &SnapshotProducer,
Expand All @@ -83,7 +83,7 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
/// - **Delete operations**: May exclude manifests for partitions being deleted
fn existing_manifest(
&self,
snapshot_produce: &SnapshotProducer<'_>,
snapshot_produce: &mut SnapshotProducer<'_>,
) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
}

Expand All @@ -107,41 +107,39 @@ pub(crate) trait ManifestProcess: Send + Sync {
) -> Vec<ManifestFile>;
}

#[derive(TypedBuilder)]
#[builder(field_defaults(setter(prefix = "with_")))]
pub(crate) struct SnapshotProducer<'a> {
pub(crate) table: &'a Table,
#[builder(
setter(skip),
default_code = "SnapshotProducer::generate_unique_snapshot_id(table)"
)]
snapshot_id: i64,
commit_uuid: Uuid,
#[builder(default)]
key_metadata: Option<Vec<u8>>,
#[builder(default)]
snapshot_properties: HashMap<String, String>,
#[builder(default)]
added_data_files: Vec<DataFile>,
#[builder(default)]
added_delete_files: Vec<DataFile>,
#[builder(default)]
pub deleted_data_files: Vec<DataFile>,
#[builder(default)]
pub deleted_delete_files: Vec<DataFile>,
Comment thread
CTTY marked this conversation as resolved.
Outdated
// A counter used to generate unique manifest file names.
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
#[builder(setter(skip), default_code = "(0..)")]
manifest_counter: RangeFrom<u64>,
}

impl<'a> SnapshotProducer<'a> {
pub(crate) fn new(
table: &'a Table,
commit_uuid: Uuid,
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
) -> Self {
Self {
table,
snapshot_id: Self::generate_unique_snapshot_id(table),
commit_uuid,
key_metadata,
snapshot_properties,
added_data_files,
manifest_counter: (0..),
}
}

pub(crate) fn validate_added_data_files(&self) -> Result<()> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for data_file in &self.added_data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
if data_file.content_type() != DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
"Only data content type is allowed for fast append",
Expand Down Expand Up @@ -223,7 +221,11 @@ impl<'a> SnapshotProducer<'a> {
snapshot_id
}

fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result<ManifestWriter> {
pub(crate) fn new_manifest_writer(
&mut self,
content: ManifestContentType,
spec_id: i32,
) -> Result<ManifestWriter> {
let new_manifest_path = format!(
"{}/{}/{}-m{}.{}",
self.table.metadata().location(),
Expand All @@ -240,8 +242,12 @@ impl<'a> SnapshotProducer<'a> {
self.table.metadata().current_schema().clone(),
self.table
.metadata()
.default_partition_spec()
.as_ref()
.partition_spec_by_id(spec_id)
.ok_or(Error::new(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok_or will allocate the error on the success path, ok_or_else might be better?

ErrorKind::DataInvalid,
format!("Partition spec with id: {spec_id} is not found!"),
))?
.deref()
.clone(),
);
match self.table.metadata().format_version() {
Expand Down Expand Up @@ -289,8 +295,15 @@ impl<'a> SnapshotProducer<'a> {
}

// Write manifest file for added data files and return the ManifestFile for ManifestList.
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
async fn write_added_manifest(
&mut self,
content_type: ManifestContentType,
) -> Result<ManifestFile> {
let added_data_files = match content_type {
Comment thread
CTTY marked this conversation as resolved.
Outdated
ManifestContentType::Data => std::mem::take(&mut self.added_data_files),
ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files),
};

if added_data_files.is_empty() {
return Err(Error::new(
ErrorKind::PreconditionFailed,
Expand All @@ -312,13 +325,69 @@ impl<'a> SnapshotProducer<'a> {
builder.build()
}
});
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
let mut writer = self.new_manifest_writer(
content_type,
self.table.metadata().default_partition_spec_id(),
)?;
for entry in manifest_entries {
writer.add_entry(entry)?;
}
writer.write_manifest_file().await
}

async fn write_deleted_manifest(
&mut self,
deleted_entries: Vec<ManifestEntry>,
) -> Result<Vec<ManifestFile>> {
if deleted_entries.is_empty() {
Ok(Vec::new())
} else {
Comment on lines +342 to +344
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we early return here to avoid the nesting?

Suggested change
if deleted_entries.is_empty() {
Ok(Vec::new())
} else {
if deleted_entries.is_empty() {
return Ok(Vec::new())
}

// Initialize partition groups
let mut partition_groups = HashMap::new();
for entry in deleted_entries {
partition_groups
.entry(entry.data_file().partition_spec_id)
.or_insert_with(Vec::new)
.push(entry);
}

// Write manifest files for each spec-entries pair
let mut deleted_manifests = Vec::new();
for (spec_id, entries) in partition_groups {
Comment thread
CTTY marked this conversation as resolved.
let mut data_manifest_writer: Option<ManifestWriter> = None;
let mut delete_manifest_writer: Option<ManifestWriter> = None;
for entry in entries {
match entry.data_file().content_type() {
DataContentType::Data => data_manifest_writer
.get_or_insert(
self.new_manifest_writer(ManifestContentType::Data, spec_id)?,
)
.add_entry(entry)?,
Comment thread
CTTY marked this conversation as resolved.
Outdated
DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
delete_manifest_writer
.get_or_insert(
self.new_manifest_writer(
ManifestContentType::Deletes,
spec_id,
)?,
)
.add_delete_entry(entry)?
}
}
}

if let Some(writer) = data_manifest_writer {
deleted_manifests.push(writer.write_manifest_file().await?);
};
if let Some(writer) = delete_manifest_writer {
deleted_manifests.push(writer.write_manifest_file().await?);
};
}

Ok(deleted_manifests)
}
}

async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
&mut self,
snapshot_produce_operation: &OP,
Expand All @@ -329,10 +398,15 @@ impl<'a> SnapshotProducer<'a> {
// TODO: Allowing snapshot property setup with no added data files is a workaround.
// We should clean it up after all necessary actions are supported.
// For details, please refer to https://github.com/apache/iceberg-rust/issues/1548
if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() {
if self.added_data_files.is_empty()
&& self.added_delete_files.is_empty()
&& self.deleted_data_files.is_empty()
&& self.deleted_delete_files.is_empty()
&& self.snapshot_properties.is_empty()
{
return Err(Error::new(
ErrorKind::PreconditionFailed,
"No added data files or added snapshot properties found when write a manifest file",
"No added data files, delete files, or snapshot properties found when writing a manifest file",
));
}

Expand All @@ -341,12 +415,20 @@ impl<'a> SnapshotProducer<'a> {

// Process added entries.
if !self.added_data_files.is_empty() {
let added_manifest = self.write_added_manifest().await?;
let added_manifest = self.write_added_manifest(ManifestContentType::Data).await?;
manifest_files.push(added_manifest);
}
if !self.added_delete_files.is_empty() {
let added_manifest = self
.write_added_manifest(ManifestContentType::Deletes)
.await?;
manifest_files.push(added_manifest);
}

// # TODO
// Support process delete entries.
let delete_manifests = self
.write_deleted_manifest(snapshot_produce_operation.delete_entries(self).await?)
.await?;
manifest_files.extend(delete_manifests);

let manifest_files = manifest_process.process_manifests(self, manifest_files);
Ok(manifest_files)
Expand Down
Loading