Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,39 @@ CREATE TABLE my_table_all (
CREATE TABLE my_table_all_as PARTITIONED BY (dt) TBLPROPERTIES ('primary-key' = 'dt,hh') AS SELECT * FROM my_table_all;
```

### Create Table Like

A new table can be created from an existing source table. Available from **Spark 3.4**.

```sql
CREATE TABLE target_table LIKE source_table;
```

`CREATE TABLE LIKE` copies the source schema and partitioning.

In `SparkCatalog`, if `USING xxx` is not specified, the target inherits the source provider.

In `SparkGenericCatalog`, use `USING paimon` to enable Paimon `CREATE TABLE LIKE` semantics.

When Paimon handles the command, the table comment is always copied, while other table properties are copied only when the source and target table providers are the same.

`path`, `provider`, `location`, `owner`, `external` and `is-managed-location` are never copied. Users can still override the target table's properties with `TBLPROPERTIES`.

`STORED AS` is not supported in `SparkCatalog`. In `SparkGenericCatalog`, commands without `USING paimon` use Spark native behavior.

```sql
CREATE TABLE source_tbl (
id INT,
name STRING,
pt STRING
) COMMENT 'source comment'
PARTITIONED BY (pt)
TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '5');

-- target inherits the source provider
CREATE TABLE target_tbl LIKE source_tbl;
```

## View

Views are based on the result-set of an SQL query. When using `org.apache.paimon.spark.SparkCatalog`, views are managed by Paimon itself.
Expand Down Expand Up @@ -366,4 +399,3 @@ List all tags of a table.
```sql
SHOW TAGS T;
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.SparkSource

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.execution.command.LeafRunnableCommand

import java.util.Locale

import scala.collection.JavaConverters._

/**
 * Implements `CREATE TABLE LIKE` for Paimon-managed catalogs: creates `targetIdent` in
 * `targetCatalog` with the schema and partitioning of the table `sourceIdent` from
 * `sourceCatalog`.
 *
 * Property handling:
 *  - the source table's comment is always carried over;
 *  - the remaining source properties are carried over only when the source and target
 *    providers match (otherwise a warning lists the skipped keys);
 *  - placement/ownership keys (`path`, provider, comment, location, owner, external,
 *    is-managed-location) are never copied from the source;
 *  - an explicit `location` and user-supplied `properties` always take precedence.
 */
case class PaimonCreateTableLikeCommand(
    targetCatalog: TableCatalog,
    targetIdent: Identifier,
    sourceCatalog: TableCatalog,
    sourceIdent: Identifier,
    provider: Option[String],
    location: Option[String],
    properties: Map[String, String] = Map.empty,
    ifNotExists: Boolean)
  extends LeafRunnableCommand
  with Logging {

  // Keys describing physical placement or ownership; these must never be inherited
  // by the newly created table.
  private val nonCopyableKeys: Set[String] = Set(
    CoreOptions.PATH.key(),
    TableCatalog.PROP_PROVIDER,
    TableCatalog.PROP_COMMENT,
    TableCatalog.PROP_LOCATION,
    TableCatalog.PROP_OWNER,
    TableCatalog.PROP_EXTERNAL,
    TableCatalog.PROP_IS_MANAGED_LOCATION)

  // Placeholder used when the source table does not report a provider.
  private val fallbackSourceProvider = "unknown"

  // Provider used for the target when neither the command nor the source supplies one.
  private val paimonProvider = lower(SparkSource.NAME)

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val srcTable = sourceCatalog.loadTable(sourceIdent)
    val srcProps = srcTable.properties().asScala.toMap
    val srcProviderOpt = srcProps.get(TableCatalog.PROP_PROVIDER)

    // Providers are compared case-insensitively; an explicit USING clause beats the
    // source's provider, which in turn beats the Paimon default.
    val srcProvider = srcProviderOpt.map(lower).getOrElse(fallbackSourceProvider)
    val tgtProvider = provider.orElse(srcProviderOpt).map(lower).getOrElse(paimonProvider)

    // Restore the raw char/varchar types that Spark stores via metadata.
    val srcSchema =
      CharVarcharUtils.getRawSchema(srcTable.schema(), sparkSession.sessionState.conf)

    try {
      targetCatalog.createTable(
        targetIdent,
        srcSchema,
        srcTable.partitioning(),
        assembleProperties(srcProps, srcProvider, tgtProvider).asJava)
    } catch {
      // IF NOT EXISTS: silently accept a concurrent/pre-existing table.
      case _: TableAlreadyExistsException if ifNotExists =>
    }

    Seq.empty
  }

  /**
   * Builds the full property map for the new table. Later entries win, so the explicit
   * location and user-supplied properties override anything inherited from the source.
   */
  private def assembleProperties(
      srcProps: Map[String, String],
      srcProvider: String,
      tgtProvider: String): Map[String, String] = {
    val commentEntry =
      srcProps.get(TableCatalog.PROP_COMMENT).map(c => TableCatalog.PROP_COMMENT -> c)
    val inheritable = srcProps.filterNot { case (key, _) => nonCopyableKeys.contains(key) }

    val inherited =
      if (srcProvider == tgtProvider) {
        inheritable ++ commentEntry
      } else {
        // Different providers: copy only the comment and tell the user what was skipped.
        if (inheritable.nonEmpty) {
          logWarning(
            s"Skip copying source table properties in CREATE TABLE LIKE because source provider " +
              s"'$srcProvider' differs from target provider " +
              s"'$tgtProvider': " +
              inheritable.keySet.toSeq.sorted.mkString(","))
        }
        commentEntry.toMap
      }

    inherited ++
      Map(TableCatalog.PROP_PROVIDER -> tgtProvider) ++
      location.map(TableCatalog.PROP_LOCATION -> _) ++
      properties
  }

  /** Normalizes a provider name for case-insensitive comparison. */
  private def lower(value: String): String = value.toLowerCase(Locale.ROOT)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import java.util.Locale

