Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
01fcee0
fix #7963 ClusterControllerTest.watch_withHttp2: use 30000ms timeout
Sumit6307 Jan 29, 2026
1cee03b
feat: add observability metrics for undo log manager
Sumit6307 Feb 2, 2026
628607a
fix(ci): add license header and apply spotless formatting
Sumit6307 Feb 2, 2026
46f4069
test: refactor to use spy instead of mockStatic for better compatibility
Sumit6307 Feb 2, 2026
9475b39
fix: add getRegistry() method for testability - fixes CI build failures
Sumit6307 Feb 2, 2026
0242a9e
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Feb 2, 2026
492578a
feat: add observability metrics for undo log manager
Sumit6307 Feb 5, 2026
e3afae6
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Feb 5, 2026
3b90394
fix: safe access to RegistryFactory to prevent CI failures
Sumit6307 Feb 5, 2026
d5770c2
feat: Add ClickHouse AT mode support
Sumit6307 Mar 4, 2026
9563d52
Merge branch '2.x' into feature/gsoc-clickhouse-at-mode
Sumit6307 Mar 4, 2026
d0ac202
style: apply spotless formatting and fix CI
Sumit6307 Mar 4, 2026
0ec31f4
Merge remote changes
Sumit6307 Mar 4, 2026
5dd7001
style: fix spotless formatting in ClusterControllerTest
Sumit6307 Mar 4, 2026
5992e11
feat: Add ClickHouse undo_log DDL script
Sumit6307 Mar 4, 2026
457a877
fix: repair corrupted SPI files and refine clickhouse.sql
Sumit6307 Mar 4, 2026
6928ec2
fix: defensive null check in UndoLogManager
Sumit6307 Mar 4, 2026
a390909
test: add ClickhouseTableMetaCache test and refine metadata queries
Sumit6307 Mar 5, 2026
de57cd9
Merge branch '2.x' into feature/gsoc-clickhouse-at-mode
Sumit6307 Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rm-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,10 @@
<artifactId>json-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-metrics-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.rm.datasource.sql.struct.cache;

import org.apache.seata.common.exception.NotSupportYetException;
import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.sqlparser.struct.ColumnMeta;
import org.apache.seata.sqlparser.struct.IndexMeta;
import org.apache.seata.sqlparser.struct.IndexType;
import org.apache.seata.sqlparser.struct.TableMeta;
import org.apache.seata.sqlparser.util.ColumnUtils;
import org.apache.seata.sqlparser.util.JdbcConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;

/**
* The type Table meta cache for ClickHouse.
*
*/
@LoadLevel(name = JdbcConstants.CLICKHOUSE)
public class ClickhouseTableMetaCache extends AbstractTableMetaCache {

protected final Logger logger = LoggerFactory.getLogger(getClass());

@Override
protected String getCacheKey(Connection connection, String tableName, String resourceId) {
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");
// Remove escape characters from table name
String defaultTableName = ColumnUtils.delEscape(tableName, JdbcConstants.CLICKHOUSE);

DatabaseMetaData databaseMetaData;
try {
databaseMetaData = connection.getMetaData();
} catch (SQLException e) {
logger.error("Could not get connection, use default cache key {}", e.getMessage(), e);
return cacheKey.append(defaultTableName).toString();
}

try {
// prevent duplicated cache key
if (databaseMetaData.supportsMixedCaseIdentifiers()) {
cacheKey.append(defaultTableName);
} else {
cacheKey.append(defaultTableName.toLowerCase());
}
} catch (SQLException e) {
logger.error(
"Could not get supportsMixedCaseIdentifiers in connection metadata, use default cache key {}",
e.getMessage(),
e);
return cacheKey.append(defaultTableName).toString();
}

return cacheKey.toString();
}

@Override
protected TableMeta fetchSchema(Connection connection, String tableName) throws SQLException {
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.CLICKHOUSE) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData(), tableName);
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
}
}

