Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
01fcee0
fix #7963 ClusterControllerTest.watch_withHttp2: use 30000ms timeout
Sumit6307 Jan 29, 2026
1cee03b
feat: add observability metrics for undo log manager
Sumit6307 Feb 2, 2026
628607a
fix(ci): add license header and apply spotless formatting
Sumit6307 Feb 2, 2026
46f4069
test: refactor to use spy instead of mockStatic for better compatibility
Sumit6307 Feb 2, 2026
9475b39
fix: add getRegistry() method for testability - fixes CI build failures
Sumit6307 Feb 2, 2026
0242a9e
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Feb 2, 2026
492578a
feat: add observability metrics for undo log manager
Sumit6307 Feb 5, 2026
e3afae6
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Feb 5, 2026
3b90394
fix: safe access to RegistryFactory to prevent CI failures
Sumit6307 Feb 5, 2026
42d1758
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Mar 5, 2026
49bb1a8
Merge branch '2.x' into feature/undo-log-observability
funky-eyes Mar 17, 2026
e43f3ad
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Mar 25, 2026
ae23264
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Apr 20, 2026
3963d01
feat: optimize metrics registry caching and align counter semantics
Sumit6307 Apr 20, 2026
2ec0203
style: apply spotless code formatting
Sumit6307 Apr 20, 2026
c00fb50
fix: resolve missing metric Id import in test class
Sumit6307 Apr 20, 2026
52754fa
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Apr 23, 2026
37844e3
Merge branch '2.x' into feature/undo-log-observability
Sumit6307 Apr 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rm-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,10 @@
<artifactId>json-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-metrics-core</artifactId>
<version>${project.version}</version>
</dependency>
Comment thread
Sumit6307 marked this conversation as resolved.
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.seata.core.exception.BranchTransactionException;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.rpc.processor.Pair;
import org.apache.seata.metrics.registry.Registry;
import org.apache.seata.metrics.registry.RegistryFactory;
import org.apache.seata.rm.datasource.ConnectionContext;
import org.apache.seata.rm.datasource.ConnectionProxy;
import org.apache.seata.rm.datasource.DataSourceProxy;
Expand All @@ -47,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.seata.common.DefaultValues.DEFAULT_CLIENT_UNDO_COMPRESS_ENABLE;
import static org.apache.seata.common.DefaultValues.DEFAULT_CLIENT_UNDO_COMPRESS_THRESHOLD;
Expand Down Expand Up @@ -130,6 +133,7 @@ public static void removeCurrentSerializer() {
*/
@Override
public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQLException {
long start = System.nanoTime();
try (PreparedStatement deletePST = conn.prepareStatement(DELETE_UNDO_LOG_SQL);
PreparedStatement deleteSubPST = conn.prepareStatement(DELETE_SUB_UNDO_LOG_SQL)) {
deletePST.setLong(1, branchId);
Expand All @@ -144,6 +148,14 @@ public void deleteUndoLog(String xid, long branchId, Connection conn) throws SQL
e = new SQLException(e);
}
throw (SQLException) e;
} finally {
Registry registry = getRegistry();
if (registry != null) {
registry.getTimer(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)
.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
registry.getCounter(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT)
.increase(1);
}
}
}

Expand All @@ -159,6 +171,8 @@ public void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection
if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
return;
}
long start = System.nanoTime();
int totalDeleteRows = 0;
int xidSize = xids.size();
int branchIdSize = branchIds.size();
String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
Expand All @@ -178,6 +192,7 @@ public void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection
paramsIndex++;
}
int deleteRows = deletePST.executeUpdate();
totalDeleteRows += deleteRows;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("batch delete undo log size {}", deleteRows);
}
Expand All @@ -190,6 +205,16 @@ public void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection
e = new SQLException(e);
}
throw (SQLException) e;
} finally {
Registry registry = getRegistry();
if (registry != null) {
registry.getTimer(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY)
.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
if (totalDeleteRows > 0) {
registry.getCounter(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT)
.increase(totalDeleteRows);
Comment thread
Sumit6307 marked this conversation as resolved.
Outdated
}
}
}
}

Expand Down Expand Up @@ -300,6 +325,10 @@ public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
String rollbackCtx =
buildContext(parser.getName(), compressorType, UndoLogConstants.MAX_ALLOWED_PACKET, maxAllowedPacket);
insertUndoLogWithNormal(xid, branchId, rollbackCtx, undoLogContent, cp.getTargetConnection());
Registry registry = getRegistry();
if (registry != null) {
registry.getSummary(UndoLogConstants.SUMMARY_UNDO_LOG_SIZE).increase(undoLogContent.length);
}
}