import scala.collection.JavaConverters._

Expand All @@ -50,6 +51,7 @@ case class PaimonFormatTable(table: FormatTable)

override def properties: util.Map[String, String] = {
val properties = new util.HashMap[String, String](table.options())
properties.put(TableCatalog.PROP_PROVIDER, table.format.name().toLowerCase(Locale.ROOT))
if (table.comment.isPresent) {
properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
Seq(
RewritePaimonViewCommands(sparkSession),
RewritePaimonFunctionCommands(sparkSession),
RewriteCreateTableLikeCommand(sparkSession),
RewriteSparkDDLCommands(sparkSession)
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.parser.extensions

import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog}
import org.apache.paimon.spark.catalog.SparkBaseCatalog
import org.apache.paimon.spark.commands.PaimonCreateTableLikeCommand

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.execution.command.{CreateTableLikeCommand => SparkCreateTableLikeCommand}

/**
 * Rewrites Spark's native `CREATE TABLE LIKE` command into [[PaimonCreateTableLikeCommand]]
 * when the target table belongs to a Paimon catalog.
 *
 * Rewrite rules:
 *  - target in [[SparkCatalog]]: always rewritten; Hive `STORED AS` syntax is rejected;
 *  - target in [[SparkGenericCatalog]]: rewritten only when `USING paimon` is specified and
 *    no Hive storage syntax is present; otherwise Spark's native command is kept;
 *  - applies only on Spark 3.4 or newer; older versions pass the plan through untouched.
 */
case class RewriteCreateTableLikeCommand(spark: SparkSession)
  extends Rule[LogicalPlan]
  with LookupCatalog {

  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

  // Compare numeric (major, minor) components instead of the raw version string: a
  // lexicographic comparison (SPARK_VERSION >= "3.4") would misorder versions such as
  // "3.10" vs "3.4".
  private lazy val gteqSpark3_4: Boolean = {
    val versionPattern = """(\d+)\.(\d+).*""".r
    org.apache.spark.SPARK_VERSION match {
      case versionPattern(major, minor) =>
        val majorVersion = major.toInt
        majorVersion > 3 || (majorVersion == 3 && minor.toInt >= 4)
      case _ => false
    }
  }

  override def apply(plan: LogicalPlan): LogicalPlan = {
    if (!gteqSpark3_4) {
      return plan
    }

    plan.resolveOperatorsUp {
      case c: SparkCreateTableLikeCommand =>
        val targetParts = toMultipartIdentifier(c.targetTable)

        targetParts match {
          case CatalogAndIdentifier(targetCatalog: SparkCatalog, targetIdent) =>
            // SparkCatalog cannot represent Hive storage formats, so fail loudly
            // rather than silently dropping STORED AS.
            if (usesHiveStorageSyntax(c.fileFormat)) {
              throw new UnsupportedOperationException(
                "CREATE TABLE LIKE ... STORED AS is not supported for SparkCatalog.")
            }
            createTableLikeCommand(c, targetCatalog, targetIdent)
          case CatalogAndIdentifier(targetCatalog: SparkGenericCatalog, targetIdent)
              if !usesHiveStorageSyntax(c.fileFormat) &&
                c.provider.exists(SparkBaseCatalog.usePaimon) =>
            createTableLikeCommand(c, targetCatalog, targetIdent)
          case _ => c
        }
    }
  }

  /**
   * Resolves the source table's catalog and builds the Paimon command; the source must be
   * resolvable to a [[TableCatalog]] so its schema and partitioning can be loaded.
   */
  private def createTableLikeCommand(
      command: SparkCreateTableLikeCommand,
      targetCatalog: SparkBaseCatalog,
      targetIdent: Identifier): LogicalPlan = {
    toMultipartIdentifier(command.sourceTable) match {
      case CatalogAndIdentifier(sourceCatalog: TableCatalog, sourceIdent) =>
        PaimonCreateTableLikeCommand(
          targetCatalog,
          targetIdent,
          sourceCatalog,
          sourceIdent,
          command.provider,
          command.fileFormat.locationUri.map(_.toString),
          command.properties,
          command.ifNotExists
        )
      case _ =>
        throw new UnsupportedOperationException(
          s"CREATE TABLE LIKE source table must be resolved from TableCatalog: ${command.sourceTable}.")
    }
  }

  /** An input format is only set by Hive `STORED AS` / `STORED BY` storage syntax. */
  private def usesHiveStorageSyntax(fileFormat: CatalogStorageFormat): Boolean = {
    fileFormat.inputFormat.isDefined
  }

  /** Flattens a TableIdentifier into catalog-qualified name parts for catalog lookup. */
  private def toMultipartIdentifier(
      targetTable: org.apache.spark.sql.catalyst.TableIdentifier): Seq[String] = {
    (targetTable.catalog.toSeq ++ targetTable.database.toSeq :+ targetTable.table).toList
  }
}
Loading
Loading