protected TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd, String originalTableName)
throws SQLException {
// default to "" for clickhouse if schema is unknown
String schemaName = rsmd.getSchemaName(1);
String catalogName = rsmd.getCatalogName(1);
String tableName = rsmd.getTableName(1);

TableMeta tm = new TableMeta();
tm.setTableName(tableName);
tm.setCaseSensitive(true);
tm.setOriginalTableName(originalTableName);

try (ResultSet rsColumns = dbmd.getColumns(catalogName, schemaName, tableName, "%");
ResultSet rsIndex = dbmd.getIndexInfo(catalogName, schemaName, tableName, false, true);
ResultSet onUpdateColumns = dbmd.getVersionColumns(catalogName, schemaName, tableName)) {
while (rsColumns.next()) {
ColumnMeta col = new ColumnMeta();
col.setTableCat(rsColumns.getString("TABLE_CAT"));
col.setTableSchemaName(rsColumns.getString("TABLE_SCHEM"));
col.setTableName(rsColumns.getString("TABLE_NAME"));
col.setColumnName(rsColumns.getString("COLUMN_NAME"));
col.setDataType(rsColumns.getInt("DATA_TYPE"));
col.setDataTypeName(rsColumns.getString("TYPE_NAME"));
col.setColumnSize(rsColumns.getInt("COLUMN_SIZE"));
col.setDecimalDigits(rsColumns.getInt("DECIMAL_DIGITS"));
col.setNumPrecRadix(rsColumns.getInt("NUM_PREC_RADIX"));
col.setNullAble(rsColumns.getInt("NULLABLE"));
col.setRemarks(rsColumns.getString("REMARKS"));
col.setColumnDef(rsColumns.getString("COLUMN_DEF"));
col.setSqlDataType(rsColumns.getInt("SQL_DATA_TYPE"));
col.setSqlDatetimeSub(rsColumns.getInt("SQL_DATETIME_SUB"));
col.setCharOctetLength(rsColumns.getInt("CHAR_OCTET_LENGTH"));
col.setOrdinalPosition(rsColumns.getInt("ORDINAL_POSITION"));
col.setIsNullAble(rsColumns.getString("IS_NULLABLE"));
col.setIsAutoincrement(rsColumns.getString("IS_AUTOINCREMENT"));
col.setCaseSensitive(rsmd.isCaseSensitive(col.getOrdinalPosition()));

if (tm.getAllColumns().containsKey(col.getColumnName())) {
throw new NotSupportYetException(
"Not support the table has the same column name with different case yet");
}
tm.getAllColumns().put(col.getColumnName(), col);
}

while (onUpdateColumns.next()) {
tm.getAllColumns().get(onUpdateColumns.getString("COLUMN_NAME")).setOnUpdate(true);
}

while (rsIndex.next()) {
String indexName = rsIndex.getString("INDEX_NAME");
String colName = rsIndex.getString("COLUMN_NAME");
ColumnMeta col = tm.getAllColumns().get(colName);
if (col == null) {
continue;
}

if (tm.getAllIndexes().containsKey(indexName)) {
IndexMeta index = tm.getAllIndexes().get(indexName);
index.getValues().add(col);
} else {
IndexMeta index = new IndexMeta();
index.setIndexName(indexName);
index.setNonUnique(rsIndex.getBoolean("NON_UNIQUE"));
index.setIndexQualifier(rsIndex.getString("INDEX_QUALIFIER"));
index.setIndexName(rsIndex.getString("INDEX_NAME"));
index.setType(rsIndex.getShort("TYPE"));
index.setOrdinalPosition(rsIndex.getShort("ORDINAL_POSITION"));
index.setAscOrDesc(rsIndex.getString("ASC_OR_DESC"));
index.setCardinality(rsIndex.getLong("CARDINALITY"));
index.getValues().add(col);
if ("PRIMARY".equalsIgnoreCase(indexName)) {
index.setIndextype(IndexType.PRIMARY);
} else if (!index.isNonUnique()) {
index.setIndextype(IndexType.UNIQUE);
} else {
index.setIndextype(IndexType.NORMAL);
}
tm.getAllIndexes().put(indexName, index);
}
}
// Note: ClickHouse sometimes returns empty indexes for certain engines. We default the primary key to the
// single column if none found below.
if (tm.getAllIndexes().isEmpty() && tm.getAllColumns().size() > 0) {
// Clickhouse's JDBC driver sometimes does not return Index metadata correctly.
}
}
return tm;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.seata.core.exception.BranchTransactionException;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.rpc.processor.Pair;
import org.apache.seata.metrics.registry.Registry;
import org.apache.seata.metrics.registry.RegistryFactory;
import org.apache.seata.rm.datasource.ConnectionContext;
import org.apache.seata.rm.datasource.ConnectionProxy;
import org.apache.seata.rm.datasource.DataSourceProxy;
Expand All @@ -47,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.seata.common.DefaultValues.DEFAULT_CLIENT_UNDO_COMPRESS_ENABLE;
import static org.apache.seata.common.DefaultValues.DEFAULT_CLIENT_UNDO_COMPRESS_THRESHOLD;
Expand Down Expand Up @@ -130,6 +133,7 @@ public static void removeCurrentSerializer() {
*/
@Override
public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException {
long start = System.nanoTime();
try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_SQL);
PreparedStatement deleteSubPST = conn.prepareStatement(DELETE_SUB_UNDO_LOG_SQL)) {
deletePST.setLong(1, branchId);
Expand All @@ -144,6 +148,14 @@ public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQL
e = new SQLException(e);
}
throw (SQLException) e;
} finally {
Registry registry = getRegistry();
if (registry != null) {
registry.getTimer(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)
.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
registry.getCounter(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT)
.increase(1);
}
}
}