/**
Expand Down Expand Up @@ -586,4 +615,19 @@ public boolean hasUndoLogTable(Connection conn) {
protected String getCheckUndoLogTableExistSql() {
return CHECK_UNDO_LOG_TABLE_EXIST_SQL;
}

/**
* Get the metrics registry instance.
* This method is protected to allow testing with Mockito spy.
*
* @return the Registry instance, or null if metrics are not enabled
*/
protected Registry getRegistry() {
try {
return RegistryFactory.getInstance();
} catch (Throwable t) {
LOGGER.warn("Failed to get metrics registry: {}", t.getMessage());
return null;
}
Comment thread
Sumit6307 marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.metrics.Id;
import org.apache.seata.metrics.IdConstants;

import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSACTION_UNDO_LOG_SERIALIZATION;

Expand All @@ -38,4 +40,19 @@ public interface UndoLogConstants {
String SUB_SPLIT_KEY = ",";

String MAX_ALLOWED_PACKET = "map";

Id SUMMARY_UNDO_LOG_SIZE = new Id("seata.undo.log")
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_SUMMARY)
.withTag("type", "size");

Id TIMER_UNDO_LOG_DELETE_LATENCY = new Id("seata.undo.log")
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_TIMER)
.withTag("type", "delete_latency");

Id COUNTER_UNDO_LOG_DELETE_COUNT = new Id("seata.undo.log")
.withTag(IdConstants.ROLE_KEY, IdConstants.ROLE_VALUE_RM)
.withTag(IdConstants.METER_KEY, IdConstants.METER_VALUE_COUNTER)
.withTag("type", "delete_count");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.rm.datasource.undo;

import org.apache.seata.metrics.Id;
import org.apache.seata.metrics.registry.Registry;
import org.apache.seata.rm.datasource.ConnectionContext;
import org.apache.seata.rm.datasource.ConnectionProxy;
import org.apache.seata.rm.datasource.DataSourceProxy;
import org.apache.seata.rm.datasource.undo.mysql.MySQLUndoLogManager;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class UndoLogMetricsTest {

@Test
public void testFlushUndoLogsMetrics() throws Exception {
Registry registry = mock(Registry.class);
org.apache.seata.metrics.Summary summary = mock(org.apache.seata.metrics.Summary.class);
when(registry.getSummary(any(Id.class))).thenReturn(summary);

ConnectionProxy connectionProxy = mock(ConnectionProxy.class);
ConnectionContext context = mock(ConnectionContext.class);
DataSourceProxy dataSourceProxy = mock(DataSourceProxy.class);

when(connectionProxy.getContext()).thenReturn(context);
when(connectionProxy.getDataSourceProxy()).thenReturn(dataSourceProxy);
Connection connection = mock(Connection.class);
PreparedStatement preparedStatement = mock(PreparedStatement.class);
when(connection.prepareStatement(any(String.class))).thenReturn(preparedStatement);
when(connectionProxy.getTargetConnection()).thenReturn(connection);
when(dataSourceProxy.getResourceId()).thenReturn("jdbc:mysql://localhost:3306/test");

when(context.hasUndoLog()).thenReturn(true);
when(context.getXid()).thenReturn("xid");
when(context.getBranchId()).thenReturn(123456L);
when(context.getUndoItems()).thenReturn(Collections.emptyList());

// Spy on the manager to mock getRegistry()
MySQLUndoLogManager manager = spy(new MySQLUndoLogManager());
doReturn(registry).when(manager).getRegistry();

manager.flushUndoLogs(connectionProxy);

verify(registry).getSummary(eq(UndoLogConstants.SUMMARY_UNDO_LOG_SIZE));
verify(summary).increase(anyLong());
}

@Test
public void testDeleteUndoLogMetrics() throws Exception {
Registry registry = mock(Registry.class);
org.apache.seata.metrics.Timer timer = mock(org.apache.seata.metrics.Timer.class);
org.apache.seata.metrics.Counter counter = mock(org.apache.seata.metrics.Counter.class);

when(registry.getTimer(any(Id.class))).thenReturn(timer);
when(registry.getCounter(any(Id.class))).thenReturn(counter);

Connection connection = mock(Connection.class);
PreparedStatement preparedStatement = mock(PreparedStatement.class);
when(connection.prepareStatement(any(String.class))).thenReturn(preparedStatement);

// Spy on the manager to mock getRegistry()
MySQLUndoLogManager manager = spy(new MySQLUndoLogManager());
doReturn(registry).when(manager).getRegistry();

manager.deleteUndoLog("xid", 123L, connection);

verify(registry).getTimer(eq(UndoLogConstants.TIMER_UNDO_LOG_DELETE_LATENCY));
verify(timer).record(anyLong(), any(TimeUnit.class));
verify(registry).getCounter(eq(UndoLogConstants.COUNTER_UNDO_LOG_DELETE_COUNT));
verify(counter).increase(1);
}
Comment thread
Sumit6307 marked this conversation as resolved.
}
Loading