Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
Expand Down Expand Up @@ -60,6 +61,17 @@ public Optional<ConnectorOutputMetadata> finishInsert(
session, GravitinoHandle.unWrap(insertHandle), fragments, computedStatistics);
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
ConnectorOutputTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics) {
ConnectorInsertTableHandle insertHandle =
((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
return internalMetadata.finishInsert(session, insertHandle, fragments, computedStatistics);
}

@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
Expand Down Expand Up @@ -67,6 +68,18 @@ public Optional<ConnectorOutputMetadata> finishInsert(
computedStatistics);
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
ConnectorOutputTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics) {
ConnectorInsertTableHandle insertHandle =
((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
return internalMetadata.finishInsert(
session, insertHandle, List.of(), fragments, computedStatistics);
}

@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
Expand Down Expand Up @@ -67,6 +68,18 @@ public Optional<ConnectorOutputMetadata> finishInsert(
computedStatistics);
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
ConnectorOutputTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics) {
ConnectorInsertTableHandle insertHandle =
((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
return internalMetadata.finishInsert(
session, insertHandle, List.of(), fragments, computedStatistics);
}

@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
Expand Down Expand Up @@ -67,6 +68,18 @@ public Optional<ConnectorOutputMetadata> finishInsert(
computedStatistics);
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
ConnectorOutputTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics) {
ConnectorInsertTableHandle insertHandle =
((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
return internalMetadata.finishInsert(
session, insertHandle, List.of(), fragments, computedStatistics);
}

@SuppressWarnings("deprecation")
@Override
public ConnectorMergeTableHandle beginMerge(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
Expand Down Expand Up @@ -73,6 +74,18 @@ public Optional<ConnectorOutputMetadata> finishInsert(
computedStatistics);
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
ConnectorOutputTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics) {
ConnectorInsertTableHandle insertHandle =
((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
return internalMetadata.finishInsert(
session, insertHandle, List.of(), fragments, computedStatistics);
}

@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.RetryMode;
Expand Down Expand Up @@ -83,6 +84,18 @@ public Optional<ConnectorOutputMetadata> finishInsert(
computedStatistics);
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(
ConnectorSession session,
ConnectorOutputTableHandle tableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics) {
ConnectorInsertTableHandle insertHandle =
((GravitinoOutputTableHandle) tableHandle).getInternalHandle();
return internalMetadata.finishInsert(
session, insertHandle, List.of(), fragments, computedStatistics);
}

@Override
public ConnectorMergeTableHandle beginMerge(
ConnectorSession session,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;
Expand Down Expand Up @@ -220,6 +221,46 @@ public void createTable(
catalogConnectorMetadata.createTable(table, saveMode == SaveMode.IGNORE);
}

@Override
public ConnectorOutputTableHandle beginCreateTable(
ConnectorSession session,
ConnectorTableMetadata tableMetadata,
Optional<ConnectorTableLayout> layout,
RetryMode retryMode,
boolean noExistingData) {
// First, create the table in the Gravitino catalog
GravitinoTable table = metadataAdapter.createTable(tableMetadata);
catalogConnectorMetadata.createTable(table, false);

// Get the table handle from the internal connector for the newly created table
SchemaTableName tableName = tableMetadata.getTable();
ConnectorTableHandle internalTableHandle =
internalMetadata.getTableHandle(session, tableName, Optional.empty(), Optional.empty());

// Delegate to the internal connector's insert path to write data,
// avoiding double table creation in the original connector
List<ColumnHandle> columns =
new ArrayList<>(internalMetadata.getColumnHandles(session, internalTableHandle).values());
ConnectorInsertTableHandle insertTableHandle =
internalMetadata.beginInsert(session, internalTableHandle, columns, retryMode);
return new GravitinoOutputTableHandle(insertTableHandle, tableName);
}

@Override
public Optional<ConnectorTableLayout> getNewTableLayout(
ConnectorSession session, ConnectorTableMetadata tableMetadata) {
return internalMetadata
.getNewTableLayout(session, tableMetadata)
.map(
result ->
result.getPartitioning().isPresent()
? new ConnectorTableLayout(
new GravitinoPartitioningHandle(result.getPartitioning().get()),
result.getPartitionColumns(),
result.supportsMultipleWritersPerPartition())
: new ConnectorTableLayout(result.getPartitionColumns()));
}

@Override
public void createSchema(
ConnectorSession session,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.trino.connector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.SchemaTableName;

/**
* The GravitinoOutputTableHandle is used for handling CTAS (CREATE TABLE AS SELECT) operations.
*
* <p>Internally wraps a {@link ConnectorInsertTableHandle} because the Gravitino connector creates
* the table first via catalogConnectorMetadata, then delegates data writing to the internal
* connector's insert path, avoiding double table creation.
*
* <p>Also stores the {@link SchemaTableName} so that {@code rollbackCreateTable} can drop the table
* from the Gravitino catalog if the CTAS operation fails.
*/
public class GravitinoOutputTableHandle
implements ConnectorOutputTableHandle, GravitinoHandle<ConnectorInsertTableHandle> {

private final String schemaName;
private final String tableName;

private HandleWrapper<ConnectorInsertTableHandle> handleWrapper =
new HandleWrapper<>(ConnectorInsertTableHandle.class);

/**
* Constructs a new GravitinoOutputTableHandle from serialized properties.
*
* @param handleString the serialized handle string
* @param schemaName the schema name of the created table
* @param tableName the table name of the created table
*/
@JsonCreator
public GravitinoOutputTableHandle(
@JsonProperty(HANDLE_STRING) String handleString,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("tableName") String tableName) {
this.handleWrapper = handleWrapper.fromJson(handleString);
this.schemaName = schemaName;
this.tableName = tableName;
}

/**
* Constructs a new GravitinoOutputTableHandle from a ConnectorInsertTableHandle.
*
* @param insertTableHandle the internal connector insert table handle
* @param tableName the schema-qualified table name for rollback support
*/
public GravitinoOutputTableHandle(
ConnectorInsertTableHandle insertTableHandle, SchemaTableName tableName) {
this.handleWrapper = new HandleWrapper<>(insertTableHandle);
this.schemaName = tableName.getSchemaName();
this.tableName = tableName.getTableName();
}

@JsonProperty
@Override
public String getHandleString() {
return handleWrapper.toJson();
}

@Override
public ConnectorInsertTableHandle getInternalHandle() {
return handleWrapper.getHandle();
}

/** Returns the schema name of the table created during CTAS. */
@JsonProperty
public String getSchemaName() {
return schemaName;
}

/** Returns the table name of the table created during CTAS. */
@JsonProperty
public String getTableName() {
return tableName;
}

/** Returns the schema-qualified table name for rollback support. */
public SchemaTableName toSchemaTableName() {
return new SchemaTableName(schemaName, tableName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTransactionHandle;
import org.apache.commons.lang3.NotImplementedException;

/** This class provides a ConnectorPageSink for Trino to write data to internal connector. */
public class GravitinoPageSinkProvider implements ConnectorPageSinkProvider {
Expand All @@ -49,7 +48,12 @@ public ConnectorPageSink createPageSink(
ConnectorSession session,
ConnectorOutputTableHandle outputTableHandle,
ConnectorPageSinkId pageSinkId) {
throw new NotImplementedException();
// GravitinoOutputTableHandle wraps a ConnectorInsertTableHandle internally,
// so delegate to the insert-path createPageSink
ConnectorInsertTableHandle insertHandle =
((GravitinoOutputTableHandle) outputTableHandle).getInternalHandle();
return pageSinkProvider.createPageSink(
GravitinoHandle.unWrap(transactionHandle), session, insertHandle, pageSinkId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMergeTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableHandle;
Expand Down Expand Up @@ -164,6 +165,10 @@ static void registerHandleSerializationModule(ObjectMapper objectMapper) {
new AbstractTypedJacksonModule<>(
ConnectorInsertTableHandle.class, nameResolver, classResolver) {});

objectMapper.registerModule(
new AbstractTypedJacksonModule<>(
ConnectorOutputTableHandle.class, nameResolver, classResolver) {});

objectMapper.registerModule(
new AbstractTypedJacksonModule<>(
ConnectorMergeTableHandle.class, nameResolver, classResolver) {});
Expand Down
Loading
Loading