diff --git a/rm-datasource/pom.xml b/rm-datasource/pom.xml
index e22d54adb84..b3db25d8d8d 100644
--- a/rm-datasource/pom.xml
+++ b/rm-datasource/pom.xml
@@ -168,5 +168,10 @@
json-common
${project.version}
+
+ ${project.groupId}
+ seata-metrics-core
+ ${project.version}
+
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/sql/struct/cache/ClickhouseTableMetaCache.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/sql/struct/cache/ClickhouseTableMetaCache.java
new file mode 100644
index 00000000000..308e0e6b542
--- /dev/null
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/sql/struct/cache/ClickhouseTableMetaCache.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.sql.struct.cache;
+
+import org.apache.seata.common.exception.NotSupportYetException;
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.sqlparser.struct.ColumnMeta;
+import org.apache.seata.sqlparser.struct.IndexMeta;
+import org.apache.seata.sqlparser.struct.IndexType;
+import org.apache.seata.sqlparser.struct.TableMeta;
+import org.apache.seata.sqlparser.util.ColumnUtils;
+import org.apache.seata.sqlparser.util.JdbcConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * The type Table meta cache for ClickHouse.
+ *
+ */
+@LoadLevel(name = JdbcConstants.CLICKHOUSE)
+public class ClickhouseTableMetaCache extends AbstractTableMetaCache {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Override
+ protected String getCacheKey(Connection connection, String tableName, String resourceId) {
+ StringBuilder cacheKey = new StringBuilder(resourceId);
+ cacheKey.append(".");
+ // Remove escape characters from table name
+ String defaultTableName = ColumnUtils.delEscape(tableName, JdbcConstants.CLICKHOUSE);
+
+ DatabaseMetaData databaseMetaData;
+ try {
+ databaseMetaData = connection.getMetaData();
+ } catch (SQLException e) {
+ logger.error("Could not get connection, use default cache key {}", e.getMessage(), e);
+ return cacheKey.append(defaultTableName).toString();
+ }
+
+ try {
+ // prevent duplicated cache key
+ if (databaseMetaData.supportsMixedCaseIdentifiers()) {
+ cacheKey.append(defaultTableName);
+ } else {
+ cacheKey.append(defaultTableName.toLowerCase());
+ }
+ } catch (SQLException e) {
+ logger.error(
+ "Could not get supportsMixedCaseIdentifiers in connection metadata, use default cache key {}",
+ e.getMessage(),
+ e);
+ return cacheKey.append(defaultTableName).toString();
+ }
+
+ return cacheKey.toString();
+ }
+
+ @Override
+ protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
+ String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.CLICKHOUSE) + " LIMIT 1";
+ try (PreparedStatement ps = connection.prepareStatement(sql);
+ ResultSet rs = ps.executeQuery()) {
+ return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData(), tableName);
+ } catch (SQLException sqlEx) {
+ throw sqlEx;
+ } catch (Exception e) {
+ throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
+ }
+ }
+
+ protected TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd, String originalTableName)
+ throws SQLException {
+ // default to "" for clickhouse if schema is unknown
+ String schemaName = rsmd.getSchemaName(1);
+ String catalogName = rsmd.getCatalogName(1);
+ String tableName = rsmd.getTableName(1);
+
+ TableMeta tm = new TableMeta();
+ tm.setTableName(tableName);
+ tm.setCaseSensitive(true);
+ tm.setOriginalTableName(originalTableName);
+
+ try (ResultSet rsColumns = dbmd.getColumns(catalogName, schemaName, tableName, "%");
+ ResultSet rsIndex = dbmd.getIndexInfo(catalogName, schemaName, tableName, false, true);
+ ResultSet onUpdateColumns = dbmd.getVersionColumns(catalogName, schemaName, tableName)) {
+ while (rsColumns.next()) {
+ ColumnMeta col = new ColumnMeta();
+ col.setTableCat(rsColumns.getString("TABLE_CAT"));
+ col.setTableSchemaName(rsColumns.getString("TABLE_SCHEM"));
+ col.setTableName(rsColumns.getString("TABLE_NAME"));
+ col.setColumnName(rsColumns.getString("COLUMN_NAME"));
+ col.setDataType(rsColumns.getInt("DATA_TYPE"));
+ col.setDataTypeName(rsColumns.getString("TYPE_NAME"));
+ col.setColumnSize(rsColumns.getInt("COLUMN_SIZE"));
+ col.setDecimalDigits(rsColumns.getInt("DECIMAL_DIGITS"));
+ col.setNumPrecRadix(rsColumns.getInt("NUM_PREC_RADIX"));
+ col.setNullAble(rsColumns.getInt("NULLABLE"));
+ col.setRemarks(rsColumns.getString("REMARKS"));
+ col.setColumnDef(rsColumns.getString("COLUMN_DEF"));
+ col.setSqlDataType(rsColumns.getInt("SQL_DATA_TYPE"));
+ col.setSqlDatetimeSub(rsColumns.getInt("SQL_DATETIME_SUB"));
+ col.setCharOctetLength(rsColumns.getInt("CHAR_OCTET_LENGTH"));
+ col.setOrdinalPosition(rsColumns.getInt("ORDINAL_POSITION"));
+ col.setIsNullAble(rsColumns.getString("IS_NULLABLE"));
+ col.setIsAutoincrement(rsColumns.getString("IS_AUTOINCREMENT"));
+ col.setCaseSensitive(rsmd.isCaseSensitive(col.getOrdinalPosition()));
+
+ if (tm.getAllColumns().containsKey(col.getColumnName())) {
+ throw new NotSupportYetException(
+ "Not support the table has the same column name with different case yet");
+ }
+ tm.getAllColumns().put(col.getColumnName(), col);
+ }
+
+ while (onUpdateColumns.next()) {
+ ColumnMeta col = tm.getAllColumns().get(onUpdateColumns.getString("COLUMN_NAME"));
+ if (col != null) {
+ col.setOnUpdate(true);
+ }
+ }
+
+ while (rsIndex.next()) {
+ String indexName = rsIndex.getString("INDEX_NAME");
+ String colName = rsIndex.getString("COLUMN_NAME");
+ ColumnMeta col = tm.getAllColumns().get(colName);
+ if (col == null) {
+ continue;
+ }
+
+ if (tm.getAllIndexes().containsKey(indexName)) {
+ IndexMeta index = tm.getAllIndexes().get(indexName);
+ index.getValues().add(col);
+ } else {
+ IndexMeta index = new IndexMeta();
+ index.setIndexName(indexName);
+ index.setNonUnique(rsIndex.getBoolean("NON_UNIQUE"));
+ index.setIndexQualifier(rsIndex.getString("INDEX_QUALIFIER"));
+ index.setIndexName(rsIndex.getString("INDEX_NAME"));
+ index.setType(rsIndex.getShort("TYPE"));
+ index.setOrdinalPosition(rsIndex.getShort("ORDINAL_POSITION"));
+ index.setAscOrDesc(rsIndex.getString("ASC_OR_DESC"));
+ index.setCardinality(rsIndex.getLong("CARDINALITY"));
+ index.getValues().add(col);
+ if ("PRIMARY".equalsIgnoreCase(indexName)) {
+ index.setIndextype(IndexType.PRIMARY);
+ } else if (!index.isNonUnique()) {
+ index.setIndextype(IndexType.UNIQUE);
+ } else {
+ index.setIndextype(IndexType.NORMAL);
+ }
+ tm.getAllIndexes().put(indexName, index);
+ }
+ }
+ // Note: ClickHouse sometimes returns empty indexes for certain engines. We default the primary key to the
+ // single column if none found below.
+ if (tm.getAllIndexes().isEmpty() && !tm.getAllColumns().isEmpty()) {
+ logger.warn("No indexes found for table {} in ClickHouse. This may affect AT mode performance or correctness.", tableName);
+ }
+ }
+ return tm;
+ }
+}
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/AbstractUndoLogManager.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/AbstractUndoLogManager.java
index 2d9e525fd3f..6d9dc2aa50f 100644
--- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/AbstractUndoLogManager.java
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/AbstractUndoLogManager.java
@@ -28,6 +28,8 @@
import org.apache.seata.core.exception.BranchTransactionException;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.rpc.processor.Pair;
+import org.apache.seata.metrics.registry.Registry;
+import org.apache.seata.metrics.registry.RegistryFactory;
import org.apache.seata.rm.datasource.ConnectionContext;
import org.apache.seata.rm.datasource.ConnectionProxy;
import org.apache.seata.rm.datasource.DataSourceProxy;
@@ -47,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.apache.seata.common.DefaultValues.DEFAULT_CLIENT_UNDO_COMPRESS_ENABLE;
import static org.apache.seata.common.DefaultValues.DEFAULT_CLIENT_UNDO_COMPRESS_THRESHOLD;
@@ -130,6 +133,7 @@ public static void removeCurrentSerializer() {
*/
@Override
public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException {
+ long start = System.nanoTime();
try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_SQL);
PreparedStatement deleteSubPST = conn.prepareStatement(DELETE_SUB_UNDO_LOG_SQL)) {
deletePST.setLong(1, branchId);
@@ -144,6 +148,14 @@ public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQL
e = new SQLException(e);
}
throw (SQLException) e;
+ } finally {
+ Registry registry = getRegistry();
+ if (registry != null) {
+ registry.getTimer(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)
+ .record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+ registry.getCounter(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT)
+ .increase(1);
+ }
}
}
@@ -159,6 +171,8 @@ public void batchDeleteUndoLog(Set xids, Set branchIds, Connection
if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
return;
}
+ long start = System.nanoTime();
+ int totalDeleteRows = 0;
int xidSize = xids.size();
int branchIdSize = branchIds.size();
String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
@@ -178,6 +192,7 @@ public void batchDeleteUndoLog(Set xids, Set branchIds, Connection
paramsIndex++;
}
int deleteRows = deletePST.executeUpdate();
+ totalDeleteRows += deleteRows;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("batch delete undo log size {}", deleteRows);
}
@@ -190,6 +205,16 @@ public void batchDeleteUndoLog(Set xids, Set branchIds, Connection
e = new SQLException(e);
}
throw (SQLException) e;
+ } finally {
+ Registry registry = getRegistry();
+ if (registry != null) {
+ registry.getTimer(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)
+ .record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+ if (totalDeleteRows > 0) {
+ registry.getCounter(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT)
+ .increase(totalDeleteRows);
+ }
+ }
}
}
@@ -300,6 +325,10 @@ public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
String rollbackCtx =
buildContext(parser.getName(), compressorType, UndoLogConstants.MAX_ALLOWED_PACKET, maxAllowedPacket);
insertUndoLogWithNormal(xid, branchId, rollbackCtx, undoLogContent, cp.getTargetConnection());
+ Registry registry = getRegistry();
+ if (registry != null) {
+ registry.getSummary(UndoLogConstants.SUMMARY_UNDO_LOG_SIZE).increase(undoLogContent.length);
+ }
}
/**
@@ -586,4 +615,19 @@ public boolean hasUndoLogTable(Connection conn) {
protected String getCheckUndoLogTableExistSql() {
return CHECK_UNDO_LOG_TABLE_EXIST_SQL;
}
+
+ /**
+ * Get the metrics registry instance.
+ * This method is protected to allow testing with Mockito spy.
+ *
+ * @return the Registry instance, or null if metrics are not enabled
+ */
+ protected Registry getRegistry() {
+ try {
+ return RegistryFactory.getInstance();
+ } catch (Throwable t) {
+ LOGGER.warn("Failed to get metrics registry: {}", t.getMessage());
+ return null;
+ }
+ }
}
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/UndoLogConstants.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/UndoLogConstants.java
index 9585a28ff0b..2158fab620e 100644
--- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/UndoLogConstants.java
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/UndoLogConstants.java
@@ -18,6 +18,8 @@
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
+import org.apache.seata.metrics.Id;
+import org.apache.seata.metrics.IdConstants;
import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION;
@@ -38,4 +40,19 @@ public interface UndoLogConstants {
String SUB_SPLIT_KEY = ",";
String MAX_ALLOWED_PACKET = "map";
+
+ Id SUMMARY_UNDO_LOG_SIZE = new Id("seata.undo.log")
+ .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
+ .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY)
+ .withTag("type", "size");
+
+ Id TIMER_UNDO_LOG_DELETE_LATENCY = new Id("seata.undo.log")
+ .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
+ .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
+ .withTag("type", "delete_latency");
+
+ Id COUNTER_UNDO_LOG_DELETE_COUNT = new Id("seata.undo.log")
+ .withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
+ .withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_COUNTER)
+ .withTag("type", "delete_count");
}
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoDeleteExecutor.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoDeleteExecutor.java
new file mode 100644
index 00000000000..fb093802aab
--- /dev/null
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoDeleteExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo.clickhouse;
+
+import org.apache.seata.common.exception.ShouldNeverHappenException;
+import org.apache.seata.common.util.CollectionUtils;
+import org.apache.seata.rm.datasource.sql.struct.Field;
+import org.apache.seata.rm.datasource.sql.struct.Row;
+import org.apache.seata.rm.datasource.sql.struct.TableRecords;
+import org.apache.seata.rm.datasource.undo.AbstractUndoExecutor;
+import org.apache.seata.rm.datasource.undo.SQLUndoLog;
+import org.apache.seata.sqlparser.util.ColumnUtils;
+import org.apache.seata.sqlparser.util.JdbcConstants;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The type Clickhouse undo delete executor.
+ *
+ */
+public class ClickhouseUndoDeleteExecutor extends AbstractUndoExecutor {
+
+ /**
+ * Instantiates a new Clickhouse undo delete executor.
+ *
+ * @param sqlUndoLog the sql undo log
+ */
+ public ClickhouseUndoDeleteExecutor(SQLUndoLog sqlUndoLog) {
+ super(sqlUndoLog);
+ }
+
+ /**
+ * INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?)
+ */
+ private static final String INSERT_SQL_TEMPLATE = "INSERT INTO %s (%s) VALUES (%s)";
+
+ /**
+ * Undo delete.
+ *
+ * Notice: PK is at last one.
+ * @see AbstractUndoExecutor#undoPrepare
+ *
+ * @return sql
+ */
+ @Override
+ protected String buildUndoSQL() {
+ TableRecords beforeImage = sqlUndoLog.getBeforeImage();
+ List beforeImageRows = beforeImage.getRows();
+ if (CollectionUtils.isEmpty(beforeImageRows)) {
+ throw new ShouldNeverHappenException("Invalid UNDO LOG");
+ }
+ Row row = beforeImageRows.get(0);
+ List fields = new ArrayList<>(row.nonPrimaryKeys());
+ fields.addAll(getOrderedPkList(beforeImage, row, JdbcConstants.CLICKHOUSE));
+
+ String insertColumns = fields.stream()
+ .map(field -> ColumnUtils.addEscape(field.getName(), JdbcConstants.CLICKHOUSE))
+ .collect(Collectors.joining(", "));
+ String insertValues = fields.stream().map(field -> "?").collect(Collectors.joining(", "));
+
+ return String.format(INSERT_SQL_TEMPLATE, sqlUndoLog.getTableName(), insertColumns, insertValues);
+ }
+
+ @Override
+ protected TableRecords getUndoRows() {
+ return sqlUndoLog.getBeforeImage();
+ }
+}
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoExecutorHolder.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoExecutorHolder.java
new file mode 100644
index 00000000000..c3097e9da19
--- /dev/null
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoExecutorHolder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo.clickhouse;
+
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.rm.datasource.undo.AbstractUndoExecutor;
+import org.apache.seata.rm.datasource.undo.SQLUndoLog;
+import org.apache.seata.rm.datasource.undo.UndoExecutorHolder;
+import org.apache.seata.sqlparser.util.JdbcConstants;
+
+/**
+ * The Type ClickhouseUndoExecutorHolder
+ *
+ */
+@LoadLevel(name = JdbcConstants.CLICKHOUSE)
+public class ClickhouseUndoExecutorHolder implements UndoExecutorHolder {
+
+ @Override
+ public AbstractUndoExecutor getInsertExecutor(SQLUndoLog sqlUndoLog) {
+ return new ClickhouseUndoInsertExecutor(sqlUndoLog);
+ }
+
+ @Override
+ public AbstractUndoExecutor getUpdateExecutor(SQLUndoLog sqlUndoLog) {
+ return new ClickhouseUndoUpdateExecutor(sqlUndoLog);
+ }
+
+ @Override
+ public AbstractUndoExecutor getDeleteExecutor(SQLUndoLog sqlUndoLog) {
+ return new ClickhouseUndoDeleteExecutor(sqlUndoLog);
+ }
+}
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoInsertExecutor.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoInsertExecutor.java
new file mode 100644
index 00000000000..e075f18151f
--- /dev/null
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoInsertExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo.clickhouse;
+
+import org.apache.seata.common.exception.ShouldNeverHappenException;
+import org.apache.seata.common.util.CollectionUtils;
+import org.apache.seata.rm.datasource.SqlGenerateUtils;
+import org.apache.seata.rm.datasource.sql.struct.Field;
+import org.apache.seata.rm.datasource.sql.struct.Row;
+import org.apache.seata.rm.datasource.sql.struct.TableRecords;
+import org.apache.seata.rm.datasource.undo.AbstractUndoExecutor;
+import org.apache.seata.rm.datasource.undo.SQLUndoLog;
+import org.apache.seata.sqlparser.util.JdbcConstants;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The type ClickHouse undo insert executor.
+ *
+ */
+public class ClickhouseUndoInsertExecutor extends AbstractUndoExecutor {
+
+ /**
+ * ALTER TABLE a DELETE WHERE pk = ?
+ */
+ private static final String DELETE_SQL_TEMPLATE = "ALTER TABLE %s DELETE WHERE %s ";
+
+ /**
+ * Undo Insert.
+ *
+ * @return sql
+ */
+ @Override
+ protected String buildUndoSQL() {
+ TableRecords afterImage = sqlUndoLog.getAfterImage();
+ List afterImageRows = afterImage.getRows();
+ if (CollectionUtils.isEmpty(afterImageRows)) {
+ throw new ShouldNeverHappenException("Invalid UNDO LOG");
+ }
+ return generateDeleteSql(afterImageRows, afterImage);
+ }
+
+ @Override
+ protected void undoPrepare(PreparedStatement undoPST, ArrayList undoValues, List pkValueList)
+ throws SQLException {
+ int undoIndex = 0;
+ for (Field pkField : pkValueList) {
+ undoIndex++;
+ undoPST.setObject(undoIndex, pkField.getValue(), pkField.getType());
+ }
+ }
+
+ private String generateDeleteSql(List rows, TableRecords afterImage) {
+ List pkNameList = getOrderedPkList(afterImage, rows.get(0), JdbcConstants.CLICKHOUSE).stream()
+ .map(e -> e.getName())
+ .collect(Collectors.toList());
+ String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(pkNameList, JdbcConstants.CLICKHOUSE);
+ return String.format(DELETE_SQL_TEMPLATE, sqlUndoLog.getTableName(), whereSql);
+ }
+
+ /**
+ * Instantiates a new ClickHouse undo insert executor.
+ *
+ * @param sqlUndoLog the sql undo log
+ */
+ public ClickhouseUndoInsertExecutor(SQLUndoLog sqlUndoLog) {
+ super(sqlUndoLog);
+ }
+
+ @Override
+ protected TableRecords getUndoRows() {
+ return sqlUndoLog.getAfterImage();
+ }
+}
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoLogManager.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoLogManager.java
new file mode 100644
index 00000000000..63f30e0243d
--- /dev/null
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoLogManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo.clickhouse;
+
+import org.apache.seata.common.loader.LoadLevel;
+import org.apache.seata.core.constants.ClientTableColumnsName;
+import org.apache.seata.rm.datasource.undo.mysql.MySQLUndoLogManager;
+import org.apache.seata.sqlparser.util.JdbcConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Date;
+
+@LoadLevel(name = JdbcConstants.CLICKHOUSE)
+public class ClickhouseUndoLogManager extends MySQLUndoLogManager {
+
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ // ClickHouse uses ALTER TABLE ... DELETE instead of standard DELETE FROM
+ private static final String DELETE_UNDO_LOG_BY_CREATE_SQL = "ALTER TABLE " + UNDO_LOG_TABLE_NAME + " DELETE WHERE "
+ + ClientTableColumnsName.UNDO_LOG_LOG_CREATED + " <= ?";
+
+ @Override
+ public int deleteUndoLogByLogCreated(Date logCreated, int limitRows, Connection conn) throws SQLException {
+ if (logCreated == null) {
+ return 0;
+ }
+ try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_BY_CREATE_SQL)) {
+ // Clickhouse doesn't natively support LIMIT in ALTER TABLE ... DELETE easily inside prepared statements
+ // like MySQL so we omit the LIMIT parameter for log sweeping.
+ if (limitRows > 0 && logger.isDebugEnabled()) {
+ logger.debug("ClickHouse log sweeping ignores limitRows: {}", limitRows);
+ }
+ deletePST.setTimestamp(1, new java.sql.Timestamp(logCreated.getTime()));
+ int deleteRows = deletePST.executeUpdate();
+ if (logger.isDebugEnabled()) {
+ logger.debug("batch delete undo log for clickhouse, affected rows: {}", deleteRows);
+ }
+ return deleteRows;
+ } catch (Exception e) {
+ if (!(e instanceof SQLException)) {
+ e = new SQLException(e);
+ }
+ throw (SQLException) e;
+ }
+ }
+}
diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoUpdateExecutor.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoUpdateExecutor.java
new file mode 100644
index 00000000000..109195c0003
--- /dev/null
+++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoUpdateExecutor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo.clickhouse;
+
+import org.apache.seata.common.exception.ShouldNeverHappenException;
+import org.apache.seata.common.util.CollectionUtils;
+import org.apache.seata.rm.datasource.SqlGenerateUtils;
+import org.apache.seata.rm.datasource.sql.struct.Field;
+import org.apache.seata.rm.datasource.sql.struct.Row;
+import org.apache.seata.rm.datasource.sql.struct.TableRecords;
+import org.apache.seata.rm.datasource.undo.AbstractUndoExecutor;
+import org.apache.seata.rm.datasource.undo.SQLUndoLog;
+import org.apache.seata.sqlparser.util.ColumnUtils;
+import org.apache.seata.sqlparser.util.JdbcConstants;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * The type Clickhouse undo update executor.
+ *
+ */
+public class ClickhouseUndoUpdateExecutor extends AbstractUndoExecutor {
+
+ /**
+ * ALTER TABLE a UPDATE x = ?, y = ?, z = ? WHERE pk1 = ? and pk2 = ?
+ */
+ private static final String UPDATE_SQL_TEMPLATE = "ALTER TABLE %s UPDATE %s WHERE %s ";
+
+ /**
+ * Undo Update.
+ *
+ * @return sql
+ */
+ @Override
+ protected String buildUndoSQL() {
+ TableRecords beforeImage = sqlUndoLog.getBeforeImage();
+ List beforeImageRows = beforeImage.getRows();
+ if (CollectionUtils.isEmpty(beforeImageRows)) {
+ throw new ShouldNeverHappenException("Invalid UNDO LOG");
+ }
+ Row row = beforeImageRows.get(0);
+
+ List nonPkFields = row.nonPrimaryKeys();
+ String updateColumns = nonPkFields.stream()
+ .map(field -> {
+ String addEscape = ColumnUtils.addEscape(field.getName(), JdbcConstants.CLICKHOUSE);
+ return addEscape + " = ?";
+ })
+ .collect(Collectors.joining(", "));
+
+ List pkNameList = getOrderedPkList(beforeImage, row, JdbcConstants.CLICKHOUSE).stream()
+ .map(e -> e.getName())
+ .collect(Collectors.toList());
+ String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(pkNameList, JdbcConstants.CLICKHOUSE);
+
+ return String.format(UPDATE_SQL_TEMPLATE, sqlUndoLog.getTableName(), updateColumns, whereSql);
+ }
+
+ /**
+ * Instantiates a new Clickhouse undo update executor.
+ *
+ * @param sqlUndoLog the sql undo log
+ */
+ public ClickhouseUndoUpdateExecutor(SQLUndoLog sqlUndoLog) {
+ super(sqlUndoLog);
+ }
+
+ @Override
+ protected TableRecords getUndoRows() {
+ return sqlUndoLog.getBeforeImage();
+ }
+}
diff --git a/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoExecutorHolder b/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoExecutorHolder
index ca85fa8c3cc..d10b1875ac6 100644
--- a/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoExecutorHolder
+++ b/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoExecutorHolder
@@ -24,3 +24,4 @@ org.apache.seata.rm.datasource.undo.polardbx.PolarDBXUndoExecutorHolder
org.apache.seata.rm.datasource.undo.dm.DmUndoExecutorHolder
org.apache.seata.rm.datasource.undo.oscar.OscarUndoExecutorHolder
org.apache.seata.rm.datasource.undo.kingbase.KingbaseUndoExecutorHolder
+org.apache.seata.rm.datasource.undo.clickhouse.ClickhouseUndoExecutorHolder
\ No newline at end of file
diff --git a/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoLogManager b/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoLogManager
index e7a79f04751..0d43afe20eb 100644
--- a/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoLogManager
+++ b/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.rm.datasource.undo.UndoLogManager
@@ -24,3 +24,4 @@ org.apache.seata.rm.datasource.undo.polardbx.PolarDBXUndoLogManager
org.apache.seata.rm.datasource.undo.dm.DmUndoLogManager
org.apache.seata.rm.datasource.undo.oscar.OscarUndoLogManager
org.apache.seata.rm.datasource.undo.kingbase.KingbaseUndoLogManager
+org.apache.seata.rm.datasource.undo.clickhouse.ClickhouseUndoLogManager
\ No newline at end of file
diff --git a/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.sqlparser.struct.TableMetaCache b/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.sqlparser.struct.TableMetaCache
index 16f453326e0..1b6e51b4d53 100644
--- a/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.sqlparser.struct.TableMetaCache
+++ b/rm-datasource/src/main/resources/META-INF/services/org.apache.seata.sqlparser.struct.TableMetaCache
@@ -24,3 +24,4 @@ org.apache.seata.rm.datasource.sql.struct.cache.PolarDBXTableMetaCache
org.apache.seata.rm.datasource.sql.struct.cache.DmTableMetaCache
org.apache.seata.rm.datasource.sql.struct.cache.OscarTableMetaCache
org.apache.seata.rm.datasource.sql.struct.cache.KingbaseTableMetaCache
+org.apache.seata.rm.datasource.sql.struct.cache.ClickhouseTableMetaCache
\ No newline at end of file
diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/UndoLogMetricsTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/UndoLogMetricsTest.java
new file mode 100644
index 00000000000..0f0da2f509e
--- /dev/null
+++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/UndoLogMetricsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo;
+
+import org.apache.seata.metrics.Id;
+import org.apache.seata.metrics.registry.Registry;
+import org.apache.seata.rm.datasource.ConnectionContext;
+import org.apache.seata.rm.datasource.ConnectionProxy;
+import org.apache.seata.rm.datasource.DataSourceProxy;
+import org.apache.seata.rm.datasource.undo.mysql.MySQLUndoLogManager;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class UndoLogMetricsTest {
+
+ @Test
+ public void testFlushUndoLogsMetrics() throws Exception {
+ Registry registry = mock(Registry.class);
+ org.apache.seata.metrics.Summary summary = mock(org.apache.seata.metrics.Summary.class);
+ when(registry.getSummary(any(Id.class))).thenReturn(summary);
+
+ ConnectionProxy connectionProxy = mock(ConnectionProxy.class);
+ ConnectionContext context = mock(ConnectionContext.class);
+ DataSourceProxy dataSourceProxy = mock(DataSourceProxy.class);
+
+ when(connectionProxy.getContext()).thenReturn(context);
+ when(connectionProxy.getDataSourceProxy()).thenReturn(dataSourceProxy);
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ when(connection.prepareStatement(any(String.class))).thenReturn(preparedStatement);
+ when(connectionProxy.getTargetConnection()).thenReturn(connection);
+ when(dataSourceProxy.getResourceId()).thenReturn("jdbc:mysql://localhost:3306/test");
+
+ when(context.hasUndoLog()).thenReturn(true);
+ when(context.getXid()).thenReturn("xid");
+ when(context.getBranchId()).thenReturn(123456L);
+ when(context.getUndoItems()).thenReturn(Collections.emptyList());
+
+ // Spy on the manager to mock getRegistry()
+ MySQLUndoLogManager manager = spy(new MySQLUndoLogManager());
+ doReturn(registry).when(manager).getRegistry();
+
+ manager.flushUndoLogs(connectionProxy);
+
+ verify(registry).getSummary(eq(UndoLogConstants.SUMMARY_UNDO_LOG_SIZE));
+ verify(summary).increase(anyLong());
+ }
+
+ @Test
+ public void testDeleteUndoLogMetrics() throws Exception {
+ Registry registry = mock(Registry.class);
+ org.apache.seata.metrics.Timer timer = mock(org.apache.seata.metrics.Timer.class);
+ org.apache.seata.metrics.Counter counter = mock(org.apache.seata.metrics.Counter.class);
+
+ when(registry.getTimer(any(Id.class))).thenReturn(timer);
+ when(registry.getCounter(any(Id.class))).thenReturn(counter);
+
+ Connection connection = mock(Connection.class);
+ PreparedStatement preparedStatement = mock(PreparedStatement.class);
+ when(connection.prepareStatement(any(String.class))).thenReturn(preparedStatement);
+
+ // Spy on the manager to mock getRegistry()
+ MySQLUndoLogManager manager = spy(new MySQLUndoLogManager());
+ doReturn(registry).when(manager).getRegistry();
+
+ manager.deleteUndoLog("xid", 123L, connection);
+
+ verify(registry).getTimer(eq(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY));
+ verify(timer).record(anyLong(), any(TimeUnit.class));
+ verify(registry).getCounter(eq(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT));
+ verify(counter).increase(1);
+ }
+}
diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoDeleteExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoDeleteExecutorTest.java
new file mode 100644
index 00000000000..a08217fd600
--- /dev/null
+++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoDeleteExecutorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo.clickhouse;
+
+import org.apache.seata.rm.datasource.sql.struct.Field;
+import org.apache.seata.rm.datasource.sql.struct.Row;
+import org.apache.seata.rm.datasource.sql.struct.TableMeta;
+import org.apache.seata.rm.datasource.sql.struct.TableRecords;
+import org.apache.seata.rm.datasource.undo.SQLUndoLog;
+import org.apache.seata.sqlparser.struct.ColumnMeta;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ClickhouseUndoDeleteExecutorTest {
+
+ @Test
+ public void testBuildUndoSQL() {
+ SQLUndoLog sqlUndoLog = new SQLUndoLog();
+ sqlUndoLog.setTableName("t1");
+ TableMeta tableMeta = Mockito.mock(TableMeta.class);
+ Mockito.when(tableMeta.getTableName()).thenReturn("t1");
+
+ ColumnMeta pkMeta = new ColumnMeta();
+ pkMeta.setColumnName("id");
+ pkMeta.setDataType(Types.INTEGER);
+ Mockito.when(tableMeta.getPrimaryKeyOnlyName()).thenReturn(Arrays.asList("id"));
+
+ TableRecords beforeImage = new TableRecords();
+ beforeImage.setTableName("t1");
+ beforeImage.setTableMeta(tableMeta);
+
+ Row row = new Row();
+ Field pkField = new Field("id", Types.INTEGER, 1);
+ Field nameField = new Field("name", Types.VARCHAR, "test_deleted");
+ row.add(pkField);
+ row.add(nameField);
+
+ List rows = new ArrayList<>();
+ rows.add(row);
+ beforeImage.setRows(rows);
+ sqlUndoLog.setBeforeImage(beforeImage);
+
+ ClickhouseUndoDeleteExecutor executor = new ClickhouseUndoDeleteExecutor(sqlUndoLog);
+ String sql = executor.buildUndoSQL();
+ Assertions.assertEquals("INSERT INTO t1 (name, id) VALUES (?, ?)", sql.trim());
+ }
+}
diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoInsertExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoInsertExecutorTest.java
new file mode 100644
index 00000000000..1c7e3b6fe0a
--- /dev/null
+++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoInsertExecutorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo.clickhouse;
+
+import org.apache.seata.rm.datasource.sql.struct.Field;
+import org.apache.seata.rm.datasource.sql.struct.Row;
+import org.apache.seata.rm.datasource.sql.struct.TableMeta;
+import org.apache.seata.rm.datasource.sql.struct.TableRecords;
+import org.apache.seata.rm.datasource.undo.SQLUndoLog;
+import org.apache.seata.sqlparser.struct.ColumnMeta;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ClickhouseUndoInsertExecutorTest {
+
+ @Test
+ public void testBuildUndoSQL() {
+ SQLUndoLog sqlUndoLog = new SQLUndoLog();
+ sqlUndoLog.setTableName("t1");
+ TableMeta tableMeta = Mockito.mock(TableMeta.class);
+ Mockito.when(tableMeta.getTableName()).thenReturn("t1");
+
+ ColumnMeta pkMeta = new ColumnMeta();
+ pkMeta.setColumnName("id");
+ pkMeta.setDataType(Types.INTEGER);
+ Mockito.when(tableMeta.getPrimaryKeyOnlyName()).thenReturn(Arrays.asList("id"));
+
+ TableRecords afterImage = new TableRecords();
+ afterImage.setTableName("t1");
+ afterImage.setTableMeta(tableMeta);
+
+ Row row = new Row();
+ Field pkField = new Field("id", Types.INTEGER, 1);
+ Field nameField = new Field("name", Types.VARCHAR, "test");
+ row.add(pkField);
+ row.add(nameField);
+
+ List rows = new ArrayList<>();
+ rows.add(row);
+ afterImage.setRows(rows);
+ sqlUndoLog.setAfterImage(afterImage);
+
+ ClickhouseUndoInsertExecutor executor = new ClickhouseUndoInsertExecutor(sqlUndoLog);
+ String sql = executor.buildUndoSQL();
+ Assertions.assertEquals("ALTER TABLE t1 DELETE WHERE id = ?", sql.trim());
+ }
+}
diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoUpdateExecutorTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoUpdateExecutorTest.java
new file mode 100644
index 00000000000..321a94a3f7d
--- /dev/null
+++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/clickhouse/ClickhouseUndoUpdateExecutorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.rm.datasource.undo.clickhouse;
+
+import org.apache.seata.rm.datasource.sql.struct.Field;
+import org.apache.seata.rm.datasource.sql.struct.Row;
+import org.apache.seata.rm.datasource.sql.struct.TableMeta;
+import org.apache.seata.rm.datasource.sql.struct.TableRecords;
+import org.apache.seata.rm.datasource.undo.SQLUndoLog;
+import org.apache.seata.sqlparser.struct.ColumnMeta;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class ClickhouseUndoUpdateExecutorTest {
+
+ @Test
+ public void testBuildUndoSQL() {
+ SQLUndoLog sqlUndoLog = new SQLUndoLog();
+ sqlUndoLog.setTableName("t1");
+ TableMeta tableMeta = Mockito.mock(TableMeta.class);
+ Mockito.when(tableMeta.getTableName()).thenReturn("t1");
+
+ ColumnMeta pkMeta = new ColumnMeta();
+ pkMeta.setColumnName("id");
+ pkMeta.setDataType(Types.INTEGER);
+ Mockito.when(tableMeta.getPrimaryKeyOnlyName()).thenReturn(Arrays.asList("id"));
+
+ TableRecords beforeImage = new TableRecords();
+ beforeImage.setTableName("t1");
+ beforeImage.setTableMeta(tableMeta);
+
+ Row row = new Row();
+ Field pkField = new Field("id", Types.INTEGER, 1);
+ Field nameField = new Field("name", Types.VARCHAR, "test_old");
+ row.add(pkField);
+ row.add(nameField);
+
+ List rows = new ArrayList<>();
+ rows.add(row);
+ beforeImage.setRows(rows);
+ sqlUndoLog.setBeforeImage(beforeImage);
+
+ ClickhouseUndoUpdateExecutor executor = new ClickhouseUndoUpdateExecutor(sqlUndoLog);
+ String sql = executor.buildUndoSQL();
+ Assertions.assertEquals("ALTER TABLE t1 UPDATE name = ? WHERE id = ?", sql.trim());
+ }
+}
diff --git a/script/client/at/db/clickhouse.sql b/script/client/at/db/clickhouse.sql
new file mode 100644
index 00000000000..44df21902e4
--- /dev/null
+++ b/script/client/at/db/clickhouse.sql
@@ -0,0 +1,31 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- for AT mode you must to init this sql for you business database. the seata server not need it.
+CREATE TABLE IF NOT EXISTS undo_log
+(
+ branch_id Int64,
+ xid String,
+ context String,
+ rollback_info String,
+ log_status Int32,
+ log_created DateTime64(6),
+ log_modified DateTime64(6)
+) ENGINE = MergeTree()
+ORDER BY (xid, branch_id)
+PRIMARY KEY (xid, branch_id)
+COMMENT 'AT transaction mode undo table';