Expand All @@ -159,6 +171,8 @@ public void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection
if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
return;
}
long start = System.nanoTime();
int totalDeleteRows = 0;
int xidSize = xids.size();
int branchIdSize = branchIds.size();
String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
Expand All @@ -178,6 +192,7 @@ public void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection
paramsIndex++;
}
int deleteRows = deletePST.executeUpdate();
totalDeleteRows += deleteRows;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("batch delete undo log size {}", deleteRows);
}
Expand All @@ -190,6 +205,16 @@ public void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection
e = new SQLException(e);
}
throw (SQLException) e;
} finally {
Registry registry = getRegistry();
if (registry != null) {
registry.getTimer(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)
.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
if (totalDeleteRows > 0) {
registry.getCounter(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT)
.increase(totalDeleteRows);
}
}
}
}

Expand Down Expand Up @@ -300,6 +325,10 @@ public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
String rollbackCtx =
buildContext(parser.getName(), compressorType, UndoLogConstants.MAX_ALLOWED_PACKET, maxAllowedPacket);
insertUndoLogWithNormal(xid, branchId, rollbackCtx, undoLogContent, cp.getTargetConnection());
Registry registry = getRegistry();
if (registry != null) {
registry.getSummary(UndoLogConstants.SUMMARY_UNDO_LOG_SIZE).increase(undoLogContent.length);
}
}

/**
Expand Down Expand Up @@ -586,4 +615,19 @@ public boolean hasUndoLogTable(Connection conn) {
protected String getCheckUndoLogTableExistSql() {
return CHECK_UNDO_LOG_TABLE_EXIST_SQL;
}

/**
* Get the metrics registry instance.
* This method is protected to allow testing with Mockito spy.
*
* @return the Registry instance, or null if metrics are not enabled
*/
protected Registry getRegistry() {
try {
return RegistryFactory.getInstance();
} catch (Throwable t) {
LOGGER.warn("Failed to get metrics registry: {}", t.getMessage());
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.metrics.Id;
import org.apache.seata.metrics.IdConstants;

import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION;

Expand All @@ -38,4 +40,19 @@ public interface UndoLogConstants {
String SUB_SPLIT_KEY = ",";

String MAX_ALLOWED_PACKET = "map";

Id SUMMARY_UNDO_LOG_SIZE = new Id("seata.undo.log")
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY)
.withTag("type", "size");

Id TIMER_UNDO_LOG_DELETE_LATENCY = new Id("seata.undo.log")
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
.withTag("type", "delete_latency");

Id COUNTER_UNDO_LOG_DELETE_COUNT = new Id("seata.undo.log")
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_COUNTER)
.withTag("type", "delete_count");
}
Loading
Loading